Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 244 additions & 2 deletions pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,40 @@
Any,
Dict,
List,
Mapping,
Optional,
)

from pydantic import Field, PrivateAttr, model_serializer

from pyiceberg.io import FileIO
from pyiceberg.manifest import ManifestFile, read_manifest_list
from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list
from pyiceberg.typedef import IcebergBaseModel

ADDED_DATA_FILES = 'added-data-files'
ADDED_DELETE_FILES = 'added-delete-files'
ADDED_EQUALITY_DELETES = 'added-equality-deletes'
ADDED_FILE_SIZE = 'added-files-size'
ADDED_POSITION_DELETES = 'added-position-deletes'
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These first 5 look correct.

ADDED_POSITION_DELETE_FILES = 'added-position-delete-files'
ADDED_RECORDS = 'added-records'
DELETED_DATA_FILES = 'deleted-data-files'
DELETED_RECORDS = 'deleted-records'
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DELETED_ properties look correct.

ADDED_EQUALITY_DELETE_FILES = 'added-equality-delete-files'
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Odd that this is here rather than with the other ADDED_ properties.

REMOVED_DELETE_FILES = 'removed-delete-files'
REMOVED_EQUALITY_DELETES = 'removed-equality-deletes'
REMOVED_EQUALITY_DELETE_FILES = 'removed-equality-delete-files'
REMOVED_FILE_SIZE = 'removed-files-size'
REMOVED_POSITION_DELETES = 'removed-position-deletes'
REMOVED_POSITION_DELETE_FILES = 'removed-position-delete-files'
TOTAL_EQUALITY_DELETES = 'total-equality-deletes'
TOTAL_POSITION_DELETES = 'total-position-deletes'
TOTAL_DATA_FILES = 'total-data-files'
TOTAL_DELETE_FILES = 'total-delete-files'
TOTAL_RECORDS = 'total-records'
TOTAL_FILE_SIZE = 'total-files-size'
Comment thread
rdblue marked this conversation as resolved.


OPERATION = "operation"


Expand All @@ -51,7 +76,7 @@ def __repr__(self) -> str:
return f"Operation.{self.name}"


