# NOTE(review): SOURCE was a git diff whose newlines were collapsed, so it can no
# longer be applied as a patch. The code below reconstructs what the patch added,
# properly formatted, with review fixes applied.
#
# --- pyiceberg/table/update/snapshot.py: additions ---
# (the diff also adds: `import logging`, `from dataclasses import dataclass, field`,
#  and `Any` to the typing import at the top of snapshot.py)

# PEP 8 fix: the diff inserted this between import groups; it belongs after all imports.
logger = logging.getLogger(__name__)


@dataclass
class RewriteDataFilesResult:
    """Result of a rewrite data files operation.

    Attributes:
        rewritten_data_files_count: Number of data files that were rewritten.
        added_data_files_count: Number of new data files created.
        rewritten_bytes: Total bytes of data files that were rewritten.
        failed_group_count: Number of file groups that failed to be rewritten.
    """

    rewritten_data_files_count: int = 0
    added_data_files_count: int = 0
    rewritten_bytes: int = 0
    failed_group_count: int = 0


@dataclass
class FileGroup:
    """A group of data files to be rewritten together."""

    # Files that will be read and compacted into new, optimally-sized output files.
    data_files: list[DataFile] = field(default_factory=list)

    @property
    def total_size_bytes(self) -> int:
        """Return the total size in bytes of all files in this group."""
        return sum(f.file_size_in_bytes for f in self.data_files)

    @property
    def file_count(self) -> int:
        """Return the number of files in this group."""
        return len(self.data_files)


