From 2eb026cf203edd8263bad4269456bb1e60b19010 Mon Sep 17 00:00:00 2001
From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com>
Date: Fri, 5 Sep 2025 11:32:08 +0800
Subject: [PATCH 1/2] [Python] Support overwrite mode for writer

---
 .../manifest/manifest_file_manager.py         |  76 ++++----
 paimon-python/pypaimon/read/plan.py           |   5 +
 paimon-python/pypaimon/read/table_scan.py     |  17 +-
 .../pypaimon/tests/reader_basic_test.py       |  81 ++++++++-
 .../tests/rest_table_read_write_test.py       |  81 +++++++++
 .../pypaimon/tests/test_file_store_commit.py  |  52 +++---
 .../pypaimon/write/batch_table_commit.py      |   2 +-
 .../pypaimon/write/batch_write_builder.py     |   2 +-
 .../pypaimon/write/file_store_commit.py       | 166 +++++++++++-------
 9 files changed, 344 insertions(+), 138 deletions(-)

diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 7c97f7b0ca40..b2cd7868bb6f 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -24,9 +24,8 @@
 from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
                                                      ManifestEntry)
 from pypaimon.manifest.schema.simple_stats import SimpleStats
-from pypaimon.table.row.binary_row import (BinaryRow, BinaryRowDeserializer,
+from pypaimon.table.row.binary_row import (BinaryRowDeserializer,
                                            BinaryRowSerializer)
-from pypaimon.write.commit_message import CommitMessage


 class ManifestFileManager:
@@ -99,46 +98,43 @@ def read(self, manifest_file_name: str, shard_filter=None) -> List[ManifestEntry
             entries.append(entry)
         return entries

-    def write(self, file_name, commit_messages: List[CommitMessage]):
+    def write(self, file_name, entries: List[ManifestEntry]):
         avro_records = []
-        for message in commit_messages:
-            partition_bytes = BinaryRowSerializer.to_bytes(
-                BinaryRow(list(message.partition), self.table.table_schema.get_partition_key_fields()))
-            for file in message.new_files:
-                avro_record = {
-                    "_VERSION": 2,
-                    "_KIND": 0,
-                    "_PARTITION": partition_bytes,
-                    "_BUCKET": message.bucket,
-                    "_TOTAL_BUCKETS": self.table.total_buckets,
-                    "_FILE": {
-                        "_FILE_NAME": file.file_name,
-                        "_FILE_SIZE": file.file_size,
-                        "_ROW_COUNT": file.row_count,
-                        "_MIN_KEY": BinaryRowSerializer.to_bytes(file.min_key),
-                        "_MAX_KEY": BinaryRowSerializer.to_bytes(file.max_key),
-                        "_KEY_STATS": {
-                            "_MIN_VALUES": BinaryRowSerializer.to_bytes(file.key_stats.min_values),
-                            "_MAX_VALUES": BinaryRowSerializer.to_bytes(file.key_stats.max_values),
-                            "_NULL_COUNTS": file.key_stats.null_counts,
-                        },
-                        "_VALUE_STATS": {
-                            "_MIN_VALUES": BinaryRowSerializer.to_bytes(file.value_stats.min_values),
-                            "_MAX_VALUES": BinaryRowSerializer.to_bytes(file.value_stats.max_values),
-                            "_NULL_COUNTS": file.value_stats.null_counts,
-                        },
-                        "_MIN_SEQUENCE_NUMBER": file.min_sequence_number,
-                        "_MAX_SEQUENCE_NUMBER": file.max_sequence_number,
-                        "_SCHEMA_ID": file.schema_id,
-                        "_LEVEL": file.level,
-                        "_EXTRA_FILES": file.extra_files,
-                        "_CREATION_TIME": file.creation_time,
-                        "_DELETE_ROW_COUNT": file.delete_row_count,
-                        "_EMBEDDED_FILE_INDEX": file.embedded_index,
-                        "_FILE_SOURCE": file.file_source,
-                    }
+        for entry in entries:
+            avro_record = {
+                "_VERSION": 2,
+                "_KIND": entry.kind,
+                "_PARTITION": BinaryRowSerializer.to_bytes(entry.partition),
+                "_BUCKET": entry.bucket,
+                "_TOTAL_BUCKETS": entry.total_buckets,
+                "_FILE": {
+                    "_FILE_NAME": entry.file.file_name,
+                    "_FILE_SIZE": entry.file.file_size,
+                    "_ROW_COUNT": entry.file.row_count,
+                    "_MIN_KEY":
BinaryRowSerializer.to_bytes(entry.file.min_key), + "_MAX_KEY": BinaryRowSerializer.to_bytes(entry.file.max_key), + "_KEY_STATS": { + "_MIN_VALUES": BinaryRowSerializer.to_bytes(entry.file.key_stats.min_values), + "_MAX_VALUES": BinaryRowSerializer.to_bytes(entry.file.key_stats.max_values), + "_NULL_COUNTS": entry.file.key_stats.null_counts, + }, + "_VALUE_STATS": { + "_MIN_VALUES": BinaryRowSerializer.to_bytes(entry.file.value_stats.min_values), + "_MAX_VALUES": BinaryRowSerializer.to_bytes(entry.file.value_stats.max_values), + "_NULL_COUNTS": entry.file.value_stats.null_counts, + }, + "_MIN_SEQUENCE_NUMBER": entry.file.min_sequence_number, + "_MAX_SEQUENCE_NUMBER": entry.file.max_sequence_number, + "_SCHEMA_ID": entry.file.schema_id, + "_LEVEL": entry.file.level, + "_EXTRA_FILES": entry.file.extra_files, + "_CREATION_TIME": entry.file.creation_time, + "_DELETE_ROW_COUNT": entry.file.delete_row_count, + "_EMBEDDED_FILE_INDEX": entry.file.embedded_index, + "_FILE_SOURCE": entry.file.file_source, } - avro_records.append(avro_record) + } + avro_records.append(avro_record) manifest_path = self.manifest_path / file_name try: diff --git a/paimon-python/pypaimon/read/plan.py b/paimon-python/pypaimon/read/plan.py index c3aeaa8a54e2..9a65fd6f12b7 100644 --- a/paimon-python/pypaimon/read/plan.py +++ b/paimon-python/pypaimon/read/plan.py @@ -19,13 +19,18 @@ from dataclasses import dataclass from typing import List +from pypaimon.manifest.schema.manifest_entry import ManifestEntry from pypaimon.read.split import Split @dataclass class Plan: """Implementation of Plan for native Python reading.""" + _files: List[ManifestEntry] _splits: List[Split] + def files(self) -> List[ManifestEntry]: + return self._files + def splits(self) -> List[Split]: return self._splits diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py index a4d92bb79682..0b9f97db4f9f 100644 --- a/paimon-python/pypaimon/read/table_scan.py +++ b/paimon-python/pypaimon/read/table_scan.py @@ -73,16 +73,25 @@ def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int], def plan(self) -> Plan: latest_snapshot = self.snapshot_manager.get_latest_snapshot() if not latest_snapshot: - return Plan([]) + return Plan([], []) manifest_files = self.manifest_list_manager.read_all(latest_snapshot) - file_entries = [] + deleted_entries = set() + added_entries = [] + # TODO: filter manifest files by predicate for manifest_file in manifest_files: manifest_entries = self.manifest_file_manager.read(manifest_file.file_name, lambda row: self._bucket_filter(row)) for entry in manifest_entries: if entry.kind == 0: - file_entries.append(entry) + added_entries.append(entry) + else: + deleted_entries.add((tuple(entry.partition.values), entry.bucket, entry.file.file_name)) + + file_entries = [ + entry for entry in added_entries + if (tuple(entry.partition.values), entry.bucket, entry.file.file_name) not in deleted_entries + ] if self.predicate: file_entries = self._filter_by_predicate(file_entries) @@ -100,7 +109,7 @@ def plan(self) -> Plan: splits = self._apply_push_down_limit(splits) - return Plan(splits) + return Plan(file_entries, splits) def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan': self.idx_of_this_subtask = idx_of_this_subtask diff --git a/paimon-python/pypaimon/tests/reader_basic_test.py b/paimon-python/pypaimon/tests/reader_basic_test.py index 4cfb4cf2ac85..445a65763f65 100644 --- a/paimon-python/pypaimon/tests/reader_basic_test.py +++ 
b/paimon-python/pypaimon/tests/reader_basic_test.py @@ -68,8 +68,85 @@ def tearDownClass(cls): shutil.rmtree(cls.tempdir, ignore_errors=True) def test_overwrite(self): - pass - # TODO: support overwrite + simple_pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()) + ]) + schema = Schema.from_pyarrow_schema(simple_pa_schema, partition_keys=['f0'], + options={'dynamic-partition-overwrite': 'false'}) + self.catalog.create_table('default.test_overwrite', schema, False) + table = self.catalog.get_table('default.test_overwrite') + read_builder = table.new_read_builder() + + # test normal write + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + df0 = pd.DataFrame({ + 'f0': [1, 2], + 'f1': ['apple', 'banana'], + }) + + table_write.write_pandas(df0) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual_df0 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0') + df0['f0'] = df0['f0'].astype('int32') + pd.testing.assert_frame_equal( + actual_df0.reset_index(drop=True), df0.reset_index(drop=True)) + + # test partially overwrite + write_builder = table.new_batch_write_builder().overwrite({'f0': 1}) + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + df1 = pd.DataFrame({ + 'f0': [1], + 'f1': ['watermelon'], + }) + + table_write.write_pandas(df1) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual_df1 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0') + expected_df1 = pd.DataFrame({ + 'f0': [1, 2], + 'f1': ['watermelon', 'banana'] + }) + expected_df1['f0'] = expected_df1['f0'].astype('int32') + pd.testing.assert_frame_equal( + actual_df1.reset_index(drop=True), expected_df1.reset_index(drop=True)) + + # test fully overwrite + write_builder = table.new_batch_write_builder().overwrite() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + df2 = pd.DataFrame({ + 'f0': [3], + 'f1': ['Neo'], + }) + + table_write.write_pandas(df2) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual_df2 = table_read.to_pandas(table_scan.plan().splits()) + df2['f0'] = df2['f0'].astype('int32') + pd.testing.assert_frame_equal( + actual_df2.reset_index(drop=True), df2.reset_index(drop=True)) def testWriteWrongSchema(self): self.catalog.create_table('default.test_wrong_schema', diff --git a/paimon-python/pypaimon/tests/rest_table_read_write_test.py b/paimon-python/pypaimon/tests/rest_table_read_write_test.py index 0608c4ef58cd..efa0839a3332 100644 --- a/paimon-python/pypaimon/tests/rest_table_read_write_test.py +++ b/paimon-python/pypaimon/tests/rest_table_read_write_test.py @@ -32,6 +32,87 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest): + def test_overwrite(self): + simple_pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()) + ]) + schema = Schema.from_pyarrow_schema(simple_pa_schema, partition_keys=['f0'], + options={'dynamic-partition-overwrite': 'false'}) + self.rest_catalog.create_table('default.test_overwrite', schema, False) + table = self.rest_catalog.get_table('default.test_overwrite') + 
read_builder = table.new_read_builder() + + # test normal write + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + df0 = pd.DataFrame({ + 'f0': [1, 2], + 'f1': ['apple', 'banana'], + }) + + table_write.write_pandas(df0) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual_df0 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0') + df0['f0'] = df0['f0'].astype('int32') + pd.testing.assert_frame_equal( + actual_df0.reset_index(drop=True), df0.reset_index(drop=True)) + + # test partially overwrite + write_builder = table.new_batch_write_builder().overwrite({'f0': 1}) + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + df1 = pd.DataFrame({ + 'f0': [1], + 'f1': ['watermelon'], + }) + + table_write.write_pandas(df1) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual_df1 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0') + expected_df1 = pd.DataFrame({ + 'f0': [1, 2], + 'f1': ['watermelon', 'banana'] + }) + expected_df1['f0'] = expected_df1['f0'].astype('int32') + pd.testing.assert_frame_equal( + actual_df1.reset_index(drop=True), expected_df1.reset_index(drop=True)) + + # test fully overwrite + write_builder = table.new_batch_write_builder().overwrite() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + df2 = pd.DataFrame({ + 'f0': [3], + 'f1': ['Neo'], + }) + + table_write.write_pandas(df2) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual_df2 = table_read.to_pandas(table_scan.plan().splits()) + df2['f0'] = df2['f0'].astype('int32') + pd.testing.assert_frame_equal( + actual_df2.reset_index(drop=True), df2.reset_index(drop=True)) + def testParquetAppendOnlyReader(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) self.rest_catalog.create_table('default.test_append_only_parquet', schema, False) diff --git a/paimon-python/pypaimon/tests/test_file_store_commit.py b/paimon-python/pypaimon/tests/test_file_store_commit.py index 6f32894c9050..5110bc0f8035 100644 --- a/paimon-python/pypaimon/tests/test_file_store_commit.py +++ b/paimon-python/pypaimon/tests/test_file_store_commit.py @@ -22,7 +22,9 @@ from unittest.mock import Mock, patch from pypaimon.manifest.schema.data_file_meta import DataFileMeta +from pypaimon.manifest.schema.manifest_entry import ManifestEntry from pypaimon.snapshot.snapshot_commit import PartitionStatistics +from pypaimon.table.row.binary_row import BinaryRow from pypaimon.write.commit_message import CommitMessage from pypaimon.write.file_store_commit import FileStoreCommit @@ -84,7 +86,7 @@ def test_generate_partition_statistics_single_partition_single_file( ) # Test method - statistics = file_store_commit._generate_partition_statistics([commit_message]) + statistics = file_store_commit._generate_partition_statistics(self._to_entries([commit_message])) # Verify results self.assertEqual(len(statistics), 1) @@ -145,7 +147,7 @@ def test_generate_partition_statistics_multiple_files_same_partition( ) # Test method - statistics = 
file_store_commit._generate_partition_statistics([commit_message]) + statistics = file_store_commit._generate_partition_statistics(self._to_entries([commit_message])) # Verify results self.assertEqual(len(statistics), 1) @@ -213,7 +215,8 @@ def test_generate_partition_statistics_multiple_partitions( ) # Test method - statistics = file_store_commit._generate_partition_statistics([commit_message_1, commit_message_2]) + statistics = file_store_commit._generate_partition_statistics( + self._to_entries([commit_message_1, commit_message_2])) # Verify results self.assertEqual(len(statistics), 2) @@ -268,7 +271,7 @@ def test_generate_partition_statistics_unpartitioned_table( ) # Test method - statistics = file_store_commit._generate_partition_statistics([commit_message]) + statistics = file_store_commit._generate_partition_statistics(self._to_entries([commit_message])) # Verify results self.assertEqual(len(statistics), 1) @@ -308,7 +311,7 @@ def test_generate_partition_statistics_no_creation_time( ) # Test method - statistics = file_store_commit._generate_partition_statistics([commit_message]) + statistics = file_store_commit._generate_partition_statistics(self._to_entries([commit_message])) # Verify results self.assertEqual(len(statistics), 1) @@ -347,7 +350,7 @@ def test_generate_partition_statistics_mismatched_partition_keys( ) # Test method - statistics = file_store_commit._generate_partition_statistics([commit_message]) + statistics = file_store_commit._generate_partition_statistics(self._to_entries([commit_message])) # Verify results - should fallback to index-based naming self.assertEqual(len(statistics), 1) @@ -372,29 +375,20 @@ def test_generate_partition_statistics_empty_commit_messages( # Verify results self.assertEqual(len(statistics), 0) - def test_generate_partition_statistics_commit_message_no_files( - self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager): - """Test partition statistics generation with commit message containing no files.""" - # Create FileStoreCommit instance - file_store_commit = self._create_file_store_commit() - - commit_message = CommitMessage( - partition=('2024-01-15', 'us-east-1'), - bucket=0, - new_files=[] # No files - ) - - # Test method - statistics = file_store_commit._generate_partition_statistics([commit_message]) - - # Verify results - should still create a partition entry with zero counts - self.assertEqual(len(statistics), 1) - - stat = statistics[0] - self.assertEqual(stat.spec, {'dt': '2024-01-15', 'region': 'us-east-1'}) - self.assertEqual(stat.record_count, 0) - self.assertEqual(stat.file_count, 0) - self.assertEqual(stat.file_size_in_bytes, 0) + @staticmethod + def _to_entries(commit_messages): + commit_entries = [] + for msg in commit_messages: + partition = BinaryRow(list(msg.partition), None) + for file in msg.new_files: + commit_entries.append(ManifestEntry( + kind=0, + partition=partition, + bucket=msg.bucket, + total_buckets=None, + file=file + )) + return commit_entries if __name__ == '__main__': diff --git a/paimon-python/pypaimon/write/batch_table_commit.py b/paimon-python/pypaimon/write/batch_table_commit.py index 55d8de0905f7..7f42e1cef113 100644 --- a/paimon-python/pypaimon/write/batch_table_commit.py +++ b/paimon-python/pypaimon/write/batch_table_commit.py @@ -53,7 +53,7 @@ def commit(self, commit_messages: List[CommitMessage]): try: if self.overwrite_partition is not None: self.file_store_commit.overwrite( - partition=self.overwrite_partition, + overwrite_partition=self.overwrite_partition, 
commit_messages=non_empty_messages, commit_identifier=commit_identifier ) diff --git a/paimon-python/pypaimon/write/batch_write_builder.py b/paimon-python/pypaimon/write/batch_write_builder.py index 0a274cfee0df..2380530fbc29 100644 --- a/paimon-python/pypaimon/write/batch_write_builder.py +++ b/paimon-python/pypaimon/write/batch_write_builder.py @@ -33,7 +33,7 @@ def __init__(self, table): self.static_partition = None def overwrite(self, static_partition: Optional[dict] = None): - self.static_partition = static_partition + self.static_partition = static_partition if static_partition is not None else {} return self def new_write(self) -> BatchTableWrite: diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index efe8207ebe69..ed9f5d16fc65 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -21,15 +21,19 @@ from pathlib import Path from typing import List +from pypaimon.common.predicate_builder import PredicateBuilder from pypaimon.manifest.manifest_file_manager import ManifestFileManager from pypaimon.manifest.manifest_list_manager import ManifestListManager +from pypaimon.manifest.schema.manifest_entry import ManifestEntry from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta from pypaimon.manifest.schema.simple_stats import SimpleStats +from pypaimon.read.table_scan import TableScan from pypaimon.snapshot.snapshot import Snapshot from pypaimon.snapshot.snapshot_commit import (PartitionStatistics, SnapshotCommit) from pypaimon.snapshot.snapshot_manager import SnapshotManager from pypaimon.table.row.binary_row import BinaryRow +from pypaimon.table.row.offset_row import OffsetRow from pypaimon.write.commit_message import CommitMessage @@ -60,15 +64,83 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): if not commit_messages: return + commit_entries = [] + for msg in commit_messages: + partition = BinaryRow(list(msg.partition), self.table.table_schema.get_partition_key_fields()) + for file in msg.new_files: + commit_entries.append(ManifestEntry( + kind=0, + partition=partition, + bucket=msg.bucket, + total_buckets=self.table.total_buckets, + file=file + )) + + self._try_commit(commit_kind="APPEND", + commit_entries=commit_entries, + commit_identifier=commit_identifier) + + def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], commit_identifier: int): + """Commit the given commit messages in overwrite mode.""" + if not commit_messages: + return + + partition_filter = None + # sanity check, all changes must be done within the given partition, meanwhile build a partition filter + if len(overwrite_partition) > 0: + predicate_builder = PredicateBuilder(self.table.table_schema.get_partition_key_fields()) + sub_predicates = [] + for key, value in overwrite_partition.items(): + sub_predicates.append(predicate_builder.equal(key, value)) + partition_filter = predicate_builder.and_predicates(sub_predicates) + + for msg in commit_messages: + row = OffsetRow(msg.partition, 0, len(msg.partition)) + if not partition_filter.test(row): + raise RuntimeError(f"Trying to overwrite partition {overwrite_partition}, but the changes " + f"in {msg.partition} does not belong to this partition") + + commit_entries = [] + current_entries = TableScan(self.table, partition_filter, None, []).plan().files() + for entry in current_entries: + entry.kind = 1 + commit_entries.append(entry) + for msg in commit_messages: + 
partition = BinaryRow(list(msg.partition), self.table.table_schema.get_partition_key_fields()) + for file in msg.new_files: + commit_entries.append(ManifestEntry( + kind=0, + partition=partition, + bucket=msg.bucket, + total_buckets=self.table.total_buckets, + file=file + )) + + self._try_commit(commit_kind="OVERWRITE", + commit_entries=commit_entries, + commit_identifier=commit_identifier) + + def _try_commit(self, commit_kind, commit_entries, commit_identifier): unique_id = uuid.uuid4() base_manifest_list = f"manifest-list-{unique_id}-0" delta_manifest_list = f"manifest-list-{unique_id}-1" # process new_manifest new_manifest_file = f"manifest-{str(uuid.uuid4())}-0" - self.manifest_file_manager.write(new_manifest_file, commit_messages) + added_file_count = 0 + deleted_file_count = 0 + delta_record_count = 0 + for entry in commit_entries: + if entry.kind == 0: + added_file_count += 1 + delta_record_count += entry.file.row_count + else: + deleted_file_count += 1 + delta_record_count -= entry.file.row_count + self.manifest_file_manager.write(new_manifest_file, commit_entries) + # TODO: implement noConflictsOrFail logic - partition_columns = list(zip(*(msg.partition for msg in commit_messages))) + partition_columns = list(zip(*(entry.partition.values for entry in commit_entries))) partition_min_stats = [min(col) for col in partition_columns] partition_max_stats = [max(col) for col in partition_columns] partition_null_counts = [sum(value == 0 for value in col) for col in partition_columns] @@ -78,8 +150,8 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): new_manifest_list = ManifestFileMeta( file_name=new_manifest_file, file_size=self.table.file_io.get_file_size(self.manifest_file_manager.manifest_path / new_manifest_file), - num_added_files=sum(len(msg.new_files) for msg in commit_messages), - num_deleted_files=0, + num_added_files=added_file_count, + num_deleted_files=deleted_file_count, partition_stats=SimpleStats( min_values=BinaryRow( values=partition_min_stats, @@ -109,8 +181,7 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): # process snapshot new_snapshot_id = self._generate_snapshot_id() - record_count_add = self._generate_record_count_add(commit_messages) - total_record_count += record_count_add + total_record_count += delta_record_count snapshot_data = Snapshot( version=1, id=new_snapshot_id, @@ -118,15 +189,15 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): base_manifest_list=base_manifest_list, delta_manifest_list=delta_manifest_list, total_record_count=total_record_count, - delta_record_count=record_count_add, + delta_record_count=delta_record_count, commit_user=self.commit_user, commit_identifier=commit_identifier, - commit_kind="APPEND", + commit_kind=commit_kind, time_millis=int(time.time() * 1000), ) # Generate partition statistics for the commit - statistics = self._generate_partition_statistics(commit_messages) + statistics = self._generate_partition_statistics(commit_entries) # Use SnapshotCommit for atomic commit with self.snapshot_commit: @@ -134,10 +205,6 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): if not success: raise RuntimeError(f"Failed to commit snapshot {new_snapshot_id}") - def overwrite(self, partition, commit_messages: List[CommitMessage], commit_identifier: int): - """Commit the given commit messages in overwrite mode.""" - raise RuntimeError("overwrite unsupported yet") - def abort(self, commit_messages: 
List[CommitMessage]): for message in commit_messages: for file in message.new_files: @@ -161,25 +228,25 @@ def _generate_snapshot_id(self) -> int: else: return 1 - def _generate_partition_statistics(self, commit_messages: List[CommitMessage]) -> List[PartitionStatistics]: + def _generate_partition_statistics(self, commit_entries: List[ManifestEntry]) -> List[PartitionStatistics]: """ - Generate partition statistics from commit messages. + Generate partition statistics from commit entries. This method follows the Java implementation pattern from org.apache.paimon.manifest.PartitionEntry.fromManifestEntry() and PartitionEntry.merge() methods. Args: - commit_messages: List of commit messages to analyze + commit_entries: List of commit entries to analyze Returns: List of PartitionStatistics for each unique partition """ partition_stats = {} - for message in commit_messages: + for entry in commit_entries: # Convert partition tuple to dictionary for PartitionStatistics - partition_value = message.partition # Call the method to get partition value + partition_value = tuple(entry.partition.values) # Call the method to get partition value if partition_value: # Assuming partition is a tuple and we need to convert it to a dict # This may need adjustment based on actual partition format @@ -211,30 +278,29 @@ def _generate_partition_statistics(self, commit_messages: List[CommitMessage]) - 'last_file_creation_time': 0 } - # Process each file in the commit message # Following Java implementation: PartitionEntry.fromDataFile() - for file_meta in message.new_files: - # Extract actual file metadata (following Java DataFileMeta pattern) - record_count = file_meta.row_count - file_size_in_bytes = file_meta.file_size - file_count = 1 - - # Convert creation_time to milliseconds (Java uses epoch millis) - if file_meta.creation_time: - file_creation_time = int(file_meta.creation_time.timestamp() * 1000) - else: - file_creation_time = int(time.time() * 1000) + file_meta = entry.file + # Extract actual file metadata (following Java DataFileMeta pattern) + record_count = file_meta.row_count + file_size_in_bytes = file_meta.file_size + file_count = 1 + + # Convert creation_time to milliseconds (Java uses epoch millis) + if file_meta.creation_time: + file_creation_time = int(file_meta.creation_time.timestamp() * 1000) + else: + file_creation_time = int(time.time() * 1000) - # Accumulate statistics (following Java PartitionEntry.merge() logic) - partition_stats[partition_key]['record_count'] += record_count - partition_stats[partition_key]['file_size_in_bytes'] += file_size_in_bytes - partition_stats[partition_key]['file_count'] += file_count + # Accumulate statistics (following Java PartitionEntry.merge() logic) + partition_stats[partition_key]['record_count'] += record_count + partition_stats[partition_key]['file_size_in_bytes'] += file_size_in_bytes + partition_stats[partition_key]['file_count'] += file_count - # Keep the latest creation time - partition_stats[partition_key]['last_file_creation_time'] = max( - partition_stats[partition_key]['last_file_creation_time'], - file_creation_time - ) + # Keep the latest creation time + partition_stats[partition_key]['last_file_creation_time'] = max( + partition_stats[partition_key]['last_file_creation_time'], + file_creation_time + ) # Convert to PartitionStatistics objects # Following Java PartitionEntry.toPartitionStatistics() pattern @@ -248,25 +314,3 @@ def _generate_partition_statistics(self, commit_messages: List[CommitMessage]) - ) for stats in 
partition_stats.values() ] - - def _generate_record_count_add(self, commit_messages: List[CommitMessage]) -> int: - """ - Generate record count add from commit messages. - - This method follows the Java implementation pattern from - org.apache.paimon.manifest.ManifestEntry.recordCountAdd(). - - Args: - commit_messages: List of commit messages to analyze - - Returns: - Count of add record - """ - record_count = 0 - - for message in commit_messages: - new_files = message.new_files - for file_meta in new_files: - record_count += file_meta.row_count - - return record_count From 8b0b45ce255eab8ead09c400ec71f85e3b257791 Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Fri, 5 Sep 2025 11:34:09 +0800 Subject: [PATCH 2/2] [Python] Support overwrite mode for writer --- .../pypaimon/tests/py36/ao_read_write_test.py | 81 +++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py index 575bf535e3f9..7ae847c9f392 100644 --- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py +++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py @@ -32,6 +32,87 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest): + def test_overwrite(self): + simple_pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()) + ]) + schema = Schema.from_pyarrow_schema(simple_pa_schema, partition_keys=['f0'], + options={'dynamic-partition-overwrite': 'false'}) + self.rest_catalog.create_table('default.test_overwrite', schema, False) + table = self.rest_catalog.get_table('default.test_overwrite') + read_builder = table.new_read_builder() + + # test normal write + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + df0 = pd.DataFrame({ + 'f0': [1, 2], + 'f1': ['apple', 'banana'], + }) + + table_write.write_pandas(df0) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual_df0 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0') + df0['f0'] = df0['f0'].astype('int32') + pd.testing.assert_frame_equal( + actual_df0.reset_index(drop=True), df0.reset_index(drop=True)) + + # test partially overwrite + write_builder = table.new_batch_write_builder().overwrite({'f0': 1}) + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + df1 = pd.DataFrame({ + 'f0': [1], + 'f1': ['watermelon'], + }) + + table_write.write_pandas(df1) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual_df1 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0') + expected_df1 = pd.DataFrame({ + 'f0': [1, 2], + 'f1': ['watermelon', 'banana'] + }) + expected_df1['f0'] = expected_df1['f0'].astype('int32') + pd.testing.assert_frame_equal( + actual_df1.reset_index(drop=True), expected_df1.reset_index(drop=True)) + + # test fully overwrite + write_builder = table.new_batch_write_builder().overwrite() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + df2 = pd.DataFrame({ + 'f0': [3], + 'f1': ['Neo'], + }) + + table_write.write_pandas(df2) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + 
table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual_df2 = table_read.to_pandas(table_scan.plan().splits()) + df2['f0'] = df2['f0'].astype('int32') + pd.testing.assert_frame_equal( + actual_df2.reset_index(drop=True), df2.reset_index(drop=True)) + def testParquetAppendOnlyReader(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) self.rest_catalog.create_table('default.test_append_only_parquet', schema, False)
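Usage sketch (not part of the diff above): the tests in this patch exercise the new API end to end. A minimal write-side sketch, assuming a partitioned table created with the 'dynamic-partition-overwrite' option set to 'false' (as in the tests) and a pandas DataFrame `df` matching the table schema:

    # Full overwrite: the resulting OVERWRITE snapshot marks the existing
    # manifest entries as deleted and adds the newly written files.
    # For a static partition overwrite of f0=1, use .overwrite({'f0': 1}) instead.
    write_builder = table.new_batch_write_builder().overwrite()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    table_write.write_pandas(df)                        # stage new data files
    table_commit.commit(table_write.prepare_commit())   # commit as OVERWRITE
    table_write.close()
    table_commit.close()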