diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index c98322543e5b..3ec930b24b4c 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -283,32 +283,30 @@ def read_overwritten_file_utf8(self, path: Path) -> Optional[str]:
         return None

-    def write_parquet(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'snappy', **kwargs):
+    def write_parquet(self, path: Path, data: pyarrow.Table, compression: str = 'snappy', **kwargs):
         try:
             import pyarrow.parquet as pq

-            table = pyarrow.Table.from_batches([data])
             with self.new_output_stream(path) as output_stream:
-                pq.write_table(table, output_stream, compression=compression, **kwargs)
+                pq.write_table(data, output_stream, compression=compression, **kwargs)
         except Exception as e:
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write Parquet file {path}: {e}") from e

-    def write_orc(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'zstd', **kwargs):
+    def write_orc(self, path: Path, data: pyarrow.Table, compression: str = 'zstd', **kwargs):
         try:
             """Write ORC file using PyArrow ORC writer."""
             import sys
             import pyarrow.orc as orc

-            table = pyarrow.Table.from_batches([data])
             with self.new_output_stream(path) as output_stream:
                 # Check Python version - if 3.6, don't use compression parameter
                 if sys.version_info[:2] == (3, 6):
-                    orc.write_table(table, output_stream, **kwargs)
+                    orc.write_table(data, output_stream, **kwargs)
                 else:
                     orc.write_table(
-                        table,
+                        data,
                         output_stream,
                         compression=compression,
                         **kwargs
@@ -318,7 +316,7 @@ def write_orc(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'z
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e

-    def write_avro(self, path: Path, data: pyarrow.RecordBatch, avro_schema: Optional[Dict[str, Any]] = None, **kwargs):
+    def write_avro(self, path: Path, data: pyarrow.Table, avro_schema: Optional[Dict[str, Any]] = None, **kwargs):
         import fastavro
         if avro_schema is None:
             from pypaimon.schema.data_types import PyarrowFieldParser
diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
index 9873d6bf3f8e..ca75d352a13c 100644
--- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
@@ -271,6 +271,42 @@ def testAvroAppendOnlyReader(self):
         actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
         self.assertEqual(actual, self.expected)

+    def test_append_only_multi_write_once_commit(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.rest_catalog.create_table('default.test_append_only_multi_once_commit', schema, False)
+        table = self.rest_catalog.get_table('default.test_append_only_multi_once_commit')
+        write_builder = table.new_batch_write_builder()
+
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data1 = {
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'behavior': ['a', 'b', 'c', None],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+            'long-dt': ['2024-10-10', '2024-10-10', '2024-10-10', '2024-01-01'],
+        }
+        pa_table1 = pa.Table.from_pydict(data1, schema=self.pa_schema)
+        data2 = {
+            'user_id': [5, 6, 7, 8],
+            'item_id': [1005, 1006, 1007, 1008],
+            'behavior': ['e', 'f', 'g', 'h'],
+            'dt': ['p2', 'p1', 'p2', 'p2'],
+            'long-dt': ['2024-10-10', '2025-01-23', 'abcdefghijklmnopk', '2025-08-08'],
+        }
+        pa_table2 = pa.Table.from_pydict(data2, schema=self.pa_schema)
+
+        table_write.write_arrow(pa_table1)
+        table_write.write_arrow(pa_table2)
+
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
+        self.assertEqual(actual, self.expected)
+
     def testAppendOnlyReaderWithFilter(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
         self.rest_catalog.create_table('default.test_append_only_filter', schema, False)
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py
index 17acb9a183c9..ca22c79fbef8 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -79,6 +79,40 @@ def testAvroAppendOnlyReader(self):
         actual = self._read_test_table(read_builder).sort_by('user_id')
         self.assertEqual(actual, self.expected)

+    def test_append_only_multi_write_once_commit(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.catalog.create_table('default.test_append_only_multi_once_commit', schema, False)
+        table = self.catalog.get_table('default.test_append_only_multi_once_commit')
+        write_builder = table.new_batch_write_builder()
+
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data1 = {
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'behavior': ['a', 'b', 'c', None],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+        }
+        pa_table1 = pa.Table.from_pydict(data1, schema=self.pa_schema)
+        data2 = {
+            'user_id': [5, 6, 7, 8],
+            'item_id': [1005, 1006, 1007, 1008],
+            'behavior': ['e', 'f', 'g', 'h'],
+            'dt': ['p2', 'p1', 'p2', 'p2'],
+        }
+        pa_table2 = pa.Table.from_pydict(data2, schema=self.pa_schema)
+
+        table_write.write_arrow(pa_table1)
+        table_write.write_arrow(pa_table2)
+
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        self.assertEqual(actual, self.expected)
+
     def testAppendOnlyReaderWithFilter(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
         self.catalog.create_table('default.test_append_only_filter', schema, False)
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index 8b9b8533500e..73cca1a788b9 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -104,6 +104,50 @@ def testPkAvroReader(self):
         actual = self._read_test_table(read_builder).sort_by('user_id')
         self.assertEqual(actual, self.expected)

+    def test_pk_multi_write_once_commit(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema,
+                                            partition_keys=['dt'],
+                                            primary_keys=['user_id', 'dt'],
+                                            options={'bucket': '2'})
+        self.catalog.create_table('default.test_pk_multi', schema, False)
+        table = self.catalog.get_table('default.test_pk_multi')
+        write_builder = table.new_batch_write_builder()
+
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data1 = {
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'behavior': ['a', 'b', 'c', None],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+        }
+        pa_table1 = pa.Table.from_pydict(data1, schema=self.pa_schema)
+        data2 = {
+            'user_id': [5, 2, 7, 8],
+            'item_id': [1005, 1002, 1007, 1008],
+            'behavior': ['e', 'b-new', 'g', 'h'],
+            'dt': ['p2', 'p1', 'p1', 'p2']
+        }
+        pa_table2 = pa.Table.from_pydict(data2, schema=self.pa_schema)
+
+        table_write.write_arrow(pa_table1)
+        table_write.write_arrow(pa_table2)
+
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        # TODO support pk merge feature when multiple write
+        expected = pa.Table.from_pydict({
+            'user_id': [1, 2, 2, 3, 4, 5, 7, 8],
+            'item_id': [1001, 1002, 1002, 1003, 1004, 1005, 1007, 1008],
+            'behavior': ['a', 'b', 'b-new', 'c', None, 'e', 'g', 'h'],
+            'dt': ['p1', 'p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2'],
+        }, schema=self.pa_schema)
+        self.assertEqual(actual, expected)
+
     def testPkReaderWithFilter(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema,
                                             partition_keys=['dt'],
diff --git a/paimon-python/pypaimon/write/writer/append_only_data_writer.py b/paimon-python/pypaimon/write/writer/append_only_data_writer.py
index c9d4c8f864e5..3bd128d7b42a 100644
--- a/paimon-python/pypaimon/write/writer/append_only_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/append_only_data_writer.py
@@ -24,8 +24,8 @@
 class AppendOnlyDataWriter(DataWriter):
     """Data writer for append-only tables."""

-    def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
-        return data
+    def _process_data(self, data: pa.RecordBatch) -> pa.Table:
+        return pa.Table.from_batches([data])

-    def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
+    def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
         return pa.concat_tables([existing_data, new_data])
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 787dc706248b..e2f778b58667 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -53,7 +53,7 @@ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int):
         self.compression = options.get(CoreOptions.FILE_COMPRESSION, "zstd")
         self.sequence_generator = SequenceGenerator(max_seq_number)

-        self.pending_data: Optional[pa.RecordBatch] = None
+        self.pending_data: Optional[pa.Table] = None
         self.committed_files: List[DataFileMeta] = []

     def write(self, data: pa.RecordBatch):
@@ -100,7 +100,7 @@ def _check_and_roll_if_needed(self):
             self.pending_data = remaining_data
             self._check_and_roll_if_needed()

-    def _write_data_to_file(self, data: pa.RecordBatch):
+    def _write_data_to_file(self, data: pa.Table):
         if data.num_rows == 0:
             return
         file_name = f"data-{uuid.uuid4()}-0.{self.file_format}"
@@ -115,8 +115,8 @@ def _write_data_to_file(self, data: pa.RecordBatch):
             raise ValueError(f"Unsupported file format: {self.file_format}")

         # min key & max key
-        table = pa.Table.from_batches([data])
-        selected_table = table.select(self.trimmed_primary_key)
+
+        selected_table = data.select(self.trimmed_primary_key)
         key_columns_batch = selected_table.to_batches()[0]
         min_key_row_batch = key_columns_batch.slice(0, 1)
         max_key_row_batch = key_columns_batch.slice(key_columns_batch.num_rows - 1, 1)
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
index 99b11a978819..fb929710e8b2 100644
--- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -25,11 +25,11 @@
 class KeyValueDataWriter(DataWriter):
     """Data writer for primary key tables with system fields and sorting."""

-    def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
+    def _process_data(self, data: pa.RecordBatch) -> pa.Table:
         enhanced_data = self._add_system_fields(data)
-        return self._sort_by_primary_key(enhanced_data)
+        return pa.Table.from_batches([self._sort_by_primary_key(enhanced_data)])

-    def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
+    def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
         combined = pa.concat_tables([existing_data, new_data])
         return self._sort_by_primary_key(combined)
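
Reviewer note: with pending data buffered as a pyarrow.Table and successive writes merged via pa.concat_tables, several write_arrow calls can now precede a single commit. Below is a minimal usage sketch of that flow, mirroring the new tests; `catalog`, `pa_schema`, `data1`, `data2`, and the table name are assumptions standing in for the test fixtures, and `Schema` is imported as in the test files.

import pyarrow as pa

# Sketch only: catalog/schema/data setup is assumed to match the test fixtures
# above; 'default.demo_multi_write' is a hypothetical table name.
schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
catalog.create_table('default.demo_multi_write', schema, False)
table = catalog.get_table('default.demo_multi_write')

write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

# Each write is turned into a pyarrow.Table by _process_data and concatenated
# onto the pending Table by _merge_data, so both batches stay buffered.
table_write.write_arrow(pa.Table.from_pydict(data1, schema=pa_schema))
table_write.write_arrow(pa.Table.from_pydict(data2, schema=pa_schema))

# A single commit then covers both writes.
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()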