class RewriteDataFiles(UpdateTableMetadata["RewriteDataFiles"]):
    """Rewrite data files by compacting small files into larger ones.

    This operation reads files that fall outside the configured size thresholds
    and rewrites them into optimally-sized files. Files are grouped by partition
    and bin-packed to form groups for rewriting.

    Usage:
        result = (
            table.maintenance
            .rewrite_data_files()
            .filter("year = 2024")  # Optional: restrict to partitions
            .option("target-file-size-bytes", "134217728")  # Optional: 128MB
            .commit()
        )

    Note: `maintenance` is a property (see pyiceberg/table/maintenance.py), not a
    method — the original docstring's `table.maintenance()` example was wrong.
    """

    # Configuration option names.
    OPTION_TARGET_FILE_SIZE_BYTES = "target-file-size-bytes"
    OPTION_MIN_FILE_SIZE_BYTES = "min-file-size-bytes"
    OPTION_MAX_FILE_SIZE_BYTES = "max-file-size-bytes"
    OPTION_MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes"
    OPTION_MIN_INPUT_FILES = "min-input-files"

    # Default values.
    DEFAULT_TARGET_FILE_SIZE_BYTES = 512 * 1024 * 1024  # 512 MB
    DEFAULT_MAX_FILE_GROUP_SIZE_BYTES = 100 * 1024 * 1024 * 1024  # 100 GB
    DEFAULT_MIN_INPUT_FILES = 2

    _filter_expr: BooleanExpression | None
    _options: dict[str, str]
    _staged_result: RewriteDataFilesResult | None

    def __init__(self, transaction: Transaction) -> None:
        super().__init__(transaction)
        from pyiceberg.expressions import AlwaysTrue

        self._filter_expr = AlwaysTrue()
        self._options = {}
        self._staged_result = None

    def filter(self, expr: str | BooleanExpression) -> RewriteDataFiles:
        """Apply a filter to restrict which files are considered for rewriting.

        Repeated calls are AND-ed together.

        Args:
            expr: A filter expression (string or BooleanExpression) to filter files.

        Returns:
            This for method chaining.
        """
        from pyiceberg.expressions import AlwaysTrue, And
        from pyiceberg.table import _parse_row_filter

        parsed = _parse_row_filter(expr)
        # __init__ always sets AlwaysTrue(), so the original `is None` branch was
        # dead; also avoid wrapping the initial AlwaysTrue in a redundant And().
        if self._filter_expr is None or isinstance(self._filter_expr, AlwaysTrue):
            self._filter_expr = parsed
        else:
            self._filter_expr = And(self._filter_expr, parsed)
        return self

    def option(self, name: str, value: str) -> RewriteDataFiles:
        """Set a single configuration option.

        Args:
            name: The option name.
            value: The option value (string; numeric options are parsed with int()).

        Returns:
            This for method chaining.
        """
        self._options[name] = value
        return self

    def options(self, opts: dict[str, str]) -> RewriteDataFiles:
        """Set multiple configuration options.

        Args:
            opts: Dictionary of option names to values.

        Returns:
            This for method chaining.
        """
        self._options.update(opts)
        return self

    def _int_option(self, name: str, default: int) -> int:
        """Return option *name* parsed as an int, or *default* when unset."""
        raw = self._options.get(name)
        return int(raw) if raw is not None else default

    @property
    def _target_file_size(self) -> int:
        """Target size in bytes for rewritten files."""
        return self._int_option(self.OPTION_TARGET_FILE_SIZE_BYTES, self.DEFAULT_TARGET_FILE_SIZE_BYTES)

    @property
    def _min_file_size(self) -> int:
        """Files smaller than this are rewrite candidates (default: 75% of target)."""
        return self._int_option(self.OPTION_MIN_FILE_SIZE_BYTES, int(self._target_file_size * 0.75))

    @property
    def _max_file_size(self) -> int:
        """Files larger than this are rewrite candidates (default: 180% of target)."""
        return self._int_option(self.OPTION_MAX_FILE_SIZE_BYTES, int(self._target_file_size * 1.8))

    @property
    def _max_file_group_size(self) -> int:
        """Maximum total size of a file group for rewriting."""
        return self._int_option(self.OPTION_MAX_FILE_GROUP_SIZE_BYTES, self.DEFAULT_MAX_FILE_GROUP_SIZE_BYTES)

    @property
    def _min_input_files(self) -> int:
        """Minimum number of input files required to trigger a rewrite."""
        return self._int_option(self.OPTION_MIN_INPUT_FILES, self.DEFAULT_MIN_INPUT_FILES)

    def _should_rewrite(self, data_file: DataFile) -> bool:
        """Return True when *data_file* is too small or too large.

        Args:
            data_file: The data file to evaluate.
        """
        size = data_file.file_size_in_bytes
        return size < self._min_file_size or size > self._max_file_size

    def _get_candidate_files(self) -> list[DataFile]:
        """Scan the current snapshot for files matching the filter and size criteria.

        Returns:
            List of data files that match; empty when the table has no snapshot.
        """
        from pyiceberg.expressions import AlwaysTrue
        from pyiceberg.expressions.visitors import inclusive_projection, manifest_evaluator

        table_metadata = self._transaction.table_metadata
        io = self._transaction._table.io
        snapshot = table_metadata.current_snapshot()

        if snapshot is None:
            return []  # Empty table: nothing to rewrite.

        filter_expr = self._filter_expr if self._filter_expr is not None else AlwaysTrue()
        schema = table_metadata.schema()

        def build_manifest_evaluator(spec_id: int) -> Callable[[ManifestFile], bool]:
            # Project the row filter onto the partition spec so whole manifests
            # can be pruned without reading their entries.
            spec = table_metadata.specs()[spec_id]
            partition_filter = inclusive_projection(schema, spec, case_sensitive=True)(filter_expr)
            return manifest_evaluator(spec, schema, partition_filter, case_sensitive=True)

        manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(build_manifest_evaluator)

        candidates: list[DataFile] = []
        for manifest_file in snapshot.manifests(io=io):
            if manifest_file.content != ManifestContent.DATA:
                continue
            if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
                continue  # Manifest cannot contain matching partitions.
            for entry in manifest_file.fetch_manifest_entry(io=io, discard_deleted=True):
                data_file = entry.data_file
                if data_file.content == DataFileContent.DATA and self._should_rewrite(data_file):
                    candidates.append(data_file)
        return candidates

    @staticmethod
    def _group_files_by_partition(files: list[DataFile]) -> dict[Any, list[DataFile]]:
        """Group files by partition value.

        Args:
            files: List of data files to group.

        Returns:
            Mapping of partition key to data files; unpartitioned files share an
            empty Record key.
        """
        from pyiceberg.typedef import Record

        groups: dict[Any, list[DataFile]] = defaultdict(list)
        for data_file in files:
            # Record implements __hash__/__eq__, so it can key the dict directly.
            key: Any = data_file.partition if data_file.partition is not None else Record()
            groups[key].append(data_file)
        return groups

    def _bin_pack_groups(self, partition_groups: dict[Any, list[DataFile]]) -> list[FileGroup]:
        """Bin-pack files within each partition into groups for rewriting.

        Args:
            partition_groups: Files grouped by partition.

        Returns:
            FileGroups that meet the minimum-input-files threshold.
        """
        packer: ListPacker[DataFile] = ListPacker(
            target_weight=self._max_file_group_size,
            lookback=10,
            largest_bin_first=False,
        )

        file_groups: list[FileGroup] = []
        # Partition keys are not needed here — only the per-partition file lists.
        for files in partition_groups.values():
            for bin_files in packer.pack(files, weight_func=lambda f: f.file_size_in_bytes):
                if len(bin_files) >= self._min_input_files:
                    file_groups.append(FileGroup(data_files=bin_files))
        return file_groups

    def _rewrite_group(self, group: FileGroup) -> tuple[list[DataFile], list[DataFile]]:
        """Read every file in *group* and write the data back as new files.

        Args:
            group: The file group to rewrite.

        Returns:
            Tuple of (files_to_delete, files_to_add).
        """
        from pyiceberg.expressions import AlwaysTrue
        from pyiceberg.io.pyarrow import ArrowScan, _dataframe_to_data_files
        from pyiceberg.table import FileScanTask

        table_metadata = self._transaction.table_metadata
        io = self._transaction._table.io
        schema = table_metadata.schema()

        tasks = [FileScanTask(data_file=f) for f in group.data_files]
        arrow_table = ArrowScan(
            table_metadata=table_metadata,
            io=io,
            projected_schema=schema,
            row_filter=AlwaysTrue(),
            case_sensitive=True,
        ).to_table(tasks)

        if arrow_table.num_rows == 0:
            # No live rows: drop the old files, write nothing.
            return list(group.data_files), []

        new_files = list(
            _dataframe_to_data_files(table_metadata=table_metadata, df=arrow_table, io=io)
        )
        return list(group.data_files), new_files

    def _commit(self) -> UpdatesAndRequirements:
        """Execute file rewriting and return metadata updates.

        Side effect: stages a RewriteDataFilesResult on self for commit() to return.
        """
        candidates = self._get_candidate_files()
        if not candidates:
            logger.info("No files found to rewrite")
            self._staged_result = RewriteDataFilesResult()
            return (), ()
        logger.info("Found %d files to rewrite", len(candidates))

        partition_groups = self._group_files_by_partition(candidates)
        logger.info("Files grouped into %d partitions", len(partition_groups))

        file_groups = self._bin_pack_groups(partition_groups)
        logger.info("Created %d file groups for rewriting", len(file_groups))
        if not file_groups:
            logger.info("No file groups meet the minimum input files requirement")
            self._staged_result = RewriteDataFilesResult()
            return (), ()

        result = RewriteDataFilesResult()
        all_files_to_delete: list[DataFile] = []
        all_files_to_add: list[DataFile] = []

        for group in file_groups:
            try:
                files_to_delete, files_to_add = self._rewrite_group(group)
            except Exception as e:  # best-effort: a failed group is counted, not fatal
                logger.warning("Failed to rewrite file group: %s", str(e))
                result.failed_group_count += 1
                continue
            all_files_to_delete.extend(files_to_delete)
            all_files_to_add.extend(files_to_add)
            result.rewritten_data_files_count += len(files_to_delete)
            result.added_data_files_count += len(files_to_add)
            result.rewritten_bytes += group.total_size_bytes
            logger.info(
                "Rewrote group: %d files (%d bytes) -> %d files",
                len(files_to_delete),
                group.total_size_bytes,
                len(files_to_add),
            )

        self._staged_result = result

        if not all_files_to_delete:
            logger.info("No files were rewritten")
            return (), ()

        io = self._transaction._table.io
        overwrite = _OverwriteFiles(
            operation=Operation.OVERWRITE,
            transaction=self._transaction,
            io=io,
        )
        for new_file in all_files_to_add:
            overwrite.append_data_file(new_file)
        for old_file in all_files_to_delete:
            overwrite.delete_data_file(old_file)

        # _OverwriteFiles._commit() writes manifests and returns the
        # (updates, requirements) pair expected by the caller.
        updates = overwrite._commit()

        logger.info(
            "Rewrite complete: %d files -> %d files (%d bytes rewritten)",
            result.rewritten_data_files_count,
            result.added_data_files_count,
            result.rewritten_bytes,
        )
        return updates

    def commit(self) -> RewriteDataFilesResult:
        """Execute the rewrite and commit changes.

        With autocommit=True (the default from table.maintenance), commits to the
        catalog immediately. With autocommit=False, only stages — the caller must
        commit the transaction later.
        """
        self._transaction._apply(*self._commit())
        return self._staged_result or RewriteDataFilesResult()


