From 7740b8eb1b08b2f3a5da86a628c4e3cbb71998b6 Mon Sep 17 00:00:00 2001 From: umi Date: Tue, 4 Nov 2025 16:39:53 +0800 Subject: [PATCH 1/4] draft --- .../pypaimon/tests/py36/ao_simple_test.py | 55 +++++++ .../pypaimon/tests/write/table_write_test.py | 153 ++++++++++++++++++ .../pypaimon/write/batch_table_write.py | 4 - .../pypaimon/write/file_store_write.py | 2 + 4 files changed, 210 insertions(+), 4 deletions(-) create mode 100644 paimon-python/pypaimon/tests/write/table_write_test.py diff --git a/paimon-python/pypaimon/tests/py36/ao_simple_test.py b/paimon-python/pypaimon/tests/py36/ao_simple_test.py index efb3189e0649..fde96d473c27 100644 --- a/paimon-python/pypaimon/tests/py36/ao_simple_test.py +++ b/paimon-python/pypaimon/tests/py36/ao_simple_test.py @@ -421,3 +421,58 @@ def test_initialize_oss_fs_pyarrow_lt_7(self): session_token="TOKEN", region="cn-hangzhou", endpoint_override="oss-bucket." + props[OssOptions.OSS_ENDPOINT]) + + def test_multi_prepare_commit_ao(self): + schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) + self.rest_catalog.create_table('default.test_append_only_parquet', schema, False) + table = self.rest_catalog.get_table('default.test_append_only_parquet') + write_builder = table.new_batch_write_builder() + + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + # write 1 + data1 = { + 'user_id': [1, 2, 3, 4], + 'item_id': [1001, 1002, 1003, 1004], + 'behavior': ['a', 'b', 'c', None], + 'dt': ['p1', 'p1', 'p2', 'p1'], + } + pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema) + table_write.write_arrow(pa_table) + table_write.prepare_commit() + # write 2 + data2 = { + 'user_id': [5, 6, 7, 8], + 'item_id': [1005, 1006, 1007, 1008], + 'behavior': ['e', 'f', 'g', 'h'], + 'dt': ['p2', 'p1', 'p2', 'p2'], + } + pa_table = pa.Table.from_pydict(data2, schema=self.pa_schema) + table_write.write_arrow(pa_table) + table_write.prepare_commit() + # write 3 + data3 = { + 'user_id': [9, 10], + 'item_id': [1009, 1010], + 'behavior': ['i', 'j'], + 'dt': ['p2', 'p1'], + } + pa_table = pa.Table.from_pydict(data3, schema=self.pa_schema) + table_write.write_arrow(pa_table) + cm = table_write.prepare_commit() + # commit + table_commit.commit(cm) + table_write.close() + table_commit.close() + self.assertEqual(3, table_write.file_store_write.commit_identifier) + read_builder = table.new_read_builder() + table_read = read_builder.new_read() + splits = read_builder.new_scan().plan().splits() + actual = table_sort_by(table_read.to_arrow(splits), 'user_id') + expected = pa.Table.from_pydict({ + 'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + 'item_id': [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010], + 'behavior': ['a', 'b', 'c', None, 'e', 'f', 'g', 'h', 'i', 'j'], + 'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p2', 'p2', 'p1'] + }, schema=self.pa_schema) + self.assertEqual(expected, actual) diff --git a/paimon-python/pypaimon/tests/write/table_write_test.py b/paimon-python/pypaimon/tests/write/table_write_test.py new file mode 100644 index 000000000000..53e8a1bc26c2 --- /dev/null +++ b/paimon-python/pypaimon/tests/write/table_write_test.py @@ -0,0 +1,153 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import os +import shutil + +import tempfile +import unittest + +from pypaimon import CatalogFactory, Schema +import pyarrow as pa + + +class TableWriteTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({ + 'warehouse': cls.warehouse + }) + cls.catalog.create_database('default', True) + + cls.pa_schema = pa.schema([ + ('user_id', pa.int32()), + ('item_id', pa.int64()), + ('behavior', pa.string()), + ('dt', pa.string()) + ]) + cls.expected = pa.Table.from_pydict({ + 'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + 'item_id': [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010], + 'behavior': ['a', 'b', 'c', None, 'e', 'f', 'g', 'h', 'i', 'j'], + 'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p2', 'p2', 'p1'] + }, schema=cls.pa_schema) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir, ignore_errors=True) + + def test_multi_prepare_commit_ao(self): + schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) + self.catalog.create_table('default.test_append_only_parquet', schema, False) + table = self.catalog.get_table('default.test_append_only_parquet') + write_builder = table.new_batch_write_builder() + + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + # write 1 + data1 = { + 'user_id': [1, 2, 3, 4], + 'item_id': [1001, 1002, 1003, 1004], + 'behavior': ['a', 'b', 'c', None], + 'dt': ['p1', 'p1', 'p2', 'p1'], + } + pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema) + table_write.write_arrow(pa_table) + table_write.prepare_commit() + # write 2 + data2 = { + 'user_id': [5, 6, 7, 8], + 'item_id': [1005, 1006, 1007, 1008], + 'behavior': ['e', 'f', 'g', 'h'], + 'dt': ['p2', 'p1', 'p2', 'p2'], + } + pa_table = pa.Table.from_pydict(data2, schema=self.pa_schema) + table_write.write_arrow(pa_table) + table_write.prepare_commit() + # write 3 + data3 = { + 'user_id': [9, 10], + 'item_id': [1009, 1010], + 'behavior': ['i', 'j'], + 'dt': ['p2', 'p1'], + } + pa_table = pa.Table.from_pydict(data3, schema=self.pa_schema) + table_write.write_arrow(pa_table) + cm = table_write.prepare_commit() + # commit + table_commit.commit(cm) + table_write.close() + table_commit.close() + + read_builder = table.new_read_builder() + table_read = read_builder.new_read() + splits = read_builder.new_scan().plan().splits() + actual = table_read.to_arrow(splits).sort_by('user_id') + self.assertEqual(self.expected, actual) + + def test_multi_prepare_commit_pk(self): + schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], primary_keys=['user_id', 'dt'], + options={'bucket': '2'}) + self.catalog.create_table('default.test_primary_key_parquet', schema, False) + table = self.catalog.get_table('default.test_primary_key_parquet') + write_builder = table.new_batch_write_builder() + + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + # write 1 + data1 = { + 'user_id': [1, 2, 3, 4], + 'item_id': [1001, 1002, 1003, 1004], + 'behavior': ['a', 'b', 'c', None], + 'dt': ['p1', 'p1', 'p2', 'p1'], + } + pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema) + table_write.write_arrow(pa_table) + table_write.prepare_commit() + # write 2 + data2 = { + 'user_id': [5, 6, 7, 8], + 'item_id': [1005, 1006, 1007, 1008], + 'behavior': ['e', 'f', 'g', 'h'], + 'dt': ['p2', 'p1', 'p2', 'p2'], + } + pa_table = pa.Table.from_pydict(data2, schema=self.pa_schema) + table_write.write_arrow(pa_table) + table_write.prepare_commit() + # write 3 + data3 = { + 'user_id': [9, 10], + 'item_id': [1009, 1010], + 'behavior': ['i', 'j'], + 'dt': ['p2', 'p1'], + } + pa_table = pa.Table.from_pydict(data3, schema=self.pa_schema) + table_write.write_arrow(pa_table) + cm = table_write.prepare_commit() + # commit + table_commit.commit(cm) + table_write.close() + table_commit.close() + self.assertEqual(3, table_write.file_store_write.commit_identifier) + read_builder = table.new_read_builder() + table_read = read_builder.new_read() + splits = read_builder.new_scan().plan().splits() + actual = table_read.to_arrow(splits).sort_by('user_id') + self.assertEqual(self.expected, actual) diff --git a/paimon-python/pypaimon/write/batch_table_write.py b/paimon-python/pypaimon/write/batch_table_write.py index f8d0660bfa84..596fcfb82a7c 100644 --- a/paimon-python/pypaimon/write/batch_table_write.py +++ b/paimon-python/pypaimon/write/batch_table_write.py @@ -34,7 +34,6 @@ def __init__(self, table): self.table_pyarrow_schema = PyarrowFieldParser.from_paimon_schema(self.table.table_schema.fields) self.file_store_write = FileStoreWrite(self.table) self.row_key_extractor = self.table.create_row_key_extractor() - self.batch_committed = False def write_arrow(self, table: pa.Table): batches_iterator = table.to_batches() @@ -60,9 +59,6 @@ def write_pandas(self, dataframe): return self.write_arrow_batch(record_batch) def prepare_commit(self) -> List[CommitMessage]: - if self.batch_committed: - raise RuntimeError("BatchTableWrite only supports one-time committing.") - self.batch_committed = True return self.file_store_write.prepare_commit() def with_write_type(self, write_cols: List[str]): diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py index 9ebabf110359..f95e5e728356 100644 --- a/paimon-python/pypaimon/write/file_store_write.py +++ b/paimon-python/pypaimon/write/file_store_write.py @@ -37,6 +37,7 @@ def __init__(self, table): self.data_writers: Dict[Tuple, DataWriter] = {} self.max_seq_numbers: dict = {} self.write_cols = None + self.commit_identifier = 0 def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch): key = (partition, bucket) @@ -84,6 +85,7 @@ def _has_blob_columns(self) -> bool: return False def prepare_commit(self) -> List[CommitMessage]: + self.commit_identifier += 1 commit_messages = [] for (partition, bucket), writer in self.data_writers.items(): committed_files = writer.prepare_commit() From 8587e5196322173b78c36d2c099a3d0085cbd3c3 Mon Sep 17 00:00:00 2001 From: umi Date: Tue, 4 Nov 2025 17:15:05 +0800 Subject: [PATCH 2/4] fix --- paimon-python/pypaimon/snapshot/snapshot.py | 2 ++ .../pypaimon/tests/py36/ao_simple_test.py | 11 +++++----- .../pypaimon/tests/write/table_write_test.py | 20 ++++++++++--------- .../pypaimon/write/batch_table_commit.py | 6 ++---- .../pypaimon/write/batch_table_write.py | 5 +++-- .../pypaimon/write/file_store_write.py | 4 ++-- 6 files changed, 26 insertions(+), 22 deletions(-) diff --git a/paimon-python/pypaimon/snapshot/snapshot.py b/paimon-python/pypaimon/snapshot/snapshot.py index 96b287ab5575..f164f5eb61ba 100644 --- a/paimon-python/pypaimon/snapshot/snapshot.py +++ b/paimon-python/pypaimon/snapshot/snapshot.py @@ -21,6 +21,8 @@ from pypaimon.common.json_util import json_field +COMMIT_IDENTIFIER = 0x7fffffffffffffff + @dataclass class Snapshot: diff --git a/paimon-python/pypaimon/tests/py36/ao_simple_test.py b/paimon-python/pypaimon/tests/py36/ao_simple_test.py index fde96d473c27..d361d06897c3 100644 --- a/paimon-python/pypaimon/tests/py36/ao_simple_test.py +++ b/paimon-python/pypaimon/tests/py36/ao_simple_test.py @@ -439,7 +439,7 @@ def test_multi_prepare_commit_ao(self): } pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema) table_write.write_arrow(pa_table) - table_write.prepare_commit() + table_write.prepare_commit(0) # write 2 data2 = { 'user_id': [5, 6, 7, 8], @@ -449,7 +449,7 @@ def test_multi_prepare_commit_ao(self): } pa_table = pa.Table.from_pydict(data2, schema=self.pa_schema) table_write.write_arrow(pa_table) - table_write.prepare_commit() + table_write.prepare_commit(1) # write 3 data3 = { 'user_id': [9, 10], @@ -459,12 +459,13 @@ def test_multi_prepare_commit_ao(self): } pa_table = pa.Table.from_pydict(data3, schema=self.pa_schema) table_write.write_arrow(pa_table) - cm = table_write.prepare_commit() + cm = table_write.prepare_commit(2) # commit - table_commit.commit(cm) + table_commit.commit(cm, 2) table_write.close() table_commit.close() - self.assertEqual(3, table_write.file_store_write.commit_identifier) + self.assertEqual(2, table_write.file_store_write.commit_identifier) + read_builder = table.new_read_builder() table_read = read_builder.new_read() splits = read_builder.new_scan().plan().splits() diff --git a/paimon-python/pypaimon/tests/write/table_write_test.py b/paimon-python/pypaimon/tests/write/table_write_test.py index 53e8a1bc26c2..e582d920a85e 100644 --- a/paimon-python/pypaimon/tests/write/table_write_test.py +++ b/paimon-python/pypaimon/tests/write/table_write_test.py @@ -70,7 +70,7 @@ def test_multi_prepare_commit_ao(self): } pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema) table_write.write_arrow(pa_table) - table_write.prepare_commit() + table_write.prepare_commit(0) # write 2 data2 = { 'user_id': [5, 6, 7, 8], @@ -80,7 +80,7 @@ def test_multi_prepare_commit_ao(self): } pa_table = pa.Table.from_pydict(data2, schema=self.pa_schema) table_write.write_arrow(pa_table) - table_write.prepare_commit() + table_write.prepare_commit(1) # write 3 data3 = { 'user_id': [9, 10], @@ -90,11 +90,12 @@ def test_multi_prepare_commit_ao(self): } pa_table = pa.Table.from_pydict(data3, schema=self.pa_schema) table_write.write_arrow(pa_table) - cm = table_write.prepare_commit() + cm = table_write.prepare_commit(2) # commit - table_commit.commit(cm) + table_commit.commit(cm, 2) table_write.close() table_commit.close() + self.assertEqual(2, table_write.file_store_write.commit_identifier) read_builder = table.new_read_builder() table_read = read_builder.new_read() @@ -120,7 +121,7 @@ def test_multi_prepare_commit_pk(self): } pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema) table_write.write_arrow(pa_table) - table_write.prepare_commit() + table_write.prepare_commit(0) # write 2 data2 = { 'user_id': [5, 6, 7, 8], @@ -130,7 +131,7 @@ def test_multi_prepare_commit_pk(self): } pa_table = pa.Table.from_pydict(data2, schema=self.pa_schema) table_write.write_arrow(pa_table) - table_write.prepare_commit() + table_write.prepare_commit(1) # write 3 data3 = { 'user_id': [9, 10], @@ -140,12 +141,13 @@ def test_multi_prepare_commit_pk(self): } pa_table = pa.Table.from_pydict(data3, schema=self.pa_schema) table_write.write_arrow(pa_table) - cm = table_write.prepare_commit() + cm = table_write.prepare_commit(2) # commit - table_commit.commit(cm) + table_commit.commit(cm, 2) table_write.close() table_commit.close() - self.assertEqual(3, table_write.file_store_write.commit_identifier) + self.assertEqual(2, table_write.file_store_write.commit_identifier) + read_builder = table.new_read_builder() table_read = read_builder.new_read() splits = read_builder.new_scan().plan().splits() diff --git a/paimon-python/pypaimon/write/batch_table_commit.py b/paimon-python/pypaimon/write/batch_table_commit.py index 7f42e1cef113..6111541cb0a0 100644 --- a/paimon-python/pypaimon/write/batch_table_commit.py +++ b/paimon-python/pypaimon/write/batch_table_commit.py @@ -16,9 +16,9 @@ # limitations under the License. ################################################################################ -import time from typing import List, Optional +from pypaimon.snapshot.snapshot import COMMIT_IDENTIFIER from pypaimon.write.commit_message import CommitMessage from pypaimon.write.file_store_commit import FileStoreCommit @@ -41,15 +41,13 @@ def __init__(self, table, commit_user: str, static_partition: Optional[dict]): self.file_store_commit = FileStoreCommit(snapshot_commit, table, commit_user) self.batch_committed = False - def commit(self, commit_messages: List[CommitMessage]): + def commit(self, commit_messages: List[CommitMessage], commit_identifier: int = COMMIT_IDENTIFIER): self._check_committed() non_empty_messages = [msg for msg in commit_messages if not msg.is_empty()] if not non_empty_messages: return - commit_identifier = int(time.time() * 1000) - try: if self.overwrite_partition is not None: self.file_store_commit.overwrite( diff --git a/paimon-python/pypaimon/write/batch_table_write.py b/paimon-python/pypaimon/write/batch_table_write.py index 596fcfb82a7c..6043b7947052 100644 --- a/paimon-python/pypaimon/write/batch_table_write.py +++ b/paimon-python/pypaimon/write/batch_table_write.py @@ -22,6 +22,7 @@ import pyarrow as pa from pypaimon.schema.data_types import PyarrowFieldParser +from pypaimon.snapshot.snapshot import COMMIT_IDENTIFIER from pypaimon.write.commit_message import CommitMessage from pypaimon.write.file_store_write import FileStoreWrite @@ -58,8 +59,8 @@ def write_pandas(self, dataframe): record_batch = pa.RecordBatch.from_pandas(dataframe, schema=pa_schema) return self.write_arrow_batch(record_batch) - def prepare_commit(self) -> List[CommitMessage]: - return self.file_store_write.prepare_commit() + def prepare_commit(self, commit_identifier: int = COMMIT_IDENTIFIER) -> List[CommitMessage]: + return self.file_store_write.prepare_commit(commit_identifier) def with_write_type(self, write_cols: List[str]): for col in write_cols: diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py index f95e5e728356..364ba9f6228a 100644 --- a/paimon-python/pypaimon/write/file_store_write.py +++ b/paimon-python/pypaimon/write/file_store_write.py @@ -84,8 +84,8 @@ def _has_blob_columns(self) -> bool: return True return False - def prepare_commit(self) -> List[CommitMessage]: - self.commit_identifier += 1 + def prepare_commit(self,commit_identifier) -> List[CommitMessage]: + self.commit_identifier = commit_identifier commit_messages = [] for (partition, bucket), writer in self.data_writers.items(): committed_files = writer.prepare_commit() From 82ae8ad1deeeb4b8116226b079f9c8876c87defc Mon Sep 17 00:00:00 2001 From: umi Date: Tue, 4 Nov 2025 17:15:40 +0800 Subject: [PATCH 3/4] fix --- paimon-python/pypaimon/write/file_store_write.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py index 364ba9f6228a..c100b64966d8 100644 --- a/paimon-python/pypaimon/write/file_store_write.py +++ b/paimon-python/pypaimon/write/file_store_write.py @@ -49,6 +49,7 @@ def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch): def _create_data_writer(self, partition: Tuple, bucket: int) -> DataWriter: def max_seq_number(): return self._seq_number_stats(partition).get(bucket, 1) + # Check if table has blob columns if self._has_blob_columns(): return DataBlobWriter( @@ -84,7 +85,7 @@ def _has_blob_columns(self) -> bool: return True return False - def prepare_commit(self,commit_identifier) -> List[CommitMessage]: + def prepare_commit(self, commit_identifier) -> List[CommitMessage]: self.commit_identifier = commit_identifier commit_messages = [] for (partition, bucket), writer in self.data_writers.items(): From b11c8b0d8eac86aebad13ce530a64b63a812fdae Mon Sep 17 00:00:00 2001 From: umi Date: Tue, 4 Nov 2025 18:13:32 +0800 Subject: [PATCH 4/4] streamWrite --- .../pypaimon/table/file_store_table.py | 5 ++- paimon-python/pypaimon/table/table.py | 6 +++- .../pypaimon/tests/py36/ao_simple_test.py | 2 +- .../pypaimon/tests/write/table_write_test.py | 4 +-- ...{batch_table_commit.py => table_commit.py} | 15 ++++++-- .../{batch_table_write.py => table_write.py} | 24 ++++++++++--- ...atch_write_builder.py => write_builder.py} | 35 ++++++++++++++----- 7 files changed, 71 insertions(+), 20 deletions(-) rename paimon-python/pypaimon/write/{batch_table_commit.py => table_commit.py} (89%) rename paimon-python/pypaimon/write/{batch_table_write.py => table_write.py} (89%) rename paimon-python/pypaimon/write/{batch_write_builder.py => write_builder.py} (73%) diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py index cde282eff005..6132c9bd6926 100644 --- a/paimon-python/pypaimon/table/file_store_table.py +++ b/paimon-python/pypaimon/table/file_store_table.py @@ -28,7 +28,7 @@ from pypaimon.schema.table_schema import TableSchema from pypaimon.table.bucket_mode import BucketMode from pypaimon.table.table import Table -from pypaimon.write.batch_write_builder import BatchWriteBuilder +from pypaimon.write.write_builder import BatchWriteBuilder, StreamWriteBuilder from pypaimon.write.row_key_extractor import (DynamicBucketRowKeyExtractor, FixedBucketRowKeyExtractor, PostponeBucketRowKeyExtractor, @@ -98,6 +98,9 @@ def new_read_builder(self) -> 'ReadBuilder': def new_batch_write_builder(self) -> BatchWriteBuilder: return BatchWriteBuilder(self) + def new_stream_write_builder(self) -> StreamWriteBuilder: + return StreamWriteBuilder(self) + def create_row_key_extractor(self) -> RowKeyExtractor: bucket_mode = self.bucket_mode() if bucket_mode == BucketMode.HASH_FIXED: diff --git a/paimon-python/pypaimon/table/table.py b/paimon-python/pypaimon/table/table.py index 3a1fe2e6228c..e20784f1fc9d 100644 --- a/paimon-python/pypaimon/table/table.py +++ b/paimon-python/pypaimon/table/table.py @@ -19,7 +19,7 @@ from abc import ABC, abstractmethod from pypaimon.read.read_builder import ReadBuilder -from pypaimon.write.batch_write_builder import BatchWriteBuilder +from pypaimon.write.write_builder import BatchWriteBuilder, StreamWriteBuilder class Table(ABC): @@ -32,3 +32,7 @@ def new_read_builder(self) -> ReadBuilder: @abstractmethod def new_batch_write_builder(self) -> BatchWriteBuilder: """Returns a builder for building batch table write and table commit.""" + + @abstractmethod + def new_stream_write_builder(self) -> StreamWriteBuilder: + """Returns a builder for building stream table write and table commit.""" diff --git a/paimon-python/pypaimon/tests/py36/ao_simple_test.py b/paimon-python/pypaimon/tests/py36/ao_simple_test.py index d361d06897c3..b14fbcf8dbd7 100644 --- a/paimon-python/pypaimon/tests/py36/ao_simple_test.py +++ b/paimon-python/pypaimon/tests/py36/ao_simple_test.py @@ -426,7 +426,7 @@ def test_multi_prepare_commit_ao(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) self.rest_catalog.create_table('default.test_append_only_parquet', schema, False) table = self.rest_catalog.get_table('default.test_append_only_parquet') - write_builder = table.new_batch_write_builder() + write_builder = table.new_stream_write_builder() table_write = write_builder.new_write() table_commit = write_builder.new_commit() diff --git a/paimon-python/pypaimon/tests/write/table_write_test.py b/paimon-python/pypaimon/tests/write/table_write_test.py index e582d920a85e..21b76731acd0 100644 --- a/paimon-python/pypaimon/tests/write/table_write_test.py +++ b/paimon-python/pypaimon/tests/write/table_write_test.py @@ -57,7 +57,7 @@ def test_multi_prepare_commit_ao(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) self.catalog.create_table('default.test_append_only_parquet', schema, False) table = self.catalog.get_table('default.test_append_only_parquet') - write_builder = table.new_batch_write_builder() + write_builder = table.new_stream_write_builder() table_write = write_builder.new_write() table_commit = write_builder.new_commit() @@ -108,7 +108,7 @@ def test_multi_prepare_commit_pk(self): options={'bucket': '2'}) self.catalog.create_table('default.test_primary_key_parquet', schema, False) table = self.catalog.get_table('default.test_primary_key_parquet') - write_builder = table.new_batch_write_builder() + write_builder = table.new_stream_write_builder() table_write = write_builder.new_write() table_commit = write_builder.new_commit() diff --git a/paimon-python/pypaimon/write/batch_table_commit.py b/paimon-python/pypaimon/write/table_commit.py similarity index 89% rename from paimon-python/pypaimon/write/batch_table_commit.py rename to paimon-python/pypaimon/write/table_commit.py index 6111541cb0a0..0dcfabf99e30 100644 --- a/paimon-python/pypaimon/write/batch_table_commit.py +++ b/paimon-python/pypaimon/write/table_commit.py @@ -23,7 +23,7 @@ from pypaimon.write.file_store_commit import FileStoreCommit -class BatchTableCommit: +class TableCommit: """Python implementation of BatchTableCommit for batch writing scenarios.""" def __init__(self, table, commit_user: str, static_partition: Optional[dict]): @@ -41,7 +41,7 @@ def __init__(self, table, commit_user: str, static_partition: Optional[dict]): self.file_store_commit = FileStoreCommit(snapshot_commit, table, commit_user) self.batch_committed = False - def commit(self, commit_messages: List[CommitMessage], commit_identifier: int = COMMIT_IDENTIFIER): + def _commit(self, commit_messages: List[CommitMessage], commit_identifier: int = COMMIT_IDENTIFIER): self._check_committed() non_empty_messages = [msg for msg in commit_messages if not msg.is_empty()] @@ -74,3 +74,14 @@ def _check_committed(self): if self.batch_committed: raise RuntimeError("BatchTableCommit only supports one-time committing.") self.batch_committed = True + + +class BatchTableCommit(TableCommit): + def commit(self, commit_messages: List[CommitMessage]): + self._commit(commit_messages, COMMIT_IDENTIFIER) + + +class StreamTableCommit(TableCommit): + + def commit(self, commit_messages: List[CommitMessage], commit_identifier: int = COMMIT_IDENTIFIER): + self._commit(commit_messages, commit_identifier) diff --git a/paimon-python/pypaimon/write/batch_table_write.py b/paimon-python/pypaimon/write/table_write.py similarity index 89% rename from paimon-python/pypaimon/write/batch_table_write.py rename to paimon-python/pypaimon/write/table_write.py index 6043b7947052..3ccc0782230d 100644 --- a/paimon-python/pypaimon/write/batch_table_write.py +++ b/paimon-python/pypaimon/write/table_write.py @@ -15,7 +15,6 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ - from collections import defaultdict from typing import List @@ -27,7 +26,7 @@ from pypaimon.write.file_store_write import FileStoreWrite -class BatchTableWrite: +class TableWrite: def __init__(self, table): from pypaimon.table.file_store_table import FileStoreTable @@ -59,9 +58,6 @@ def write_pandas(self, dataframe): record_batch = pa.RecordBatch.from_pandas(dataframe, schema=pa_schema) return self.write_arrow_batch(record_batch) - def prepare_commit(self, commit_identifier: int = COMMIT_IDENTIFIER) -> List[CommitMessage]: - return self.file_store_write.prepare_commit(commit_identifier) - def with_write_type(self, write_cols: List[str]): for col in write_cols: if col not in self.table_pyarrow_schema.names: @@ -80,3 +76,21 @@ def _validate_pyarrow_schema(self, data_schema: pa.Schema): f"Input schema is: {data_schema} " f"Table schema is: {self.table_pyarrow_schema} " f"Write cols is: {self.file_store_write.write_cols}") + + +class BatchTableWrite(TableWrite): + def __init__(self, table): + super().__init__(table) + self.batch_committed = False + + def prepare_commit(self) -> List[CommitMessage]: + if self.batch_committed: + raise RuntimeError("BatchTableWrite only supports one-time committing.") + self.batch_committed = True + return self.file_store_write.prepare_commit(COMMIT_IDENTIFIER) + + +class StreamTableWrite(TableWrite): + + def prepare_commit(self, commit_identifier: int = COMMIT_IDENTIFIER) -> List[CommitMessage]: + return self.file_store_write.prepare_commit(commit_identifier) diff --git a/paimon-python/pypaimon/write/batch_write_builder.py b/paimon-python/pypaimon/write/write_builder.py similarity index 73% rename from paimon-python/pypaimon/write/batch_write_builder.py rename to paimon-python/pypaimon/write/write_builder.py index 2380530fbc29..8c9ed725f5e6 100644 --- a/paimon-python/pypaimon/write/batch_write_builder.py +++ b/paimon-python/pypaimon/write/write_builder.py @@ -17,14 +17,15 @@ ################################################################################ import uuid +from abc import ABC from typing import Optional from pypaimon.common.core_options import CoreOptions -from pypaimon.write.batch_table_commit import BatchTableCommit -from pypaimon.write.batch_table_write import BatchTableWrite +from pypaimon.write.table_commit import BatchTableCommit, StreamTableCommit, TableCommit +from pypaimon.write.table_write import BatchTableWrite, StreamTableWrite, TableWrite -class BatchWriteBuilder: +class WriteBuilder(ABC): def __init__(self, table): from pypaimon.table.file_store_table import FileStoreTable @@ -36,15 +37,33 @@ def overwrite(self, static_partition: Optional[dict] = None): self.static_partition = static_partition if static_partition is not None else {} return self - def new_write(self) -> BatchTableWrite: - return BatchTableWrite(self.table) + def new_write(self) -> TableWrite: + """Returns a table write.""" - def new_commit(self) -> BatchTableCommit: - commit = BatchTableCommit(self.table, self.commit_user, self.static_partition) - return commit + def new_commit(self) -> TableCommit: + """Returns a table commit.""" def _create_commit_user(self): if CoreOptions.COMMIT_USER_PREFIX in self.table.options: return f"{self.table.options.get(CoreOptions.COMMIT_USER_PREFIX)}_{uuid.uuid4()}" else: return str(uuid.uuid4()) + + +class BatchWriteBuilder(WriteBuilder): + + def new_write(self) -> BatchTableWrite: + return BatchTableWrite(self.table) + + def new_commit(self) -> BatchTableCommit: + commit = BatchTableCommit(self.table, self.commit_user, self.static_partition) + return commit + + +class StreamWriteBuilder(WriteBuilder): + def new_write(self) -> StreamTableWrite: + return StreamTableWrite(self.table) + + def new_commit(self) -> StreamTableCommit: + commit = StreamTableCommit(self.table, self.commit_user, self.static_partition) + return commit