class Summary(IcebergBaseModel):
class Summary(IcebergBaseModel, Mapping[str, str]):
"""A class that stores the summary information for a Snapshot.

The snapshot summary’s operation field is used by some operations,
Expand All @@ -65,6 +90,25 @@ def __init__(self, operation: Operation, **data: Any) -> None:
super().__init__(operation=operation, **data)
self._additional_properties = data

def __getitem__(self, __key: str) -> Optional[Any]: # type: ignore
"""Return a key as it is a map."""
if __key.lower() == 'operation':
return self.operation
else:
return self._additional_properties.get(__key)

def __setitem__(self, key: str, value: Any) -> None:
"""Set a key as it is a map."""
if key.lower() == 'operation':
self.operation = value
else:
self._additional_properties[key] = value

def __len__(self) -> int:
"""Return the number of keys in the summary."""
# Operation is required
return 1 + len(self._additional_properties)

@model_serializer
def ser_model(self) -> Dict[str, str]:
return {
Expand All @@ -81,6 +125,14 @@ def __repr__(self) -> str:
repr_properties = f", **{repr(self._additional_properties)}" if self._additional_properties else ""
return f"Summary({repr(self.operation)}{repr_properties})"

def __eq__(self, other: Any) -> bool:
"""Compare if the summary is equal to another summary."""
return (
self.operation == other.operation and self.additional_properties == other.additional_properties
if isinstance(other, Summary)
else False
)


class Snapshot(IcebergBaseModel):
snapshot_id: int = Field(alias="snapshot-id")
Expand Down Expand Up @@ -116,3 +168,193 @@ class MetadataLogEntry(IcebergBaseModel):
class SnapshotLogEntry(IcebergBaseModel):
snapshot_id: int = Field(alias="snapshot-id")
timestamp_ms: int = Field(alias="timestamp-ms")


class SnapshotSummaryCollector:
added_file_size: int
removed_file_size: int
added_data_files: int
removed_data_files: int
added_eq_delete_files: int
removed_eq_delete_files: int
added_pos_delete_files: int
removed_pos_delete_files: int
added_delete_files: int
removed_delete_files: int
added_records: int
deleted_records: int
added_pos_deletes: int
removed_pos_deletes: int
added_eq_deletes: int
removed_eq_deletes: int

Copy link
Copy Markdown
Contributor

@HonahX HonahX Oct 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Java Implementation I saw a flag named trustSizeAndDeleteCounts, which is set to false when we add a DELETES manifest. Based on my understanding, the purpose of this flag is to let us skip reporting size and delete counts related metrics when we add one or more DELETES manifest since we do not know the exact number of rows deleted in the manifest.

Do we want to add the flag in this PR or in the future?

ref: apache/iceberg#1367 (comment)

Copy link
Copy Markdown
Contributor Author

@Fokko Fokko Oct 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was hoping to get some insights from @rdblue on this one. When we Operation.APPEND a table we can add existing DELETE manifests.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flag is needed because we don't have the correct counts for the eq and pos deletes in delete manifests. I don't think that we need to add whole manifests in Python so I'd skip it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two things that I like to avoid; complexity and trust issues!

def __init__(self) -> None:
self.added_file_size = 0
self.removed_file_size = 0
self.added_data_files = 0
self.removed_data_files = 0
self.added_eq_delete_files = 0
self.removed_eq_delete_files = 0
self.added_pos_delete_files = 0
self.removed_pos_delete_files = 0
self.added_delete_files = 0
self.removed_delete_files = 0
self.added_records = 0
self.deleted_records = 0
self.added_pos_deletes = 0
self.removed_pos_deletes = 0
self.added_eq_deletes = 0
self.removed_eq_deletes = 0

def add_file(self, data_file: DataFile) -> None:
Comment thread
Fokko marked this conversation as resolved.
self.added_file_size += data_file.file_size_in_bytes

if data_file.content == DataFileContent.DATA:
self.added_data_files += 1
self.added_records += data_file.record_count
elif data_file.content == DataFileContent.POSITION_DELETES:
self.added_delete_files += 1
self.added_pos_delete_files += 1
self.added_pos_deletes += data_file.record_count
elif data_file.content == DataFileContent.EQUALITY_DELETES:
self.added_delete_files += 1
self.added_eq_delete_files += 1
self.added_eq_deletes += data_file.record_count
else:
raise ValueError(f"Unknown data file content: {data_file.content}")

def remove_file(self, data_file: DataFile) -> None:
self.removed_file_size += data_file.file_size_in_bytes

if data_file.content == DataFileContent.DATA:
Comment thread
Fokko marked this conversation as resolved.
self.removed_data_files += 1
self.deleted_records += data_file.record_count
elif data_file.content == DataFileContent.POSITION_DELETES:
self.removed_delete_files += 1
self.removed_pos_delete_files += 1
self.removed_pos_deletes += data_file.record_count
elif data_file.content == DataFileContent.EQUALITY_DELETES:
self.removed_delete_files += 1
self.removed_eq_delete_files += 1
self.removed_eq_deletes += data_file.record_count
else:
raise ValueError(f"Unknown data file content: {data_file.content}")

def build(self) -> Dict[str, str]:
def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None:
if num > 0:
properties[property_name] = str(num)

properties: Dict[str, str] = {}
set_when_positive(properties, self.added_file_size, ADDED_FILE_SIZE)
set_when_positive(properties, self.removed_file_size, REMOVED_FILE_SIZE)
set_when_positive(properties, self.added_data_files, ADDED_DATA_FILES)
set_when_positive(properties, self.removed_data_files, DELETED_DATA_FILES)
set_when_positive(properties, self.added_eq_delete_files, ADDED_EQUALITY_DELETE_FILES)
set_when_positive(properties, self.removed_eq_delete_files, REMOVED_EQUALITY_DELETE_FILES)
set_when_positive(properties, self.added_pos_delete_files, ADDED_POSITION_DELETE_FILES)
set_when_positive(properties, self.removed_pos_delete_files, REMOVED_POSITION_DELETE_FILES)
set_when_positive(properties, self.added_delete_files, ADDED_DELETE_FILES)
set_when_positive(properties, self.removed_delete_files, REMOVED_DELETE_FILES)
set_when_positive(properties, self.added_records, ADDED_RECORDS)
set_when_positive(properties, self.deleted_records, DELETED_RECORDS)
set_when_positive(properties, self.added_pos_deletes, ADDED_POSITION_DELETES)
set_when_positive(properties, self.removed_pos_deletes, REMOVED_POSITION_DELETES)
set_when_positive(properties, self.added_eq_deletes, ADDED_EQUALITY_DELETES)
set_when_positive(properties, self.removed_eq_deletes, REMOVED_EQUALITY_DELETES)

return properties


def _truncate_table_summary(summary: Summary, previous_summary: Mapping[str, str]) -> Summary:
for prop in {
TOTAL_DATA_FILES,
TOTAL_DELETE_FILES,
TOTAL_RECORDS,
TOTAL_FILE_SIZE,
TOTAL_POSITION_DELETES,
TOTAL_EQUALITY_DELETES,
}:
summary[prop] = '0'

if value := previous_summary.get(TOTAL_DATA_FILES):
summary[DELETED_DATA_FILES] = value
if value := previous_summary.get(TOTAL_DELETE_FILES):
summary[REMOVED_DELETE_FILES] = value
if value := previous_summary.get(TOTAL_RECORDS):
summary[DELETED_RECORDS] = value
if value := previous_summary.get(TOTAL_FILE_SIZE):
summary[REMOVED_FILE_SIZE] = value
if value := previous_summary.get(TOTAL_POSITION_DELETES):
summary[REMOVED_POSITION_DELETES] = value
if value := previous_summary.get(TOTAL_EQUALITY_DELETES):
summary[REMOVED_EQUALITY_DELETES] = value

return summary


def _update_snapshot_summaries(
summary: Summary, previous_summary: Optional[Mapping[str, str]] = None, truncate_full_table: bool = False
) -> Summary:
if summary.operation not in {Operation.APPEND, Operation.OVERWRITE}:
raise ValueError(f"Operation not implemented: {summary.operation}")

if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None:
summary = _truncate_table_summary(summary, previous_summary)

if not previous_summary:
previous_summary = {
TOTAL_DATA_FILES: '0',
TOTAL_DELETE_FILES: '0',
TOTAL_RECORDS: '0',
TOTAL_FILE_SIZE: '0',
TOTAL_POSITION_DELETES: '0',
TOTAL_EQUALITY_DELETES: '0',
}

def _update_totals(total_property: str, added_property: str, removed_property: str) -> None:
Comment thread
rdblue marked this conversation as resolved.
if previous_total_str := previous_summary.get(total_property):
try:
new_total = int(previous_total_str)
if new_total >= 0 and (added := summary.get(added_property)):
new_total += int(added)
if new_total >= 0 and (removed := summary.get(removed_property)):
new_total -= int(removed)
except ValueError as e:
raise ValueError(f"Could not parse summary property {total_property} to an int: {previous_total_str}") from e

if new_total >= 0:
summary[total_property] = str(new_total)

_update_totals(
total_property=TOTAL_DATA_FILES,
added_property=ADDED_DATA_FILES,
removed_property=DELETED_DATA_FILES,
)
_update_totals(
total_property=TOTAL_DELETE_FILES,
added_property=ADDED_DELETE_FILES,
removed_property=REMOVED_DELETE_FILES,
)
_update_totals(
total_property=TOTAL_RECORDS,
added_property=ADDED_RECORDS,
removed_property=DELETED_RECORDS,
)
_update_totals(
total_property=TOTAL_FILE_SIZE,
added_property=ADDED_FILE_SIZE,
removed_property=REMOVED_FILE_SIZE,
)
_update_totals(
total_property=TOTAL_POSITION_DELETES,
added_property=ADDED_POSITION_DELETES,
removed_property=REMOVED_POSITION_DELETES,
)
_update_totals(
total_property=TOTAL_EQUALITY_DELETES,
added_property=ADDED_EQUALITY_DELETES,
removed_property=REMOVED_EQUALITY_DELETES,
)

return summary
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ markers = [
]

# Turns a warning into an error
filterwarnings = [
"error"
]
#filterwarnings = [
# "error"
#]

[tool.black]
line-length = 130
Expand Down
Loading