# --- pyiceberg/table/maintenance.py: method added to MaintenanceTable ---
# (the diff also extends the TYPE_CHECKING import there to
#  `from pyiceberg.table.update.snapshot import ExpireSnapshots, RewriteDataFiles`)

def rewrite_data_files(self) -> RewriteDataFiles:
    """Return a RewriteDataFiles builder for compaction operations.

    This operation reads small data files and rewrites them into larger,
    optimally-sized files. Files are selected based on size thresholds and
    grouped by partition for efficient processing.

    Example:
        result = (
            table.maintenance
            .rewrite_data_files()
            .filter("year = 2024")  # Optional: restrict to partitions
            .option("target-file-size-bytes", "134217728")  # Optional: 128MB
            .commit()
        )

        print(f"Rewrote {result.rewritten_data_files_count} files into "
              f"{result.added_data_files_count} files")

    Returns:
        RewriteDataFiles builder for configuring and executing file compaction.
    """
    from pyiceberg.table import Transaction
    from pyiceberg.table.update.snapshot import RewriteDataFiles

    return RewriteDataFiles(transaction=Transaction(self.tbl, autocommit=True))
# Reconstructed from collapsed git diff: tests/table/test_rewrite_data_files.py
# (Apache 2.0 license header as in the original file precedes this module.)
"""Tests for the rewrite-data-files maintenance operation.

Review fixes: removed unused imports (Catalog, DataFileContent, inner Table),
removed unused tmp_path parameters, moved the table_v2 fixture above its first
use, and factored the Mock-transaction / DataFile boilerplate into helpers.
All assertions are unchanged.
"""
from unittest.mock import Mock

