14 changes: 6 additions & 8 deletions paimon-python/pypaimon/common/file_io.py
@@ -283,32 +283,30 @@ def read_overwritten_file_utf8(self, path: Path) -> Optional[str]:
 
         return None
 
-    def write_parquet(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'snappy', **kwargs):
+    def write_parquet(self, path: Path, data: pyarrow.Table, compression: str = 'snappy', **kwargs):
         try:
             import pyarrow.parquet as pq
-            table = pyarrow.Table.from_batches([data])
 
             with self.new_output_stream(path) as output_stream:
-                pq.write_table(table, output_stream, compression=compression, **kwargs)
+                pq.write_table(data, output_stream, compression=compression, **kwargs)
 
         except Exception as e:
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write Parquet file {path}: {e}") from e
 
-    def write_orc(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'zstd', **kwargs):
+    def write_orc(self, path: Path, data: pyarrow.Table, compression: str = 'zstd', **kwargs):
         try:
             """Write ORC file using PyArrow ORC writer."""
             import sys
             import pyarrow.orc as orc
-            table = pyarrow.Table.from_batches([data])
 
             with self.new_output_stream(path) as output_stream:
                 # Check Python version - if 3.6, don't use compression parameter
                 if sys.version_info[:2] == (3, 6):
-                    orc.write_table(table, output_stream, **kwargs)
+                    orc.write_table(data, output_stream, **kwargs)
                 else:
                     orc.write_table(
-                        table,
+                        data,
                         output_stream,
                         compression=compression,
                         **kwargs
@@ -318,7 +316,7 @@ def write_orc(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'zstd', **kwargs):
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e
 
-    def write_avro(self, path: Path, data: pyarrow.RecordBatch, avro_schema: Optional[Dict[str, Any]] = None, **kwargs):
+    def write_avro(self, path: Path, data: pyarrow.Table, avro_schema: Optional[Dict[str, Any]] = None, **kwargs):
         import fastavro
         if avro_schema is None:
             from pypaimon.schema.data_types import PyarrowFieldParser
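For readers following the file_io.py change: the write helpers now accept a pyarrow.Table directly, so the pyarrow.Table.from_batches wrapping moves out of FileIO and into the writers. Below is a minimal standalone sketch of the underlying pyarrow calls, not the FileIO class itself; the variable names and the local output path are illustrative and stand in for FileIO.new_output_stream.

```python
import pyarrow as pa
import pyarrow.parquet as pq

# Before: callers handed over a RecordBatch and FileIO wrapped it in a Table.
batch = pa.RecordBatch.from_pydict({'user_id': [1, 2], 'dt': ['p1', 'p2']})
table_from_batch = pa.Table.from_batches([batch])

# After: callers already hold a pa.Table and it is passed straight through.
table = pa.Table.from_pydict({'user_id': [1, 2], 'dt': ['p1', 'p2']})

# pq.write_table accepts a Table plus a path or file-like object; a local path
# is used here instead of FileIO.new_output_stream purely for illustration.
pq.write_table(table, '/tmp/example.parquet', compression='snappy')
```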
36 changes: 36 additions & 0 deletions paimon-python/pypaimon/tests/py36/ao_read_write_test.py
@@ -271,6 +271,42 @@ def testAvroAppendOnlyReader(self):
         actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
         self.assertEqual(actual, self.expected)
 
+    def test_append_only_multi_write_once_commit(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.rest_catalog.create_table('default.test_append_only_multi_once_commit', schema, False)
+        table = self.rest_catalog.get_table('default.test_append_only_multi_once_commit')
+        write_builder = table.new_batch_write_builder()
+
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data1 = {
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'behavior': ['a', 'b', 'c', None],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+            'long-dt': ['2024-10-10', '2024-10-10', '2024-10-10', '2024-01-01'],
+        }
+        pa_table1 = pa.Table.from_pydict(data1, schema=self.pa_schema)
+        data2 = {
+            'user_id': [5, 6, 7, 8],
+            'item_id': [1005, 1006, 1007, 1008],
+            'behavior': ['e', 'f', 'g', 'h'],
+            'dt': ['p2', 'p1', 'p2', 'p2'],
+            'long-dt': ['2024-10-10', '2025-01-23', 'abcdefghijklmnopk', '2025-08-08'],
+        }
+        pa_table2 = pa.Table.from_pydict(data2, schema=self.pa_schema)
+
+        table_write.write_arrow(pa_table1)
+        table_write.write_arrow(pa_table2)
+
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
+        self.assertEqual(actual, self.expected)
+
     def testAppendOnlyReaderWithFilter(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
         self.rest_catalog.create_table('default.test_append_only_filter', schema, False)
34 changes: 34 additions & 0 deletions paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -79,6 +79,40 @@ def testAvroAppendOnlyReader(self):
         actual = self._read_test_table(read_builder).sort_by('user_id')
         self.assertEqual(actual, self.expected)
 
+    def test_append_only_multi_write_once_commit(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.catalog.create_table('default.test_append_only_multi_once_commit', schema, False)
+        table = self.catalog.get_table('default.test_append_only_multi_once_commit')
+        write_builder = table.new_batch_write_builder()
+
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data1 = {
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'behavior': ['a', 'b', 'c', None],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+        }
+        pa_table1 = pa.Table.from_pydict(data1, schema=self.pa_schema)
+        data2 = {
+            'user_id': [5, 6, 7, 8],
+            'item_id': [1005, 1006, 1007, 1008],
+            'behavior': ['e', 'f', 'g', 'h'],
+            'dt': ['p2', 'p1', 'p2', 'p2'],
+        }
+        pa_table2 = pa.Table.from_pydict(data2, schema=self.pa_schema)
+
+        table_write.write_arrow(pa_table1)
+        table_write.write_arrow(pa_table2)
+
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        self.assertEqual(actual, self.expected)
+
     def testAppendOnlyReaderWithFilter(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
         self.catalog.create_table('default.test_append_only_filter', schema, False)
44 changes: 44 additions & 0 deletions paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -104,6 +104,50 @@ def testPkAvroReader(self):
         actual = self._read_test_table(read_builder).sort_by('user_id')
         self.assertEqual(actual, self.expected)
 
+    def test_pk_multi_write_once_commit(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema,
+                                            partition_keys=['dt'],
+                                            primary_keys=['user_id', 'dt'],
+                                            options={'bucket': '2'})
+        self.catalog.create_table('default.test_pk_multi', schema, False)
+        table = self.catalog.get_table('default.test_pk_multi')
+        write_builder = table.new_batch_write_builder()
+
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data1 = {
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'behavior': ['a', 'b', 'c', None],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+        }
+        pa_table1 = pa.Table.from_pydict(data1, schema=self.pa_schema)
+        data2 = {
+            'user_id': [5, 2, 7, 8],
+            'item_id': [1005, 1002, 1007, 1008],
+            'behavior': ['e', 'b-new', 'g', 'h'],
+            'dt': ['p2', 'p1', 'p1', 'p2']
+        }
+        pa_table2 = pa.Table.from_pydict(data2, schema=self.pa_schema)
+
+        table_write.write_arrow(pa_table1)
+        table_write.write_arrow(pa_table2)
+
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        # TODO support pk merge feature when multiple write
+        expected = pa.Table.from_pydict({
+            'user_id': [1, 2, 2, 3, 4, 5, 7, 8],
+            'item_id': [1001, 1002, 1002, 1003, 1004, 1005, 1007, 1008],
+            'behavior': ['a', 'b', 'b-new', 'c', None, 'e', 'g', 'h'],
+            'dt': ['p1', 'p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2'],
+        }, schema=self.pa_schema)
+        self.assertEqual(actual, expected)
+
     def testPkReaderWithFilter(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema,
                                             partition_keys=['dt'],
6 changes: 3 additions & 3 deletions paimon-python/pypaimon/write/writer/append_only_data_writer.py
@@ -24,8 +24,8 @@
 class AppendOnlyDataWriter(DataWriter):
     """Data writer for append-only tables."""
 
-    def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
-        return data
+    def _process_data(self, data: pa.RecordBatch) -> pa.Table:
+        return pa.Table.from_batches([data])
 
-    def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
+    def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
         return pa.concat_tables([existing_data, new_data])
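As a quick aside on the append-only writer's new contract, here is a hedged sketch in plain pyarrow of what _process_data and _merge_data now amount to; the variable names are illustrative, not from the PR.

```python
import pyarrow as pa

batch1 = pa.RecordBatch.from_pydict({'user_id': [1, 2], 'dt': ['p1', 'p1']})
batch2 = pa.RecordBatch.from_pydict({'user_id': [3, 4], 'dt': ['p2', 'p2']})

# _process_data equivalent: each incoming RecordBatch is promoted to a Table.
pending = pa.Table.from_batches([batch1])
incoming = pa.Table.from_batches([batch2])

# _merge_data equivalent: successive writes are concatenated into one Table.
pending = pa.concat_tables([pending, incoming])
assert pending.num_rows == 4
```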
8 changes: 4 additions & 4 deletions paimon-python/pypaimon/write/writer/data_writer.py
@@ -53,7 +53,7 @@ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int):
         self.compression = options.get(CoreOptions.FILE_COMPRESSION, "zstd")
         self.sequence_generator = SequenceGenerator(max_seq_number)
 
-        self.pending_data: Optional[pa.RecordBatch] = None
+        self.pending_data: Optional[pa.Table] = None
         self.committed_files: List[DataFileMeta] = []
 
     def write(self, data: pa.RecordBatch):
@@ -100,7 +100,7 @@ def _check_and_roll_if_needed(self):
             self.pending_data = remaining_data
             self._check_and_roll_if_needed()
 
-    def _write_data_to_file(self, data: pa.RecordBatch):
+    def _write_data_to_file(self, data: pa.Table):
         if data.num_rows == 0:
             return
         file_name = f"data-{uuid.uuid4()}-0.{self.file_format}"
@@ -115,8 +115,8 @@ def _write_data_to_file(self, data: pa.RecordBatch):
             raise ValueError(f"Unsupported file format: {self.file_format}")
 
         # min key & max key
-        table = pa.Table.from_batches([data])
-        selected_table = table.select(self.trimmed_primary_key)
+
+        selected_table = data.select(self.trimmed_primary_key)
         key_columns_batch = selected_table.to_batches()[0]
         min_key_row_batch = key_columns_batch.slice(0, 1)
         max_key_row_batch = key_columns_batch.slice(key_columns_batch.num_rows - 1, 1)
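The min/max key extraction in _write_data_to_file now selects from the pa.Table it is given instead of re-wrapping a RecordBatch. A rough sketch under assumed column names follows; it presumes the rows are already sorted by key (as the key-value writer arranges) and that the table fits in a single chunk.

```python
import pyarrow as pa

data = pa.Table.from_pydict({
    'user_id': [1, 2, 3],
    'dt': ['p1', 'p1', 'p1'],
    'behavior': ['a', 'b', 'c'],
})
trimmed_primary_key = ['user_id', 'dt']  # assumption: the table's key columns

selected = data.select(trimmed_primary_key)           # keep only key columns
key_batch = selected.to_batches()[0]                  # first (here: only) chunk
min_key = key_batch.slice(0, 1)                       # first row = smallest key
max_key = key_batch.slice(key_batch.num_rows - 1, 1)  # last row = largest key
```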
6 changes: 3 additions & 3 deletions paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -25,11 +25,11 @@
 class KeyValueDataWriter(DataWriter):
     """Data writer for primary key tables with system fields and sorting."""
 
-    def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
+    def _process_data(self, data: pa.RecordBatch) -> pa.Table:
         enhanced_data = self._add_system_fields(data)
-        return self._sort_by_primary_key(enhanced_data)
+        return pa.Table.from_batches([self._sort_by_primary_key(enhanced_data)])
 
-    def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
+    def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
         combined = pa.concat_tables([existing_data, new_data])
         return self._sort_by_primary_key(combined)

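Finally, a rough pyarrow-only equivalent of the key-value merge path; this assumes pyarrow >= 7 for Table.sort_by, whereas the real writer uses its own _sort_by_primary_key helper.

```python
import pyarrow as pa

existing = pa.Table.from_pydict({'user_id': [2, 1], 'behavior': ['b', 'a']})
new = pa.Table.from_pydict({'user_id': [2, 3], 'behavior': ['b-new', 'c']})

# Concatenate the already-written data with the new write, then re-sort by key.
combined = pa.concat_tables([existing, new])
merged = combined.sort_by([('user_id', 'ascending')])

# Note: rows with the same key both survive this concatenation. That is why
# the primary-key test above expects both 'b' and 'b-new' for user_id 2;
# merging across separate write_arrow calls is not yet supported (see the
# TODO in test_pk_multi_write_once_commit).
```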