From 50575a8eebccbcab9f01cdc06c87de98133f4309 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Thu, 12 Oct 2023 23:25:00 +0200 Subject: [PATCH 1/6] Add Snapshot logic and Summary generation --- pyiceberg/table/snapshots.py | 153 +++++++++++++++++++++++++++++- tests/table/test_snapshots.py | 171 +++++++++++++++++++++++++++++++++- 2 files changed, 321 insertions(+), 3 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index fe828db029..05e28ace78 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -19,13 +19,14 @@ Any, Dict, List, + Mapping, Optional, ) from pydantic import Field, PrivateAttr, model_serializer from pyiceberg.io import FileIO -from pyiceberg.manifest import ManifestFile, read_manifest_list +from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, read_manifest_list from pyiceberg.typedef import IcebergBaseModel OPERATION = "operation" @@ -51,7 +52,7 @@ def __repr__(self) -> str: return f"Operation.{self.name}" -class Summary(IcebergBaseModel): +class Summary(IcebergBaseModel, Mapping[str, str]): """A class that stores the summary information for a Snapshot. The snapshot summary’s operation field is used by some operations, @@ -65,6 +66,25 @@ def __init__(self, operation: Operation, **data: Any) -> None: super().__init__(operation=operation, **data) self._additional_properties = data + def __getitem__(self, __key: str) -> Optional[Any]: # type: ignore + """Return a key as it is a map.""" + if __key == 'operation': + return self.operation + else: + return self._additional_properties.get(__key) + + def __setitem__(self, key: str, value: Any) -> None: + """Set a key as it is a map.""" + if key == 'operation': + self.operation = value + else: + self._additional_properties[key] = value + + def __len__(self) -> int: + """Return the number of keys in the summary.""" + # Operation is required + return 1 + len(self._additional_properties) + @model_serializer def ser_model(self) -> Dict[str, str]: return { @@ -116,3 +136,132 @@ class MetadataLogEntry(IcebergBaseModel): class SnapshotLogEntry(IcebergBaseModel): snapshot_id: int = Field(alias="snapshot-id") timestamp_ms: int = Field(alias="timestamp-ms") + + +class SnapshotSummaryCollector: + added_size: int + removed_size: int + added_files: int + removed_files: int + added_eq_delete_files: int + removed_eq_delete_files: int + added_pos_delete_files: int + removed_pos_delete_files: int + added_delete_files: int + removed_delete_files: int + added_records: int + deleted_records: int + added_pos_deletes: int + removed_pos_deletes: int + added_eq_deletes: int + removed_eq_deletes: int + + def __init__(self) -> None: + self.added_size = 0 + self.removed_size = 0 + self.added_files = 0 + self.removed_files = 0 + self.added_eq_delete_files = 0 + self.removed_eq_delete_files = 0 + self.added_pos_delete_files = 0 + self.removed_pos_delete_files = 0 + self.added_delete_files = 0 + self.removed_delete_files = 0 + self.added_records = 0 + self.deleted_records = 0 + self.added_pos_deletes = 0 + self.removed_pos_deletes = 0 + self.added_eq_deletes = 0 + self.removed_eq_deletes = 0 + + def add_file(self, data_file: DataFile) -> None: + if data_file.content == DataFileContent.DATA: + self.added_files += 1 + self.added_records += data_file.record_count + elif data_file.content == DataFileContent.POSITION_DELETES: + self.added_delete_files += 1 + self.added_pos_delete_files += 1 + self.added_pos_deletes += data_file.record_count + elif data_file.content == DataFileContent.EQUALITY_DELETES: + self.added_delete_files += 1 + self.added_eq_delete_files += 1 + self.added_eq_deletes += data_file.record_count + else: + raise ValueError(f"Unknown data file content: {data_file.content}") + + def removed_file(self, data_file: DataFile) -> None: + if data_file.content == DataFileContent.DATA: + self.removed_files += 1 + self.deleted_records += data_file.record_count + elif data_file.content == DataFileContent.POSITION_DELETES: + self.removed_delete_files += 1 + self.removed_pos_delete_files += 1 + self.removed_pos_deletes += data_file.record_count + elif data_file.content == DataFileContent.EQUALITY_DELETES: + self.removed_delete_files += 1 + self.removed_eq_delete_files += 1 + self.removed_eq_deletes += data_file.record_count + else: + raise ValueError(f"Unknown data file content: {data_file.content}") + + def added_manifest(self, manifest: ManifestFile) -> None: + if manifest.content == ManifestContent.DATA: + self.added_files += manifest.added_files_count or 0 + self.added_records += manifest.added_rows_count or 0 + self.removed_files += manifest.deleted_files_count or 0 + self.deleted_records += manifest.deleted_rows_count or 0 + elif manifest.content == ManifestContent.DELETES: + self.added_delete_files += manifest.added_files_count or 0 + self.removed_delete_files += manifest.deleted_files_count or 0 + else: + raise ValueError(f"Unknown manifest file content: {manifest.content}") + + def build(self) -> Dict[str, str]: + def set_non_zero(properties: Dict[str, str], num: int, property_name: str) -> None: + if num > 0: + properties[property_name] = str(num) + + properties: Dict[str, str] = {} + set_non_zero(properties, self.added_size, 'added-files-size') + set_non_zero(properties, self.removed_size, 'removed-files-size') + set_non_zero(properties, self.added_files, 'added-data-files') + set_non_zero(properties, self.removed_files, 'removed-data-files') + set_non_zero(properties, self.added_eq_delete_files, 'added-equality-delete-files') + set_non_zero(properties, self.removed_eq_delete_files, 'removed-equality-delete-files') + set_non_zero(properties, self.added_pos_delete_files, 'added-position-delete-files') + set_non_zero(properties, self.removed_pos_delete_files, 'removed-position-delete-files') + set_non_zero(properties, self.added_delete_files, 'added-delete-files') + set_non_zero(properties, self.removed_delete_files, 'removed-delete-files') + set_non_zero(properties, self.added_records, 'added-records') + set_non_zero(properties, self.deleted_records, 'deleted-records') + set_non_zero(properties, self.added_pos_deletes, 'added-position-deletes') + set_non_zero(properties, self.removed_pos_deletes, 'removed-position-deletes') + set_non_zero(properties, self.added_eq_deletes, 'added-equality-deletes') + set_non_zero(properties, self.removed_eq_deletes, 'removed-equality-deletes') + + return properties + + +def merge_snapshot_summaries(previous_summary: Optional[Mapping[str, str]], summary: Summary) -> Dict[str, str]: + properties = ['records', 'files-size', 'data-files', 'delete-files', 'position-deletes', 'equality-deletes'] + + if not previous_summary: + previous_summary = {f'total-{prop}': '0' for prop in properties} + + def _update_totals(total_property: str, added_property: str, removed_property: str) -> None: + if new_total_str := previous_summary.get(total_property): + new_total = int(new_total_str) + if new_total >= 0 and (added := summary.get(added_property)): + new_total += int(added) + if new_total >= 0 and (removed := summary.get(removed_property)): + new_total -= int(removed) + if new_total >= 0: + summary[total_property] = str(new_total) + + for prop in properties: + _update_totals( + total_property=f'total-{prop}', + added_property=f'added-{prop}', + removed_property=f'deleted-{prop}', + ) + return summary diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 625cbc1b6c..4b2ddd999e 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -17,7 +17,8 @@ # pylint:disable=redefined-outer-name,eval-used import pytest -from pyiceberg.table.snapshots import Operation, Snapshot, Summary +from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile +from pyiceberg.table.snapshots import Operation, Snapshot, SnapshotSummaryCollector, Summary, merge_snapshot_summaries @pytest.fixture @@ -120,3 +121,171 @@ def test_snapshot_with_properties_repr(snapshot_with_properties: Snapshot) -> No == """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND, **{'foo': 'bar'}), schema_id=3)""" ) assert snapshot_with_properties == eval(repr(snapshot_with_properties)) + + +@pytest.fixture +def manifest_file() -> ManifestFile: + return ManifestFile( + content=ManifestContent.DATA, + manifest_length=100, + added_files_count=1, + existing_files_count=2, + deleted_files_count=3, + added_rows_count=100, + existing_rows_count=110, + deleted_rows_count=120, + ) + + +@pytest.fixture +def data_file() -> DataFile: + return DataFile( + content=DataFileContent.DATA, + record_count=100, + file_size_in_bytes=1234, + ) + + +def test_snapshot_summary_collector(manifest_file: ManifestFile, data_file: DataFile) -> None: + ssc = SnapshotSummaryCollector() + + assert ssc.build() == {} + + ssc.added_manifest(manifest_file) + + assert ssc.build() == {'added-data-files': '1', 'added-records': '100', 'deleted-records': '120', 'removed-data-files': '3'} + + ssc.add_file(data_file) + + assert ssc.build() == {'added-data-files': '2', 'added-records': '200', 'deleted-records': '120', 'removed-data-files': '3'} + + +def test_merge_snapshot_summaries_empty() -> None: + assert merge_snapshot_summaries(None, Summary(operation=Operation.APPEND)) == Summary( + Operation.APPEND, + **{ + 'total-records': '0', + 'total-files-size': '0', + 'total-data-files': '0', + 'total-delete-files': '0', + 'total-position-deletes': '0', + 'total-equality-deletes': '0', + }, + ) + + +def test_merge_snapshot_summaries_new_summary() -> None: + actual = merge_snapshot_summaries( + None, + Summary( + operation=Operation.APPEND, + **{ + 'total-data-files': '0', + 'added-data-files': '2', + 'removed-data-files': '1', + 'total-delete-files': '0', + 'added-delete-files': '2', + 'removed-delete-files': '1', + 'total-equality-deletes': '0', + 'added-equality-deletes': '2', + 'removed-equality-deletes': '1', + 'total-files-size': '0', + 'added-files-size': '2', + 'removed-files-size': '1', + 'total-position-deletes': '0', + 'added-position-deletes': '2', + 'removed-position-deletes': '1', + 'total-records': '0', + 'added-records': '2', + 'removed-records': '1', + }, + ), + ) + + expected = Summary( + Operation.APPEND, + **{ + 'total-data-files': '2', + 'added-data-files': '2', + 'removed-data-files': '1', + 'total-delete-files': '2', + 'added-delete-files': '2', + 'removed-delete-files': '1', + 'total-equality-deletes': '2', + 'added-equality-deletes': '2', + 'removed-equality-deletes': '1', + 'total-files-size': '2', + 'added-files-size': '2', + 'removed-files-size': '1', + 'total-position-deletes': '2', + 'added-position-deletes': '2', + 'removed-position-deletes': '1', + 'total-records': '2', + 'added-records': '2', + 'removed-records': '1', + }, + ) + + assert actual == expected + + +def test_merge_snapshot_summaries_old_and_new_summary() -> None: + actual = merge_snapshot_summaries( + { + 'total-data-files': '1', + 'total-delete-files': '1', + 'total-equality-deletes': '1', + 'total-files-size': '1', + 'total-position-deletes': '1', + 'total-records': '1', + }, + Summary( + operation=Operation.APPEND, + **{ + 'total-data-files': '0', + 'added-data-files': '2', + 'removed-data-files': '1', + 'total-delete-files': '0', + 'added-delete-files': '2', + 'removed-delete-files': '1', + 'total-equality-deletes': '0', + 'added-equality-deletes': '2', + 'removed-equality-deletes': '1', + 'total-files-size': '0', + 'added-files-size': '2', + 'removed-files-size': '1', + 'total-position-deletes': '0', + 'added-position-deletes': '2', + 'removed-position-deletes': '1', + 'total-records': '0', + 'added-records': '2', + 'removed-records': '1', + }, + ), + ) + + expected = Summary( + Operation.APPEND, + **{ + 'total-data-files': '3', + 'added-data-files': '2', + 'removed-data-files': '1', + 'total-delete-files': '3', + 'added-delete-files': '2', + 'removed-delete-files': '1', + 'total-equality-deletes': '3', + 'added-equality-deletes': '2', + 'removed-equality-deletes': '1', + 'total-files-size': '3', + 'added-files-size': '2', + 'removed-files-size': '1', + 'total-position-deletes': '3', + 'added-position-deletes': '2', + 'removed-position-deletes': '1', + 'total-records': '3', + 'added-records': '2', + 'removed-records': '1', + }, + ) + + assert actual == expected From 580c8241f1c2345c1703e5e42d50f37db9e63772 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Fri, 13 Oct 2023 23:33:33 +0200 Subject: [PATCH 2/6] Cleanup --- pyiceberg/table/snapshots.py | 102 +++++++++++++++++++---- tests/table/test_snapshots.py | 153 +++++++++++++++------------------- 2 files changed, 153 insertions(+), 102 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 05e28ace78..2395ad385c 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -178,6 +178,7 @@ def add_file(self, data_file: DataFile) -> None: if data_file.content == DataFileContent.DATA: self.added_files += 1 self.added_records += data_file.record_count + self.added_size += data_file.file_size_in_bytes elif data_file.content == DataFileContent.POSITION_DELETES: self.added_delete_files += 1 self.added_pos_delete_files += 1 @@ -206,13 +207,13 @@ def removed_file(self, data_file: DataFile) -> None: def added_manifest(self, manifest: ManifestFile) -> None: if manifest.content == ManifestContent.DATA: - self.added_files += manifest.added_files_count or 0 - self.added_records += manifest.added_rows_count or 0 - self.removed_files += manifest.deleted_files_count or 0 - self.deleted_records += manifest.deleted_rows_count or 0 + self.added_files += manifest.added_files_count + self.added_records += manifest.added_rows_count + self.removed_files += manifest.deleted_files_count + self.deleted_records += manifest.deleted_rows_count elif manifest.content == ManifestContent.DELETES: - self.added_delete_files += manifest.added_files_count or 0 - self.removed_delete_files += manifest.deleted_files_count or 0 + self.added_delete_files += manifest.added_files_count + self.removed_delete_files += manifest.deleted_files_count else: raise ValueError(f"Unknown manifest file content: {manifest.content}") @@ -242,15 +243,55 @@ def set_non_zero(properties: Dict[str, str], num: int, property_name: str) -> No return properties -def merge_snapshot_summaries(previous_summary: Optional[Mapping[str, str]], summary: Summary) -> Dict[str, str]: - properties = ['records', 'files-size', 'data-files', 'delete-files', 'position-deletes', 'equality-deletes'] +properties = ['records', 'files-size', 'data-files', 'delete-files', 'position-deletes', 'equality-deletes'] + + +def truncate_table_summary(summary: Dict[str, str]) -> Dict[str, str]: + truncated_metrics = { + 'total-data-files': '0', + 'total-delete-files': '0', + 'total-records': '0', + 'total-files-size': '0', + 'total-position-deletes': '0', + 'total-equality-deletes': '0', + } + + def _cast_to_int(value: str) -> Optional[int]: + return int(value) if value is not None else None + + if value := _cast_to_int(summary.get('total-data-files')): + truncated_metrics['deleted-data-files'] = str(value) + if value := _cast_to_int(summary.get('total-delete-files')): + truncated_metrics['removed-delete-files'] = str(value) + if value := _cast_to_int(summary.get('total-records')): + truncated_metrics['deleted-records'] = str(value) + if value := _cast_to_int(summary.get('total-files-size')): + truncated_metrics['removed-files-size'] = str(value) + if value := _cast_to_int(summary.get('total-position-deletes')): + truncated_metrics['removed-position-deletes'] = str(value) + if value := _cast_to_int(summary.get('total-equality-deletes')): + truncated_metrics['removed-equality-deletes'] = str(value) + + return truncated_metrics + + +def merge_snapshot_summaries(operation: Operation, previous_summary: Optional[Mapping], summary: Mapping) -> Dict[str, str]: + if operation == Operation.OVERWRITE: + summary.update(truncate_table_summary(previous_summary)) if not previous_summary: - previous_summary = {f'total-{prop}': '0' for prop in properties} + previous_summary = { + 'total-data-files': '0', + 'total-delete-files': '0', + 'total-records': '0', + 'total-files-size': '0', + 'total-position-deletes': '0', + 'total-equality-deletes': '0', + } - def _update_totals(total_property: str, added_property: str, removed_property: str) -> None: - if new_total_str := previous_summary.get(total_property): - new_total = int(new_total_str) + def _update_totals(total_property: str, added_property: str, removed_property: str): + if new_total := previous_summary.get(total_property): + new_total = int(new_total) if new_total >= 0 and (added := summary.get(added_property)): new_total += int(added) if new_total >= 0 and (removed := summary.get(removed_property)): @@ -258,10 +299,35 @@ def _update_totals(total_property: str, added_property: str, removed_property: s if new_total >= 0: summary[total_property] = str(new_total) - for prop in properties: - _update_totals( - total_property=f'total-{prop}', - added_property=f'added-{prop}', - removed_property=f'deleted-{prop}', - ) + _update_totals( + total_property='total-data-files', + added_property='added-data-files', + removed_property='deleted-data-files', + ) + _update_totals( + total_property='total-delete-files', + added_property='added-delete-files', + removed_property='removed-delete-files', + ) + _update_totals( + total_property='total-records', + added_property='added-records', + removed_property='deleted-records', + ) + _update_totals( + total_property='total-files-size', + added_property='added-files-size', + removed_property='deleted-files-size', + ) + _update_totals( + total_property='total-position-deletes', + added_property='added-position-deletes', + removed_property='removed-position-deletes', + ) + _update_totals( + total_property='total-equality-deletes', + added_property='added-equality-deletes', + removed_property='removed-equality-deletes', + ) + return summary diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 4b2ddd999e..f030870516 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -160,76 +160,67 @@ def test_snapshot_summary_collector(manifest_file: ManifestFile, data_file: Data assert ssc.build() == {'added-data-files': '2', 'added-records': '200', 'deleted-records': '120', 'removed-data-files': '3'} -def test_merge_snapshot_summaries_empty() -> None: - assert merge_snapshot_summaries(None, Summary(operation=Operation.APPEND)) == Summary( - Operation.APPEND, - **{ - 'total-records': '0', - 'total-files-size': '0', - 'total-data-files': '0', - 'total-delete-files': '0', - 'total-position-deletes': '0', - 'total-equality-deletes': '0', - }, - ) - - -def test_merge_snapshot_summaries_new_summary() -> None: +def test_merge_snapshot_summaries_empty(): + assert merge_snapshot_summaries(None, {}) == { + 'total-data-files': '0', + 'total-delete-files': '0', + 'total-equality-deletes': '0', + 'total-files-size': '0', + 'total-position-deletes': '0', + 'total-records': '0', + } + + +def test_merge_snapshot_summaries_new_summary(): actual = merge_snapshot_summaries( None, - Summary( - operation=Operation.APPEND, - **{ - 'total-data-files': '0', - 'added-data-files': '2', - 'removed-data-files': '1', - 'total-delete-files': '0', - 'added-delete-files': '2', - 'removed-delete-files': '1', - 'total-equality-deletes': '0', - 'added-equality-deletes': '2', - 'removed-equality-deletes': '1', - 'total-files-size': '0', - 'added-files-size': '2', - 'removed-files-size': '1', - 'total-position-deletes': '0', - 'added-position-deletes': '2', - 'removed-position-deletes': '1', - 'total-records': '0', - 'added-records': '2', - 'removed-records': '1', - }, - ), - ) - - expected = Summary( - Operation.APPEND, - **{ - 'total-data-files': '2', + { + 'total-data-files': '0', 'added-data-files': '2', 'removed-data-files': '1', - 'total-delete-files': '2', + 'total-delete-files': '0', 'added-delete-files': '2', 'removed-delete-files': '1', - 'total-equality-deletes': '2', + 'total-equality-deletes': '0', 'added-equality-deletes': '2', 'removed-equality-deletes': '1', - 'total-files-size': '2', + 'total-files-size': '0', 'added-files-size': '2', 'removed-files-size': '1', - 'total-position-deletes': '2', + 'total-position-deletes': '0', 'added-position-deletes': '2', 'removed-position-deletes': '1', - 'total-records': '2', + 'total-records': '0', 'added-records': '2', 'removed-records': '1', }, ) + expected = { + 'added-data-files': '2', + 'added-delete-files': '2', + 'added-equality-deletes': '2', + 'added-files-size': '2', + 'added-position-deletes': '2', + 'added-records': '2', + 'removed-data-files': '1', + 'removed-delete-files': '1', + 'removed-equality-deletes': '1', + 'removed-files-size': '1', + 'removed-position-deletes': '1', + 'removed-records': '1', + 'total-data-files': '2', + 'total-delete-files': '2', + 'total-equality-deletes': '2', + 'total-files-size': '2', + 'total-position-deletes': '2', + 'total-records': '2', + } + assert actual == expected -def test_merge_snapshot_summaries_old_and_new_summary() -> None: +def test_merge_snapshot_summaries_old_and_new_summary(): actual = merge_snapshot_summaries( { 'total-data-files': '1', @@ -239,53 +230,47 @@ def test_merge_snapshot_summaries_old_and_new_summary() -> None: 'total-position-deletes': '1', 'total-records': '1', }, - Summary( - operation=Operation.APPEND, - **{ - 'total-data-files': '0', - 'added-data-files': '2', - 'removed-data-files': '1', - 'total-delete-files': '0', - 'added-delete-files': '2', - 'removed-delete-files': '1', - 'total-equality-deletes': '0', - 'added-equality-deletes': '2', - 'removed-equality-deletes': '1', - 'total-files-size': '0', - 'added-files-size': '2', - 'removed-files-size': '1', - 'total-position-deletes': '0', - 'added-position-deletes': '2', - 'removed-position-deletes': '1', - 'total-records': '0', - 'added-records': '2', - 'removed-records': '1', - }, - ), - ) - - expected = Summary( - Operation.APPEND, - **{ - 'total-data-files': '3', + { + 'total-data-files': '0', 'added-data-files': '2', 'removed-data-files': '1', - 'total-delete-files': '3', + 'total-delete-files': '0', 'added-delete-files': '2', 'removed-delete-files': '1', - 'total-equality-deletes': '3', + 'total-equality-deletes': '0', 'added-equality-deletes': '2', 'removed-equality-deletes': '1', - 'total-files-size': '3', + 'total-files-size': '0', 'added-files-size': '2', 'removed-files-size': '1', - 'total-position-deletes': '3', + 'total-position-deletes': '0', 'added-position-deletes': '2', 'removed-position-deletes': '1', - 'total-records': '3', + 'total-records': '0', 'added-records': '2', 'removed-records': '1', }, ) + expected = { + 'added-data-files': '2', + 'added-delete-files': '2', + 'added-equality-deletes': '2', + 'added-files-size': '2', + 'added-position-deletes': '2', + 'added-records': '2', + 'removed-data-files': '1', + 'removed-delete-files': '1', + 'removed-equality-deletes': '1', + 'removed-files-size': '1', + 'removed-position-deletes': '1', + 'removed-records': '1', + 'total-data-files': '3', + 'total-delete-files': '3', + 'total-equality-deletes': '3', + 'total-files-size': '3', + 'total-position-deletes': '3', + 'total-records': '3', + } + assert actual == expected From 3dba41a39749eabef9353f6bdf620eec053208d1 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 23 Oct 2023 14:20:52 -0400 Subject: [PATCH 3/6] Refactor it a bit --- pyiceberg/table/snapshots.py | 85 +++++++++-------- tests/table/test_snapshots.py | 166 ++++++++++++++++------------------ 2 files changed, 125 insertions(+), 126 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 2395ad385c..995478be76 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -101,6 +101,14 @@ def __repr__(self) -> str: repr_properties = f", **{repr(self._additional_properties)}" if self._additional_properties else "" return f"Summary({repr(self.operation)}{repr_properties})" + def __eq__(self, other: Any) -> bool: + """Compare if the summary is equal to another summary.""" + return ( + self.operation == other.operation and self.additional_properties == other.additional_properties + if isinstance(other, Summary) + else False + ) + class Snapshot(IcebergBaseModel): snapshot_id: int = Field(alias="snapshot-id") @@ -207,13 +215,13 @@ def removed_file(self, data_file: DataFile) -> None: def added_manifest(self, manifest: ManifestFile) -> None: if manifest.content == ManifestContent.DATA: - self.added_files += manifest.added_files_count - self.added_records += manifest.added_rows_count - self.removed_files += manifest.deleted_files_count - self.deleted_records += manifest.deleted_rows_count + self.added_files += manifest.added_files_count or 0 + self.added_records += manifest.added_rows_count or 0 + self.removed_files += manifest.deleted_files_count or 0 + self.deleted_records += manifest.deleted_rows_count or 0 elif manifest.content == ManifestContent.DELETES: - self.added_delete_files += manifest.added_files_count - self.removed_delete_files += manifest.deleted_files_count + self.added_delete_files += manifest.added_files_count or 0 + self.removed_delete_files += manifest.deleted_files_count or 0 else: raise ValueError(f"Unknown manifest file content: {manifest.content}") @@ -246,38 +254,39 @@ def set_non_zero(properties: Dict[str, str], num: int, property_name: str) -> No properties = ['records', 'files-size', 'data-files', 'delete-files', 'position-deletes', 'equality-deletes'] -def truncate_table_summary(summary: Dict[str, str]) -> Dict[str, str]: - truncated_metrics = { - 'total-data-files': '0', - 'total-delete-files': '0', - 'total-records': '0', - 'total-files-size': '0', - 'total-position-deletes': '0', - 'total-equality-deletes': '0', - } - - def _cast_to_int(value: str) -> Optional[int]: - return int(value) if value is not None else None +def truncate_table_summary(summary: Summary, previous_summary: Mapping[str, str]) -> Summary: + for prop in { + 'total-data-files', + 'total-delete-files', + 'total-records', + 'total-files-size', + 'total-position-deletes', + 'total-equality-deletes', + }: + summary[prop] = '0' + + if value := previous_summary.get('total-data-files'): + summary['deleted-data-files'] = value + if value := previous_summary.get('total-delete-files'): + summary['removed-delete-files'] = value + if value := previous_summary.get('total-records'): + summary['deleted-records'] = value + if value := previous_summary.get('total-files-size'): + summary['removed-files-size'] = value + if value := previous_summary.get('total-position-deletes'): + summary['removed-position-deletes'] = value + if value := previous_summary.get('total-equality-deletes'): + summary['removed-equality-deletes'] = value - if value := _cast_to_int(summary.get('total-data-files')): - truncated_metrics['deleted-data-files'] = str(value) - if value := _cast_to_int(summary.get('total-delete-files')): - truncated_metrics['removed-delete-files'] = str(value) - if value := _cast_to_int(summary.get('total-records')): - truncated_metrics['deleted-records'] = str(value) - if value := _cast_to_int(summary.get('total-files-size')): - truncated_metrics['removed-files-size'] = str(value) - if value := _cast_to_int(summary.get('total-position-deletes')): - truncated_metrics['removed-position-deletes'] = str(value) - if value := _cast_to_int(summary.get('total-equality-deletes')): - truncated_metrics['removed-equality-deletes'] = str(value) - - return truncated_metrics + return summary -def merge_snapshot_summaries(operation: Operation, previous_summary: Optional[Mapping], summary: Mapping) -> Dict[str, str]: - if operation == Operation.OVERWRITE: - summary.update(truncate_table_summary(previous_summary)) +def merge_snapshot_summaries( + summary: Summary, + previous_summary: Optional[Mapping[str, str]] = None, +) -> Summary: + if summary.operation == Operation.OVERWRITE and previous_summary is not None: + summary = truncate_table_summary(summary, previous_summary) if not previous_summary: previous_summary = { @@ -289,9 +298,9 @@ def merge_snapshot_summaries(operation: Operation, previous_summary: Optional[Ma 'total-equality-deletes': '0', } - def _update_totals(total_property: str, added_property: str, removed_property: str): - if new_total := previous_summary.get(total_property): - new_total = int(new_total) + def _update_totals(total_property: str, added_property: str, removed_property: str) -> None: + if new_total_str := previous_summary.get(total_property): + new_total = int(new_total_str) if new_total >= 0 and (added := summary.get(added_property)): new_total += int(added) if new_total >= 0 and (removed := summary.get(removed_property)): diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index f030870516..d0a6748a0a 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -157,72 +157,79 @@ def test_snapshot_summary_collector(manifest_file: ManifestFile, data_file: Data ssc.add_file(data_file) - assert ssc.build() == {'added-data-files': '2', 'added-records': '200', 'deleted-records': '120', 'removed-data-files': '3'} - - -def test_merge_snapshot_summaries_empty(): - assert merge_snapshot_summaries(None, {}) == { - 'total-data-files': '0', - 'total-delete-files': '0', - 'total-equality-deletes': '0', - 'total-files-size': '0', - 'total-position-deletes': '0', - 'total-records': '0', + assert ssc.build() == { + 'added-data-files': '2', + 'added-files-size': '1234', + 'added-records': '200', + 'deleted-records': '120', + 'removed-data-files': '3', } -def test_merge_snapshot_summaries_new_summary(): - actual = merge_snapshot_summaries( - None, - { +def test_merge_snapshot_summaries_empty() -> None: + assert merge_snapshot_summaries(Summary(Operation.APPEND)) == Summary( + operation=Operation.APPEND, + **{ 'total-data-files': '0', - 'added-data-files': '2', - 'removed-data-files': '1', 'total-delete-files': '0', - 'added-delete-files': '2', - 'removed-delete-files': '1', - 'total-equality-deletes': '0', - 'added-equality-deletes': '2', - 'removed-equality-deletes': '1', + 'total-records': '0', 'total-files-size': '0', - 'added-files-size': '2', - 'removed-files-size': '1', 'total-position-deletes': '0', - 'added-position-deletes': '2', - 'removed-position-deletes': '1', - 'total-records': '0', - 'added-records': '2', - 'removed-records': '1', + 'total-equality-deletes': '0', }, ) - expected = { - 'added-data-files': '2', - 'added-delete-files': '2', - 'added-equality-deletes': '2', - 'added-files-size': '2', - 'added-position-deletes': '2', - 'added-records': '2', - 'removed-data-files': '1', - 'removed-delete-files': '1', - 'removed-equality-deletes': '1', - 'removed-files-size': '1', - 'removed-position-deletes': '1', - 'removed-records': '1', - 'total-data-files': '2', - 'total-delete-files': '2', - 'total-equality-deletes': '2', - 'total-files-size': '2', - 'total-position-deletes': '2', - 'total-records': '2', - } + +def test_merge_snapshot_summaries_new_summary() -> None: + actual = merge_snapshot_summaries( + summary=Summary( + operation=Operation.APPEND, + **{ + 'added-data-files': '1', + 'added-delete-files': '2', + 'added-equality-deletes': '3', + 'added-files-size': '4', + 'added-position-deletes': '5', + 'added-records': '6', + }, + ) + ) + + expected = Summary( + operation=Operation.APPEND, + **{ + 'added-data-files': '1', + 'added-delete-files': '2', + 'added-equality-deletes': '3', + 'added-files-size': '4', + 'added-position-deletes': '5', + 'added-records': '6', + 'total-data-files': '1', + 'total-delete-files': '2', + 'total-records': '6', + 'total-files-size': '4', + 'total-position-deletes': '5', + 'total-equality-deletes': '3', + }, + ) assert actual == expected -def test_merge_snapshot_summaries_old_and_new_summary(): +def test_merge_snapshot_summaries_overwrite_summary() -> None: actual = merge_snapshot_summaries( - { + summary=Summary( + operation=Operation.OVERWRITE, + **{ + 'added-data-files': '1', + 'added-delete-files': '2', + 'added-equality-deletes': '3', + 'added-files-size': '4', + 'added-position-deletes': '5', + 'added-records': '6', + }, + ), + previous_summary={ 'total-data-files': '1', 'total-delete-files': '1', 'total-equality-deletes': '1', @@ -230,47 +237,30 @@ def test_merge_snapshot_summaries_old_and_new_summary(): 'total-position-deletes': '1', 'total-records': '1', }, - { - 'total-data-files': '0', - 'added-data-files': '2', - 'removed-data-files': '1', - 'total-delete-files': '0', + ) + + expected = Summary( + operation=Operation.OVERWRITE, + **{ + 'added-data-files': '1', 'added-delete-files': '2', + 'added-equality-deletes': '3', + 'added-files-size': '4', + 'added-position-deletes': '5', + 'added-records': '6', + 'total-position-deletes': '5', + 'total-files-size': '5', + 'total-records': '6', + 'total-delete-files': '2', + 'total-data-files': '1', + 'total-equality-deletes': '3', + 'deleted-data-files': '1', 'removed-delete-files': '1', - 'total-equality-deletes': '0', - 'added-equality-deletes': '2', - 'removed-equality-deletes': '1', - 'total-files-size': '0', - 'added-files-size': '2', + 'deleted-records': '1', 'removed-files-size': '1', - 'total-position-deletes': '0', - 'added-position-deletes': '2', 'removed-position-deletes': '1', - 'total-records': '0', - 'added-records': '2', - 'removed-records': '1', + 'removed-equality-deletes': '1', }, ) - expected = { - 'added-data-files': '2', - 'added-delete-files': '2', - 'added-equality-deletes': '2', - 'added-files-size': '2', - 'added-position-deletes': '2', - 'added-records': '2', - 'removed-data-files': '1', - 'removed-delete-files': '1', - 'removed-equality-deletes': '1', - 'removed-files-size': '1', - 'removed-position-deletes': '1', - 'removed-records': '1', - 'total-data-files': '3', - 'total-delete-files': '3', - 'total-equality-deletes': '3', - 'total-files-size': '3', - 'total-position-deletes': '3', - 'total-records': '3', - } - - assert actual == expected + assert actual.additional_properties == expected.additional_properties From 12c4699bfbc04ea9d5b6901b61463232f7eaf426 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Wed, 25 Oct 2023 14:17:16 -0400 Subject: [PATCH 4/6] Comments --- pyiceberg/table/snapshots.py | 11 +++++++---- tests/table/test_snapshots.py | 10 ++++++++++ 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 995478be76..ee2f2ff9a6 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -183,10 +183,11 @@ def __init__(self) -> None: self.removed_eq_deletes = 0 def add_file(self, data_file: DataFile) -> None: + self.added_size += data_file.file_size_in_bytes + if data_file.content == DataFileContent.DATA: self.added_files += 1 self.added_records += data_file.record_count - self.added_size += data_file.file_size_in_bytes elif data_file.content == DataFileContent.POSITION_DELETES: self.added_delete_files += 1 self.added_pos_delete_files += 1 @@ -199,6 +200,8 @@ def add_file(self, data_file: DataFile) -> None: raise ValueError(f"Unknown data file content: {data_file.content}") def removed_file(self, data_file: DataFile) -> None: + self.removed_size += data_file.file_size_in_bytes + if data_file.content == DataFileContent.DATA: self.removed_files += 1 self.deleted_records += data_file.record_count @@ -251,9 +254,6 @@ def set_non_zero(properties: Dict[str, str], num: int, property_name: str) -> No return properties -properties = ['records', 'files-size', 'data-files', 'delete-files', 'position-deletes', 'equality-deletes'] - - def truncate_table_summary(summary: Summary, previous_summary: Mapping[str, str]) -> Summary: for prop in { 'total-data-files', @@ -285,6 +285,9 @@ def merge_snapshot_summaries( summary: Summary, previous_summary: Optional[Mapping[str, str]] = None, ) -> Summary: + if summary.operation not in {Operation.APPEND, Operation.OVERWRITE}: + raise ValueError(f"Operation not implemented: {summary.operation}") + if summary.operation == Operation.OVERWRITE and previous_summary is not None: summary = truncate_table_summary(summary, previous_summary) diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index d0a6748a0a..b09c90f940 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -264,3 +264,13 @@ def test_merge_snapshot_summaries_overwrite_summary() -> None: ) assert actual.additional_properties == expected.additional_properties + + +def test_invalid_operation() -> None: + with pytest.raises(ValueError) as e: + merge_snapshot_summaries(summary=Summary(Operation.REPLACE)) + assert "Operation not implemented: Operation.REPLACE" in str(e.value) + + with pytest.raises(ValueError) as e: + merge_snapshot_summaries(summary=Summary(Operation.DELETE)) + assert "Operation not implemented: Operation.DELETE" in str(e.value) From 49687fc8f9b71cddc1f27b983d5b82f59ce1477c Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 6 Nov 2023 14:52:04 +0100 Subject: [PATCH 5/6] Thanks Ryan! --- pyiceberg/table/snapshots.py | 185 ++++++++++++++++++---------------- pyproject.toml | 6 +- tests/table/test_snapshots.py | 91 ++++++++++------- 3 files changed, 155 insertions(+), 127 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index ee2f2ff9a6..77e258ba81 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -26,9 +26,33 @@ from pydantic import Field, PrivateAttr, model_serializer from pyiceberg.io import FileIO -from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, read_manifest_list +from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list from pyiceberg.typedef import IcebergBaseModel +ADDED_DATA_FILES = 'added-data-files' +ADDED_DELETE_FILES = 'added-delete-files' +ADDED_EQUALITY_DELETES = 'added-equality-deletes' +ADDED_FILE_SIZE = 'added-files-size' +ADDED_POSITION_DELETES = 'added-position-deletes' +ADDED_POSITION_DELETE_FILES = f'{ADDED_POSITION_DELETES}-files' +ADDED_RECORDS = 'added-records' +DELETED_DATA_FILES = 'deleted-data-files' +DELETED_RECORDS = 'deleted-records' +EQUALITY_DELETE_FILES = 'added-equality-delete-files' +REMOVED_DELETE_FILES = 'removed-delete-files' +REMOVED_EQUALITY_DELETES = 'removed-equality-deletes' +REMOVED_EQUALITY_DELETE_FILES = f'{REMOVED_EQUALITY_DELETES}-files' +REMOVED_FILE_SIZE = 'removed-files-size' +REMOVED_POSITION_DELETES = 'removed-position-deletes' +REMOVED_POSITION_DELETE_FILES = f'{REMOVED_POSITION_DELETES}-files' +TOTAL_EQUALITY_DELETES = 'total-equality-deletes' +TOTAL_POSITION_DELETES = 'total-position-deletes' +TOTAL_DATA_FILES = 'total-data-files' +TOTAL_DELETE_FILES = 'total-delete-files' +TOTAL_RECORDS = 'total-records' +TOTAL_FILE_SIZE = 'total-files-size' + + OPERATION = "operation" @@ -199,7 +223,7 @@ def add_file(self, data_file: DataFile) -> None: else: raise ValueError(f"Unknown data file content: {data_file.content}") - def removed_file(self, data_file: DataFile) -> None: + def remove_file(self, data_file: DataFile) -> None: self.removed_size += data_file.file_size_in_bytes if data_file.content == DataFileContent.DATA: @@ -216,130 +240,121 @@ def removed_file(self, data_file: DataFile) -> None: else: raise ValueError(f"Unknown data file content: {data_file.content}") - def added_manifest(self, manifest: ManifestFile) -> None: - if manifest.content == ManifestContent.DATA: - self.added_files += manifest.added_files_count or 0 - self.added_records += manifest.added_rows_count or 0 - self.removed_files += manifest.deleted_files_count or 0 - self.deleted_records += manifest.deleted_rows_count or 0 - elif manifest.content == ManifestContent.DELETES: - self.added_delete_files += manifest.added_files_count or 0 - self.removed_delete_files += manifest.deleted_files_count or 0 - else: - raise ValueError(f"Unknown manifest file content: {manifest.content}") - def build(self) -> Dict[str, str]: - def set_non_zero(properties: Dict[str, str], num: int, property_name: str) -> None: + def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None: if num > 0: properties[property_name] = str(num) properties: Dict[str, str] = {} - set_non_zero(properties, self.added_size, 'added-files-size') - set_non_zero(properties, self.removed_size, 'removed-files-size') - set_non_zero(properties, self.added_files, 'added-data-files') - set_non_zero(properties, self.removed_files, 'removed-data-files') - set_non_zero(properties, self.added_eq_delete_files, 'added-equality-delete-files') - set_non_zero(properties, self.removed_eq_delete_files, 'removed-equality-delete-files') - set_non_zero(properties, self.added_pos_delete_files, 'added-position-delete-files') - set_non_zero(properties, self.removed_pos_delete_files, 'removed-position-delete-files') - set_non_zero(properties, self.added_delete_files, 'added-delete-files') - set_non_zero(properties, self.removed_delete_files, 'removed-delete-files') - set_non_zero(properties, self.added_records, 'added-records') - set_non_zero(properties, self.deleted_records, 'deleted-records') - set_non_zero(properties, self.added_pos_deletes, 'added-position-deletes') - set_non_zero(properties, self.removed_pos_deletes, 'removed-position-deletes') - set_non_zero(properties, self.added_eq_deletes, 'added-equality-deletes') - set_non_zero(properties, self.removed_eq_deletes, 'removed-equality-deletes') + set_when_positive(properties, self.added_size, ADDED_FILE_SIZE) + set_when_positive(properties, self.removed_size, REMOVED_FILE_SIZE) + set_when_positive(properties, self.added_files, ADDED_DATA_FILES) + set_when_positive(properties, self.removed_files, DELETED_DATA_FILES) + set_when_positive(properties, self.added_eq_delete_files, EQUALITY_DELETE_FILES) + set_when_positive(properties, self.removed_eq_delete_files, REMOVED_EQUALITY_DELETE_FILES) + set_when_positive(properties, self.added_pos_delete_files, ADDED_POSITION_DELETE_FILES) + set_when_positive(properties, self.removed_pos_delete_files, REMOVED_POSITION_DELETE_FILES) + set_when_positive(properties, self.added_delete_files, ADDED_DELETE_FILES) + set_when_positive(properties, self.removed_delete_files, REMOVED_DELETE_FILES) + set_when_positive(properties, self.added_records, ADDED_RECORDS) + set_when_positive(properties, self.deleted_records, DELETED_RECORDS) + set_when_positive(properties, self.added_pos_deletes, ADDED_POSITION_DELETES) + set_when_positive(properties, self.removed_pos_deletes, REMOVED_POSITION_DELETES) + set_when_positive(properties, self.added_eq_deletes, ADDED_EQUALITY_DELETES) + set_when_positive(properties, self.removed_eq_deletes, REMOVED_EQUALITY_DELETES) return properties -def truncate_table_summary(summary: Summary, previous_summary: Mapping[str, str]) -> Summary: +def _truncate_table_summary(summary: Summary, previous_summary: Mapping[str, str]) -> Summary: for prop in { - 'total-data-files', - 'total-delete-files', - 'total-records', - 'total-files-size', - 'total-position-deletes', - 'total-equality-deletes', + TOTAL_DATA_FILES, + TOTAL_DELETE_FILES, + TOTAL_RECORDS, + TOTAL_FILE_SIZE, + TOTAL_POSITION_DELETES, + TOTAL_EQUALITY_DELETES, }: summary[prop] = '0' - if value := previous_summary.get('total-data-files'): - summary['deleted-data-files'] = value - if value := previous_summary.get('total-delete-files'): - summary['removed-delete-files'] = value - if value := previous_summary.get('total-records'): - summary['deleted-records'] = value - if value := previous_summary.get('total-files-size'): - summary['removed-files-size'] = value - if value := previous_summary.get('total-position-deletes'): - summary['removed-position-deletes'] = value - if value := previous_summary.get('total-equality-deletes'): - summary['removed-equality-deletes'] = value + if value := previous_summary.get(TOTAL_DATA_FILES): + summary[DELETED_DATA_FILES] = value + if value := previous_summary.get(TOTAL_DELETE_FILES): + summary[REMOVED_DELETE_FILES] = value + if value := previous_summary.get(TOTAL_RECORDS): + summary[DELETED_RECORDS] = value + if value := previous_summary.get(TOTAL_FILE_SIZE): + summary[REMOVED_FILE_SIZE] = value + if value := previous_summary.get(TOTAL_POSITION_DELETES): + summary[REMOVED_POSITION_DELETES] = value + if value := previous_summary.get(TOTAL_EQUALITY_DELETES): + summary[REMOVED_EQUALITY_DELETES] = value return summary -def merge_snapshot_summaries( - summary: Summary, - previous_summary: Optional[Mapping[str, str]] = None, +def _merge_snapshot_summaries( + summary: Summary, previous_summary: Optional[Mapping[str, str]] = None, truncate_full_table: bool = False ) -> Summary: if summary.operation not in {Operation.APPEND, Operation.OVERWRITE}: raise ValueError(f"Operation not implemented: {summary.operation}") - if summary.operation == Operation.OVERWRITE and previous_summary is not None: - summary = truncate_table_summary(summary, previous_summary) + if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None: + summary = _truncate_table_summary(summary, previous_summary) if not previous_summary: previous_summary = { - 'total-data-files': '0', - 'total-delete-files': '0', - 'total-records': '0', - 'total-files-size': '0', - 'total-position-deletes': '0', - 'total-equality-deletes': '0', + TOTAL_DATA_FILES: '0', + TOTAL_DELETE_FILES: '0', + TOTAL_RECORDS: '0', + TOTAL_FILE_SIZE: '0', + TOTAL_POSITION_DELETES: '0', + TOTAL_EQUALITY_DELETES: '0', } def _update_totals(total_property: str, added_property: str, removed_property: str) -> None: - if new_total_str := previous_summary.get(total_property): - new_total = int(new_total_str) - if new_total >= 0 and (added := summary.get(added_property)): - new_total += int(added) - if new_total >= 0 and (removed := summary.get(removed_property)): - new_total -= int(removed) + if previous_total_str := previous_summary.get(total_property): + try: + new_total = int(previous_total_str) + if new_total >= 0 and (added := summary.get(added_property)): + new_total += int(added) + if new_total >= 0 and (removed := summary.get(removed_property)): + new_total -= int(removed) + except ValueError as e: + raise ValueError(f"Could not parse summary property {total_property} to an int: {previous_total_str}") from e + if new_total >= 0: summary[total_property] = str(new_total) _update_totals( - total_property='total-data-files', - added_property='added-data-files', - removed_property='deleted-data-files', + total_property=TOTAL_DATA_FILES, + added_property=ADDED_DATA_FILES, + removed_property=DELETED_DATA_FILES, ) _update_totals( - total_property='total-delete-files', - added_property='added-delete-files', - removed_property='removed-delete-files', + total_property=TOTAL_DELETE_FILES, + added_property=ADDED_DELETE_FILES, + removed_property=REMOVED_DELETE_FILES, ) _update_totals( - total_property='total-records', - added_property='added-records', - removed_property='deleted-records', + total_property=TOTAL_RECORDS, + added_property=ADDED_RECORDS, + removed_property=DELETED_RECORDS, ) _update_totals( - total_property='total-files-size', - added_property='added-files-size', - removed_property='deleted-files-size', + total_property=TOTAL_FILE_SIZE, + added_property=ADDED_FILE_SIZE, + removed_property=REMOVED_FILE_SIZE, ) _update_totals( - total_property='total-position-deletes', - added_property='added-position-deletes', - removed_property='removed-position-deletes', + total_property=TOTAL_POSITION_DELETES, + added_property=ADDED_POSITION_DELETES, + removed_property=REMOVED_POSITION_DELETES, ) _update_totals( - total_property='total-equality-deletes', - added_property='added-equality-deletes', - removed_property='removed-equality-deletes', + total_property=TOTAL_EQUALITY_DELETES, + added_property=ADDED_EQUALITY_DELETES, + removed_property=REMOVED_EQUALITY_DELETES, ) return summary diff --git a/pyproject.toml b/pyproject.toml index 6f17719e40..abcb0790dc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -123,9 +123,9 @@ markers = [ ] # Turns a warning into an error -filterwarnings = [ - "error" -] +#filterwarnings = [ +# "error" +#] [tool.black] line-length = 130 diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index b09c90f940..73bf2c9888 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -18,7 +18,7 @@ import pytest from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile -from pyiceberg.table.snapshots import Operation, Snapshot, SnapshotSummaryCollector, Summary, merge_snapshot_summaries +from pyiceberg.table.snapshots import Operation, Snapshot, SnapshotSummaryCollector, Summary, _merge_snapshot_summaries @pytest.fixture @@ -146,28 +146,22 @@ def data_file() -> DataFile: ) -def test_snapshot_summary_collector(manifest_file: ManifestFile, data_file: DataFile) -> None: +def test_snapshot_summary_collector(data_file: DataFile) -> None: ssc = SnapshotSummaryCollector() assert ssc.build() == {} - ssc.added_manifest(manifest_file) - - assert ssc.build() == {'added-data-files': '1', 'added-records': '100', 'deleted-records': '120', 'removed-data-files': '3'} - ssc.add_file(data_file) assert ssc.build() == { - 'added-data-files': '2', + 'added-data-files': '1', 'added-files-size': '1234', - 'added-records': '200', - 'deleted-records': '120', - 'removed-data-files': '3', + 'added-records': '100', } def test_merge_snapshot_summaries_empty() -> None: - assert merge_snapshot_summaries(Summary(Operation.APPEND)) == Summary( + assert _merge_snapshot_summaries(Summary(Operation.APPEND)) == Summary( operation=Operation.APPEND, **{ 'total-data-files': '0', @@ -181,7 +175,7 @@ def test_merge_snapshot_summaries_empty() -> None: def test_merge_snapshot_summaries_new_summary() -> None: - actual = merge_snapshot_summaries( + actual = _merge_snapshot_summaries( summary=Summary( operation=Operation.APPEND, **{ @@ -217,7 +211,7 @@ def test_merge_snapshot_summaries_new_summary() -> None: def test_merge_snapshot_summaries_overwrite_summary() -> None: - actual = merge_snapshot_summaries( + actual = _merge_snapshot_summaries( summary=Summary( operation=Operation.OVERWRITE, **{ @@ -237,40 +231,59 @@ def test_merge_snapshot_summaries_overwrite_summary() -> None: 'total-position-deletes': '1', 'total-records': '1', }, + truncate_full_table=True, ) - expected = Summary( - operation=Operation.OVERWRITE, - **{ - 'added-data-files': '1', - 'added-delete-files': '2', - 'added-equality-deletes': '3', - 'added-files-size': '4', - 'added-position-deletes': '5', - 'added-records': '6', - 'total-position-deletes': '5', - 'total-files-size': '5', - 'total-records': '6', - 'total-delete-files': '2', - 'total-data-files': '1', - 'total-equality-deletes': '3', - 'deleted-data-files': '1', - 'removed-delete-files': '1', - 'deleted-records': '1', - 'removed-files-size': '1', - 'removed-position-deletes': '1', - 'removed-equality-deletes': '1', - }, - ) + expected = { + 'added-data-files': '1', + 'added-delete-files': '2', + 'added-equality-deletes': '3', + 'added-files-size': '4', + 'added-position-deletes': '5', + 'added-records': '6', + 'total-data-files': '1', + 'total-records': '6', + 'total-delete-files': '2', + 'total-equality-deletes': '3', + 'total-files-size': '4', + 'total-position-deletes': '5', + 'deleted-data-files': '1', + 'removed-delete-files': '1', + 'deleted-records': '1', + 'removed-files-size': '1', + 'removed-position-deletes': '1', + 'removed-equality-deletes': '1', + } - assert actual.additional_properties == expected.additional_properties + assert actual.additional_properties == expected def test_invalid_operation() -> None: with pytest.raises(ValueError) as e: - merge_snapshot_summaries(summary=Summary(Operation.REPLACE)) + _merge_snapshot_summaries(summary=Summary(Operation.REPLACE)) assert "Operation not implemented: Operation.REPLACE" in str(e.value) with pytest.raises(ValueError) as e: - merge_snapshot_summaries(summary=Summary(Operation.DELETE)) + _merge_snapshot_summaries(summary=Summary(Operation.DELETE)) assert "Operation not implemented: Operation.DELETE" in str(e.value) + + +def test_invalid_type() -> None: + with pytest.raises(ValueError) as e: + _merge_snapshot_summaries( + summary=Summary( + operation=Operation.OVERWRITE, + **{ + 'added-data-files': '1', + 'added-delete-files': '2', + 'added-equality-deletes': '3', + 'added-files-size': '4', + 'added-position-deletes': '5', + 'added-records': '6', + }, + ), + previous_summary={'total-data-files': 'abc'}, # should be a number + truncate_full_table=True, + ) + + assert "Could not parse summary property total-data-files to an int: abc" in str(e.value) From 6bca2aa09a8d179e47f16a9d0fe03543c696362d Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Thu, 7 Dec 2023 15:36:01 +0100 Subject: [PATCH 6/6] Thanks Ryan! --- pyiceberg/table/snapshots.py | 48 +++++++++++++++++------------------ tests/table/test_snapshots.py | 14 +++++----- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 77e258ba81..e22c95f8ee 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -34,17 +34,17 @@ ADDED_EQUALITY_DELETES = 'added-equality-deletes' ADDED_FILE_SIZE = 'added-files-size' ADDED_POSITION_DELETES = 'added-position-deletes' -ADDED_POSITION_DELETE_FILES = f'{ADDED_POSITION_DELETES}-files' +ADDED_POSITION_DELETE_FILES = 'added-position-delete-files' ADDED_RECORDS = 'added-records' DELETED_DATA_FILES = 'deleted-data-files' DELETED_RECORDS = 'deleted-records' -EQUALITY_DELETE_FILES = 'added-equality-delete-files' +ADDED_EQUALITY_DELETE_FILES = 'added-equality-delete-files' REMOVED_DELETE_FILES = 'removed-delete-files' REMOVED_EQUALITY_DELETES = 'removed-equality-deletes' -REMOVED_EQUALITY_DELETE_FILES = f'{REMOVED_EQUALITY_DELETES}-files' +REMOVED_EQUALITY_DELETE_FILES = 'removed-equality-delete-files' REMOVED_FILE_SIZE = 'removed-files-size' REMOVED_POSITION_DELETES = 'removed-position-deletes' -REMOVED_POSITION_DELETE_FILES = f'{REMOVED_POSITION_DELETES}-files' +REMOVED_POSITION_DELETE_FILES = 'removed-position-delete-files' TOTAL_EQUALITY_DELETES = 'total-equality-deletes' TOTAL_POSITION_DELETES = 'total-position-deletes' TOTAL_DATA_FILES = 'total-data-files' @@ -92,14 +92,14 @@ def __init__(self, operation: Operation, **data: Any) -> None: def __getitem__(self, __key: str) -> Optional[Any]: # type: ignore """Return a key as it is a map.""" - if __key == 'operation': + if __key.lower() == 'operation': return self.operation else: return self._additional_properties.get(__key) def __setitem__(self, key: str, value: Any) -> None: """Set a key as it is a map.""" - if key == 'operation': + if key.lower() == 'operation': self.operation = value else: self._additional_properties[key] = value @@ -171,10 +171,10 @@ class SnapshotLogEntry(IcebergBaseModel): class SnapshotSummaryCollector: - added_size: int - removed_size: int - added_files: int - removed_files: int + added_file_size: int + removed_file_size: int + added_data_files: int + removed_data_files: int added_eq_delete_files: int removed_eq_delete_files: int added_pos_delete_files: int @@ -189,10 +189,10 @@ class SnapshotSummaryCollector: removed_eq_deletes: int def __init__(self) -> None: - self.added_size = 0 - self.removed_size = 0 - self.added_files = 0 - self.removed_files = 0 + self.added_file_size = 0 + self.removed_file_size = 0 + self.added_data_files = 0 + self.removed_data_files = 0 self.added_eq_delete_files = 0 self.removed_eq_delete_files = 0 self.added_pos_delete_files = 0 @@ -207,10 +207,10 @@ def __init__(self) -> None: self.removed_eq_deletes = 0 def add_file(self, data_file: DataFile) -> None: - self.added_size += data_file.file_size_in_bytes + self.added_file_size += data_file.file_size_in_bytes if data_file.content == DataFileContent.DATA: - self.added_files += 1 + self.added_data_files += 1 self.added_records += data_file.record_count elif data_file.content == DataFileContent.POSITION_DELETES: self.added_delete_files += 1 @@ -224,10 +224,10 @@ def add_file(self, data_file: DataFile) -> None: raise ValueError(f"Unknown data file content: {data_file.content}") def remove_file(self, data_file: DataFile) -> None: - self.removed_size += data_file.file_size_in_bytes + self.removed_file_size += data_file.file_size_in_bytes if data_file.content == DataFileContent.DATA: - self.removed_files += 1 + self.removed_data_files += 1 self.deleted_records += data_file.record_count elif data_file.content == DataFileContent.POSITION_DELETES: self.removed_delete_files += 1 @@ -246,11 +246,11 @@ def set_when_positive(properties: Dict[str, str], num: int, property_name: str) properties[property_name] = str(num) properties: Dict[str, str] = {} - set_when_positive(properties, self.added_size, ADDED_FILE_SIZE) - set_when_positive(properties, self.removed_size, REMOVED_FILE_SIZE) - set_when_positive(properties, self.added_files, ADDED_DATA_FILES) - set_when_positive(properties, self.removed_files, DELETED_DATA_FILES) - set_when_positive(properties, self.added_eq_delete_files, EQUALITY_DELETE_FILES) + set_when_positive(properties, self.added_file_size, ADDED_FILE_SIZE) + set_when_positive(properties, self.removed_file_size, REMOVED_FILE_SIZE) + set_when_positive(properties, self.added_data_files, ADDED_DATA_FILES) + set_when_positive(properties, self.removed_data_files, DELETED_DATA_FILES) + set_when_positive(properties, self.added_eq_delete_files, ADDED_EQUALITY_DELETE_FILES) set_when_positive(properties, self.removed_eq_delete_files, REMOVED_EQUALITY_DELETE_FILES) set_when_positive(properties, self.added_pos_delete_files, ADDED_POSITION_DELETE_FILES) set_when_positive(properties, self.removed_pos_delete_files, REMOVED_POSITION_DELETE_FILES) @@ -293,7 +293,7 @@ def _truncate_table_summary(summary: Summary, previous_summary: Mapping[str, str return summary -def _merge_snapshot_summaries( +def _update_snapshot_summaries( summary: Summary, previous_summary: Optional[Mapping[str, str]] = None, truncate_full_table: bool = False ) -> Summary: if summary.operation not in {Operation.APPEND, Operation.OVERWRITE}: diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 73bf2c9888..124c513022 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -18,7 +18,7 @@ import pytest from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile -from pyiceberg.table.snapshots import Operation, Snapshot, SnapshotSummaryCollector, Summary, _merge_snapshot_summaries +from pyiceberg.table.snapshots import Operation, Snapshot, SnapshotSummaryCollector, Summary, _update_snapshot_summaries @pytest.fixture @@ -161,7 +161,7 @@ def test_snapshot_summary_collector(data_file: DataFile) -> None: def test_merge_snapshot_summaries_empty() -> None: - assert _merge_snapshot_summaries(Summary(Operation.APPEND)) == Summary( + assert _update_snapshot_summaries(Summary(Operation.APPEND)) == Summary( operation=Operation.APPEND, **{ 'total-data-files': '0', @@ -175,7 +175,7 @@ def test_merge_snapshot_summaries_empty() -> None: def test_merge_snapshot_summaries_new_summary() -> None: - actual = _merge_snapshot_summaries( + actual = _update_snapshot_summaries( summary=Summary( operation=Operation.APPEND, **{ @@ -211,7 +211,7 @@ def test_merge_snapshot_summaries_new_summary() -> None: def test_merge_snapshot_summaries_overwrite_summary() -> None: - actual = _merge_snapshot_summaries( + actual = _update_snapshot_summaries( summary=Summary( operation=Operation.OVERWRITE, **{ @@ -260,17 +260,17 @@ def test_merge_snapshot_summaries_overwrite_summary() -> None: def test_invalid_operation() -> None: with pytest.raises(ValueError) as e: - _merge_snapshot_summaries(summary=Summary(Operation.REPLACE)) + _update_snapshot_summaries(summary=Summary(Operation.REPLACE)) assert "Operation not implemented: Operation.REPLACE" in str(e.value) with pytest.raises(ValueError) as e: - _merge_snapshot_summaries(summary=Summary(Operation.DELETE)) + _update_snapshot_summaries(summary=Summary(Operation.DELETE)) assert "Operation not implemented: Operation.DELETE" in str(e.value) def test_invalid_type() -> None: with pytest.raises(ValueError) as e: - _merge_snapshot_summaries( + _update_snapshot_summaries( summary=Summary( operation=Operation.OVERWRITE, **{