import pyarrow as pa
import pytest

from pyiceberg.manifest import DataFile, FileFormat
from pyiceberg.table.update.snapshot import FileGroup, RewriteDataFiles, RewriteDataFilesResult
from pyiceberg.typedef import Record


def _rewrite_builder() -> RewriteDataFiles:
    """Build a RewriteDataFiles against a mock transaction (unit tests only)."""
    mock_transaction = Mock()
    mock_transaction.table_metadata = Mock()
    return RewriteDataFiles(transaction=mock_transaction)


def _data_file(path: str, size: int, records: int, partition: "Record | None" = None) -> DataFile:
    """Create a minimal parquet DataFile for tests."""
    kwargs = {
        "file_path": path,
        "file_format": FileFormat.PARQUET,
        "file_size_in_bytes": size,
        "record_count": records,
    }
    if partition is not None:
        kwargs["partition"] = partition
    return DataFile.from_args(**kwargs)


@pytest.fixture
def table_v2() -> "Table":
    """Create a minimal v2 table fixture (no snapshots) for testing."""
    from pyiceberg.catalog.noop import NoopCatalog
    from pyiceberg.io import load_file_io
    from pyiceberg.table import Table
    from pyiceberg.table.metadata import TableMetadataV2

    metadata = TableMetadataV2(
        format_version=2,
        table_uuid="9c12d441-03fe-4693-9a96-a0705ddf69c1",
        location="s3://bucket/test/location",
        last_updated_ms=1602638573590,
        last_column_id=3,
        schemas=[
            {
                "schema_id": 0,
                "type": "struct",
                "fields": [
                    {"id": 1, "name": "x", "required": True, "type": "long"},
                    {"id": 2, "name": "y", "required": True, "type": "long"},
                    {"id": 3, "name": "z", "required": True, "type": "long"},
                ],
            }
        ],
        current_schema_id=0,
        partition_specs=[{"spec_id": 0, "fields": []}],
        default_spec_id=0,
        last_partition_id=1000,
        sort_orders=[{"order_id": 0, "fields": []}],
        default_sort_order_id=0,
        properties={},
        current_snapshot_id=None,
        snapshots=[],
    )
    return Table(
        identifier=("database", "table"),
        metadata=metadata,
        metadata_location="s3://bucket/test/location/metadata/v1.metadata.json",
        io=load_file_io(),
        catalog=NoopCatalog("NoopCatalog"),
    )


def test_rewrite_data_files_result_defaults() -> None:
    """Test that RewriteDataFilesResult has correct default values."""
    result = RewriteDataFilesResult()
    assert result.rewritten_data_files_count == 0
    assert result.added_data_files_count == 0
    assert result.rewritten_bytes == 0
    assert result.failed_group_count == 0


def test_file_group_properties() -> None:
    """Test FileGroup aggregate properties."""
    group = FileGroup(
        data_files=[
            _data_file("/path/to/file1.parquet", 1000, 100),
            _data_file("/path/to/file2.parquet", 2000, 200),
        ]
    )
    assert group.file_count == 2
    assert group.total_size_bytes == 3000


def test_file_group_empty() -> None:
    """Test FileGroup with no files."""
    group = FileGroup()
    assert group.file_count == 0
    assert group.total_size_bytes == 0


def test_rewrite_data_files_option() -> None:
    """Test setting a single option on RewriteDataFiles."""
    rewrite = _rewrite_builder()
    result = rewrite.option("target-file-size-bytes", "134217728")
    assert result is rewrite  # Method chaining
    assert rewrite._options["target-file-size-bytes"] == "134217728"


def test_rewrite_data_files_options() -> None:
    """Test setting multiple options on RewriteDataFiles."""
    rewrite = _rewrite_builder()
    result = rewrite.options({
        "target-file-size-bytes": "134217728",
        "min-input-files": "3",
    })
    assert result is rewrite  # Method chaining
    assert rewrite._options["target-file-size-bytes"] == "134217728"
    assert rewrite._options["min-input-files"] == "3"


def test_rewrite_data_files_target_file_size_default() -> None:
    """Test default target file size (512MB)."""
    assert _rewrite_builder()._target_file_size == 512 * 1024 * 1024


def test_rewrite_data_files_target_file_size_custom() -> None:
    """Test custom target file size."""
    rewrite = _rewrite_builder().option("target-file-size-bytes", "134217728")
    assert rewrite._target_file_size == 134217728  # 128MB


def test_rewrite_data_files_min_file_size_default() -> None:
    """Test default minimum file size (75% of target)."""
    expected = int(512 * 1024 * 1024 * 0.75)
    assert _rewrite_builder()._min_file_size == expected


def test_rewrite_data_files_min_file_size_custom() -> None:
    """Test custom minimum file size."""
    rewrite = _rewrite_builder().option("min-file-size-bytes", "100000000")
    assert rewrite._min_file_size == 100000000


def test_rewrite_data_files_max_file_size_default() -> None:
    """Test default maximum file size (180% of target)."""
    expected = int(512 * 1024 * 1024 * 1.8)
    assert _rewrite_builder()._max_file_size == expected


def test_rewrite_data_files_max_file_size_custom() -> None:
    """Test custom maximum file size."""
    rewrite = _rewrite_builder().option("max-file-size-bytes", "1000000000")
    assert rewrite._max_file_size == 1000000000


def test_rewrite_data_files_min_input_files_default() -> None:
    """Test default minimum input files."""
    assert _rewrite_builder()._min_input_files == 2


def test_rewrite_data_files_min_input_files_custom() -> None:
    """Test custom minimum input files."""
    rewrite = _rewrite_builder().option("min-input-files", "5")
    assert rewrite._min_input_files == 5


def test_should_rewrite_small_file() -> None:
    """Test that small files are candidates for rewrite."""
    rewrite = _rewrite_builder().option("target-file-size-bytes", "10000000")  # 10MB target
    # 1MB, below 75% of 10MB (7.5MB)
    small_file = _data_file("/path/to/small.parquet", 1000000, 100)
    assert rewrite._should_rewrite(small_file) is True


def test_should_rewrite_large_file() -> None:
    """Test that large files are candidates for rewrite."""
    rewrite = _rewrite_builder().option("target-file-size-bytes", "10000000")  # 10MB target
    # 20MB, above 180% of 10MB (18MB)
    large_file = _data_file("/path/to/large.parquet", 20000000, 2000)
    assert rewrite._should_rewrite(large_file) is True


def test_should_not_rewrite_optimal_file() -> None:
    """Test that optimally-sized files are not candidates for rewrite."""
    rewrite = _rewrite_builder().option("target-file-size-bytes", "10000000")  # 10MB target
    optimal_file = _data_file("/path/to/optimal.parquet", 10000000, 1000)  # exactly at target
    assert rewrite._should_rewrite(optimal_file) is False


def test_group_files_by_partition_unpartitioned() -> None:
    """Test grouping files for unpartitioned table."""
    file1 = _data_file("/path/to/file1.parquet", 1000, 100)
    file2 = _data_file("/path/to/file2.parquet", 2000, 200)

    # Both files have no partition (None)
    groups = RewriteDataFiles._group_files_by_partition([file1, file2])

    # All files should be in one group
    assert len(groups) == 1
    group_files = list(groups.values())[0]
    assert len(group_files) == 2
    assert file1 in group_files
    assert file2 in group_files


def test_group_files_by_partition_partitioned() -> None:
    """Test grouping files for partitioned table."""
    partition1 = Record(2024, 1)  # year=2024, month=1
    partition2 = Record(2024, 2)  # year=2024, month=2

    file1 = _data_file("/path/to/file1.parquet", 1000, 100, partition=partition1)
    file2 = _data_file("/path/to/file2.parquet", 2000, 200, partition=partition1)
    file3 = _data_file("/path/to/file3.parquet", 3000, 300, partition=partition2)

    groups = RewriteDataFiles._group_files_by_partition([file1, file2, file3])

    # Should have 2 partition groups
    assert len(groups) == 2

    partition1_files = groups[partition1]
    assert len(partition1_files) == 2
    assert file1 in partition1_files
    assert file2 in partition1_files

    partition2_files = groups[partition2]
    assert len(partition2_files) == 1
    assert file3 in partition2_files


def test_bin_pack_groups_filters_small_groups() -> None:
    """Test that groups with fewer than min_input_files are filtered out."""
    rewrite = _rewrite_builder().options({
        "min-input-files": "3",  # Require at least 3 files
        "max-file-group-size-bytes": "1000000000",  # 1GB max group size
    })

    # A partition with only 2 files
    partition_groups = {
        Record(): [
            _data_file("/path/to/file1.parquet", 1000, 100),
            _data_file("/path/to/file2.parquet", 2000, 200),
        ]
    }
    file_groups = rewrite._bin_pack_groups(partition_groups)

    # Group should be filtered out since it has only 2 files (< 3)
    assert len(file_groups) == 0


def test_bin_pack_groups_includes_large_groups() -> None:
    """Test that groups with sufficient files are included."""
    rewrite = _rewrite_builder().options({
        "min-input-files": "2",  # Require at least 2 files
        "max-file-group-size-bytes": "1000000000",  # 1GB max group size
    })

    partition_groups = {
        Record(): [
            _data_file("/path/to/file1.parquet", 1000, 100),
            _data_file("/path/to/file2.parquet", 2000, 200),
            _data_file("/path/to/file3.parquet", 3000, 300),
        ]
    }
    file_groups = rewrite._bin_pack_groups(partition_groups)

    # Should have at least one group, covering all 3 files
    assert len(file_groups) >= 1
    assert sum(g.file_count for g in file_groups) == 3


def test_commit_returns_empty_result_when_no_snapshot(table_v2: "Table") -> None:
    """Test that commit returns empty result when table has no snapshot."""
    # Mock the table metadata to have no current snapshot
    table_v2.metadata = table_v2.metadata.model_copy(
        update={"current_snapshot_id": None, "snapshots": []}
    )

    result = table_v2.maintenance.rewrite_data_files().commit()

    assert result.rewritten_data_files_count == 0
    assert result.added_data_files_count == 0
    assert result.rewritten_bytes == 0


def test_maintenance_rewrite_data_files_method_exists(table_v2: "Table") -> None:
    """Test that rewrite_data_files method exists on maintenance."""
    assert hasattr(table_v2.maintenance, "rewrite_data_files")


def test_maintenance_rewrite_data_files_returns_builder(table_v2: "Table") -> None:
    """Test that rewrite_data_files returns a RewriteDataFiles builder."""
    assert isinstance(table_v2.maintenance.rewrite_data_files(), RewriteDataFiles)


def test_maintenance_rewrite_data_files_chaining(table_v2: "Table") -> None:
    """Test method chaining on RewriteDataFiles builder."""
    builder = (
        table_v2.maintenance
        .rewrite_data_files()
        .option("target-file-size-bytes", "134217728")
        .option("min-input-files", "3")
    )
    assert isinstance(builder, RewriteDataFiles)
    assert builder._options["target-file-size-bytes"] == "134217728"
    assert builder._options["min-input-files"] == "3"


# Integration tests that exercise the full commit path
@pytest.fixture
def catalog_with_table(tmp_path):
    """Create an in-memory catalog backed by a temp warehouse."""
    from pyiceberg.catalog.memory import InMemoryCatalog

    catalog = InMemoryCatalog("test_catalog", warehouse=str(tmp_path))
    catalog.create_namespace("default")
    return catalog


def test_rewrite_data_files_full_commit_path(catalog_with_table) -> None:
    """Integration test: create table with small files, run rewrite, verify consolidation."""
    schema = pa.schema([
        ("id", pa.int64()),
        ("name", pa.string()),
    ])
    table = catalog_with_table.create_table("default.test_rewrite", schema=schema)

    # Append multiple small batches to create multiple small files
    for i in range(3):
        small_data = pa.table({
            "id": [i * 10 + j for j in range(10)],
            "name": [f"name_{i * 10 + j}" for j in range(10)],
        })
        table.append(small_data)

    files_before = list(table.scan().plan_files())
    assert len(files_before) == 3, f"Expected 3 files, got {len(files_before)}"

    # Thresholds chosen so every file is a candidate and grouping is allowed
    result = (
        table.maintenance
        .rewrite_data_files()
        .option("target-file-size-bytes", "1000000000")  # 1GB target (larger than our files)
        .option("min-file-size-bytes", "100000000")  # 100MB min (larger than our files)
        .option("min-input-files", "2")
        .commit()
    )

    assert result.rewritten_data_files_count == 3, f"Expected 3 files rewritten, got {result.rewritten_data_files_count}"
    assert result.added_data_files_count >= 1, f"Expected at least 1 file added, got {result.added_data_files_count}"
    assert result.failed_group_count == 0, f"Expected 0 failed groups, got {result.failed_group_count}"

    # Verify data integrity - should still have all 30 rows
    result_table = table.scan().to_arrow()
    assert result_table.num_rows == 30, f"Expected 30 rows, got {result_table.num_rows}"

    # Verify files were consolidated
    files_after = list(table.scan().plan_files())
    assert len(files_after) < len(files_before), f"Expected fewer files after rewrite: {len(files_after)} vs {len(files_before)}"


def test_rewrite_data_files_no_candidates(catalog_with_table) -> None:
    """Integration test: verify no rewrite when min-input-files is not met."""
    schema = pa.schema([
        ("id", pa.int64()),
        ("name", pa.string()),
    ])
    table = catalog_with_table.create_table("default.test_rewrite_no_candidates", schema=schema)

    # Append a single batch -> one file only
    data = pa.table({
        "id": [i for i in range(100)],
        "name": [f"name_{i}" for i in range(100)],
    })
    table.append(data)

    result = (
        table.maintenance
        .rewrite_data_files()
        .option("min-input-files", "2")
        .commit()
    )

    # No files should be rewritten because we only have 1 file
    assert result.rewritten_data_files_count == 0
    assert result.added_data_files_count == 0


def test_rewrite_data_files_empty_table(catalog_with_table) -> None:
    """Integration test: verify rewrite handles an empty table (no snapshot) gracefully."""
    schema = pa.schema([
        ("id", pa.int64()),
    ])
    table = catalog_with_table.create_table("default.test_rewrite_empty", schema=schema)

    result = table.maintenance.rewrite_data_files().commit()

    assert result.rewritten_data_files_count == 0
    assert result.added_data_files_count == 0
    assert result.failed_group_count == 0