diff --git a/paimon-python/pypaimon/snapshot/snapshot.py b/paimon-python/pypaimon/snapshot/snapshot.py
index 96b287ab5575..f164f5eb61ba 100644
--- a/paimon-python/pypaimon/snapshot/snapshot.py
+++ b/paimon-python/pypaimon/snapshot/snapshot.py
@@ -21,6 +21,8 @@
 
 from pypaimon.common.json_util import json_field
 
+COMMIT_IDENTIFIER = 0x7fffffffffffffff
+
 
 @dataclass
 class Snapshot:
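Note on the new constant: 0x7fffffffffffffff is 2 ** 63 - 1, the largest
signed 64-bit integer (Java's Long.MAX_VALUE), so it appears to mirror the
sentinel that the Java BatchWriteBuilder uses as its batch commit identifier.
A quick sanity check in plain Python:

    # The sentinel is the maximum signed 64-bit value, so a batch commit is
    # always stamped with a larger identifier than any streaming checkpoint.
    assert 0x7fffffffffffffff == 2 ** 63 - 1 == 9223372036854775807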
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index cde282eff005..6132c9bd6926 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -28,7 +28,7 @@
 from pypaimon.schema.table_schema import TableSchema
 from pypaimon.table.bucket_mode import BucketMode
 from pypaimon.table.table import Table
-from pypaimon.write.batch_write_builder import BatchWriteBuilder
+from pypaimon.write.write_builder import BatchWriteBuilder, StreamWriteBuilder
 from pypaimon.write.row_key_extractor import (DynamicBucketRowKeyExtractor,
                                               FixedBucketRowKeyExtractor,
                                               PostponeBucketRowKeyExtractor,
@@ -98,6 +98,9 @@ def new_read_builder(self) -> 'ReadBuilder':
     def new_batch_write_builder(self) -> BatchWriteBuilder:
         return BatchWriteBuilder(self)
 
+    def new_stream_write_builder(self) -> StreamWriteBuilder:
+        return StreamWriteBuilder(self)
+
     def create_row_key_extractor(self) -> RowKeyExtractor:
         bucket_mode = self.bucket_mode()
         if bucket_mode == BucketMode.HASH_FIXED:
diff --git a/paimon-python/pypaimon/table/table.py b/paimon-python/pypaimon/table/table.py
index 3a1fe2e6228c..e20784f1fc9d 100644
--- a/paimon-python/pypaimon/table/table.py
+++ b/paimon-python/pypaimon/table/table.py
@@ -19,7 +19,7 @@
 from abc import ABC, abstractmethod
 
 from pypaimon.read.read_builder import ReadBuilder
-from pypaimon.write.batch_write_builder import BatchWriteBuilder
+from pypaimon.write.write_builder import BatchWriteBuilder, StreamWriteBuilder
 
 
 class Table(ABC):
@@ -32,3 +32,7 @@ def new_read_builder(self) -> ReadBuilder:
     @abstractmethod
     def new_batch_write_builder(self) -> BatchWriteBuilder:
         """Returns a builder for building batch table write and table commit."""
+
+    @abstractmethod
+    def new_stream_write_builder(self) -> StreamWriteBuilder:
+        """Returns a builder for building stream table write and table commit."""
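Taken together, the table-level additions expose the streaming write path
sketched below. This is a minimal sketch mirroring the tests added further
down, not part of the diff itself; the warehouse path, the pre-created table
'default.t', and the arrow_tables input are assumptions:

    import pyarrow as pa
    from pypaimon import CatalogFactory

    catalog = CatalogFactory.create({'warehouse': '/tmp/warehouse'})  # hypothetical path
    table = catalog.get_table('default.t')  # assumed to already exist

    write_builder = table.new_stream_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()

    # Each checkpoint gets its own prepare_commit call, tagged with a
    # monotonically increasing identifier chosen by the caller.
    messages = []
    for identifier, batch in enumerate(arrow_tables):  # arrow_tables: assumed input
        table_write.write_arrow(batch)
        messages = table_write.prepare_commit(identifier)

    # As in the tests below, one final commit carries the messages prepared
    # for the last checkpoint.
    table_commit.commit(messages, len(arrow_tables) - 1)
    table_write.close()
    table_commit.close()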
+""" + +import os +import shutil + +import tempfile +import unittest + +from pypaimon import CatalogFactory, Schema +import pyarrow as pa + + +class TableWriteTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({ + 'warehouse': cls.warehouse + }) + cls.catalog.create_database('default', True) + + cls.pa_schema = pa.schema([ + ('user_id', pa.int32()), + ('item_id', pa.int64()), + ('behavior', pa.string()), + ('dt', pa.string()) + ]) + cls.expected = pa.Table.from_pydict({ + 'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + 'item_id': [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010], + 'behavior': ['a', 'b', 'c', None, 'e', 'f', 'g', 'h', 'i', 'j'], + 'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p2', 'p2', 'p1'] + }, schema=cls.pa_schema) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir, ignore_errors=True) + + def test_multi_prepare_commit_ao(self): + schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) + self.catalog.create_table('default.test_append_only_parquet', schema, False) + table = self.catalog.get_table('default.test_append_only_parquet') + write_builder = table.new_stream_write_builder() + + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + # write 1 + data1 = { + 'user_id': [1, 2, 3, 4], + 'item_id': [1001, 1002, 1003, 1004], + 'behavior': ['a', 'b', 'c', None], + 'dt': ['p1', 'p1', 'p2', 'p1'], + } + pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema) + table_write.write_arrow(pa_table) + table_write.prepare_commit(0) + # write 2 + data2 = { + 'user_id': [5, 6, 7, 8], + 'item_id': [1005, 1006, 1007, 1008], + 'behavior': ['e', 'f', 'g', 'h'], + 'dt': ['p2', 'p1', 'p2', 'p2'], + } + pa_table = pa.Table.from_pydict(data2, schema=self.pa_schema) + table_write.write_arrow(pa_table) + table_write.prepare_commit(1) + # write 3 + data3 = { + 'user_id': [9, 10], + 'item_id': [1009, 1010], + 'behavior': ['i', 'j'], + 'dt': ['p2', 'p1'], + } + pa_table = pa.Table.from_pydict(data3, schema=self.pa_schema) + table_write.write_arrow(pa_table) + cm = table_write.prepare_commit(2) + # commit + table_commit.commit(cm, 2) + table_write.close() + table_commit.close() + self.assertEqual(2, table_write.file_store_write.commit_identifier) + + read_builder = table.new_read_builder() + table_read = read_builder.new_read() + splits = read_builder.new_scan().plan().splits() + actual = table_read.to_arrow(splits).sort_by('user_id') + self.assertEqual(self.expected, actual) + + def test_multi_prepare_commit_pk(self): + schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], primary_keys=['user_id', 'dt'], + options={'bucket': '2'}) + self.catalog.create_table('default.test_primary_key_parquet', schema, False) + table = self.catalog.get_table('default.test_primary_key_parquet') + write_builder = table.new_stream_write_builder() + + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + # write 1 + data1 = { + 'user_id': [1, 2, 3, 4], + 'item_id': [1001, 1002, 1003, 1004], + 'behavior': ['a', 'b', 'c', None], + 'dt': ['p1', 'p1', 'p2', 'p1'], + } + pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema) + table_write.write_arrow(pa_table) + table_write.prepare_commit(0) + # write 2 + data2 = { + 'user_id': [5, 6, 7, 8], + 'item_id': [1005, 1006, 1007, 1008], + 'behavior': ['e', 'f', 'g', 'h'], + 'dt': ['p2', 
diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py
index 9ebabf110359..c100b64966d8 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -37,6 +37,7 @@ def __init__(self, table):
         self.data_writers: Dict[Tuple, DataWriter] = {}
         self.max_seq_numbers: dict = {}
         self.write_cols = None
+        self.commit_identifier = 0
 
     def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
         key = (partition, bucket)
@@ -48,6 +49,7 @@ def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
     def _create_data_writer(self, partition: Tuple, bucket: int) -> DataWriter:
         def max_seq_number():
             return self._seq_number_stats(partition).get(bucket, 1)
+
         # Check if table has blob columns
         if self._has_blob_columns():
             return DataBlobWriter(
@@ -83,7 +85,8 @@ def _has_blob_columns(self) -> bool:
                 return True
         return False
 
-    def prepare_commit(self) -> List[CommitMessage]:
+    def prepare_commit(self, commit_identifier) -> List[CommitMessage]:
+        self.commit_identifier = commit_identifier
         commit_messages = []
        for (partition, bucket), writer in self.data_writers.items():
             committed_files = writer.prepare_commit()
diff --git a/paimon-python/pypaimon/write/batch_table_commit.py b/paimon-python/pypaimon/write/table_commit.py
similarity index 86%
rename from paimon-python/pypaimon/write/batch_table_commit.py
rename to paimon-python/pypaimon/write/table_commit.py
index 7f42e1cef113..0dcfabf99e30 100644
--- a/paimon-python/pypaimon/write/batch_table_commit.py
+++ b/paimon-python/pypaimon/write/table_commit.py
@@ -16,14 +16,14 @@
 # limitations under the License.
 ################################################################################
 
-import time
 from typing import List, Optional
 
+from pypaimon.snapshot.snapshot import COMMIT_IDENTIFIER
 from pypaimon.write.commit_message import CommitMessage
 from pypaimon.write.file_store_commit import FileStoreCommit
 
 
-class BatchTableCommit:
-    """Python implementation of BatchTableCommit for batch writing scenarios."""
+class TableCommit:
+    """Base class of table commit for batch and stream writing scenarios."""
 
     def __init__(self, table, commit_user: str, static_partition: Optional[dict]):
@@ -41,15 +41,13 @@ def __init__(self, table, commit_user: str, static_partition: Optional[dict]):
         self.file_store_commit = FileStoreCommit(snapshot_commit, table, commit_user)
         self.batch_committed = False
 
-    def commit(self, commit_messages: List[CommitMessage]):
+    def _commit(self, commit_messages: List[CommitMessage], commit_identifier: int = COMMIT_IDENTIFIER):
         self._check_committed()
 
         non_empty_messages = [msg for msg in commit_messages if not msg.is_empty()]
         if not non_empty_messages:
             return
 
-        commit_identifier = int(time.time() * 1000)
-
         try:
             if self.overwrite_partition is not None:
                 self.file_store_commit.overwrite(
@@ -76,3 +74,14 @@ def _check_committed(self):
         if self.batch_committed:
             raise RuntimeError("BatchTableCommit only supports one-time committing.")
         self.batch_committed = True
+
+
+class BatchTableCommit(TableCommit):
+    def commit(self, commit_messages: List[CommitMessage]):
+        self._commit(commit_messages, COMMIT_IDENTIFIER)
+
+
+class StreamTableCommit(TableCommit):
+
+    def commit(self, commit_messages: List[CommitMessage], commit_identifier: int = COMMIT_IDENTIFIER):
+        self._commit(commit_messages, commit_identifier)
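Two behavioral points worth noting here: batch commits are now stamped with
the COMMIT_IDENTIFIER sentinel instead of the previous wall-clock
int(time.time() * 1000), and both subclasses still pass through
_check_committed(), so any TableCommit instance commits at most once. A small
sketch of that contract, reusing write_builder and messages from the earlier
sketch:

    table_commit = write_builder.new_commit()
    table_commit.commit(messages, 2)      # StreamTableCommit takes an explicit identifier
    try:
        table_commit.commit(messages, 3)  # second commit on the same instance
    except RuntimeError:
        pass  # one-time committing is enforced by the shared base class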
diff --git a/paimon-python/pypaimon/write/batch_table_write.py b/paimon-python/pypaimon/write/table_write.py
similarity index 89%
rename from paimon-python/pypaimon/write/batch_table_write.py
rename to paimon-python/pypaimon/write/table_write.py
index f8d0660bfa84..3ccc0782230d 100644
--- a/paimon-python/pypaimon/write/batch_table_write.py
+++ b/paimon-python/pypaimon/write/table_write.py
@@ -15,18 +15,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-
 from collections import defaultdict
 from typing import List
 
 import pyarrow as pa
 
 from pypaimon.schema.data_types import PyarrowFieldParser
+from pypaimon.snapshot.snapshot import COMMIT_IDENTIFIER
 from pypaimon.write.commit_message import CommitMessage
 from pypaimon.write.file_store_write import FileStoreWrite
 
 
-class BatchTableWrite:
+class TableWrite:
     def __init__(self, table):
         from pypaimon.table.file_store_table import FileStoreTable
 
@@ -34,7 +34,6 @@ def __init__(self, table):
         self.table_pyarrow_schema = PyarrowFieldParser.from_paimon_schema(self.table.table_schema.fields)
         self.file_store_write = FileStoreWrite(self.table)
         self.row_key_extractor = self.table.create_row_key_extractor()
-        self.batch_committed = False
 
     def write_arrow(self, table: pa.Table):
         batches_iterator = table.to_batches()
@@ -59,12 +58,6 @@ def write_pandas(self, dataframe):
         record_batch = pa.RecordBatch.from_pandas(dataframe, schema=pa_schema)
         return self.write_arrow_batch(record_batch)
 
-    def prepare_commit(self) -> List[CommitMessage]:
-        if self.batch_committed:
-            raise RuntimeError("BatchTableWrite only supports one-time committing.")
-        self.batch_committed = True
-        return self.file_store_write.prepare_commit()
-
     def with_write_type(self, write_cols: List[str]):
         for col in write_cols:
             if col not in self.table_pyarrow_schema.names:
@@ -83,3 +76,21 @@ def _validate_pyarrow_schema(self, data_schema: pa.Schema):
                 f"Input schema is: {data_schema} "
                 f"Table schema is: {self.table_pyarrow_schema} "
                 f"Write cols is: {self.file_store_write.write_cols}")
+
+
+class BatchTableWrite(TableWrite):
+    def __init__(self, table):
+        super().__init__(table)
+        self.batch_committed = False
+
+    def prepare_commit(self) -> List[CommitMessage]:
+        if self.batch_committed:
+            raise RuntimeError("BatchTableWrite only supports one-time committing.")
+        self.batch_committed = True
+        return self.file_store_write.prepare_commit(COMMIT_IDENTIFIER)
+
+
+class StreamTableWrite(TableWrite):
+
+    def prepare_commit(self, commit_identifier: int = COMMIT_IDENTIFIER) -> List[CommitMessage]:
+        return self.file_store_write.prepare_commit(commit_identifier)
diff --git a/paimon-python/pypaimon/write/batch_write_builder.py b/paimon-python/pypaimon/write/write_builder.py
similarity index 73%
rename from paimon-python/pypaimon/write/batch_write_builder.py
rename to paimon-python/pypaimon/write/write_builder.py
index 2380530fbc29..8c9ed725f5e6 100644
--- a/paimon-python/pypaimon/write/batch_write_builder.py
+++ b/paimon-python/pypaimon/write/write_builder.py
@@ -17,14 +17,15 @@
 ################################################################################
 
 import uuid
+from abc import ABC
 from typing import Optional
 
 from pypaimon.common.core_options import CoreOptions
-from pypaimon.write.batch_table_commit import BatchTableCommit
-from pypaimon.write.batch_table_write import BatchTableWrite
+from pypaimon.write.table_commit import BatchTableCommit, StreamTableCommit, TableCommit
+from pypaimon.write.table_write import BatchTableWrite, StreamTableWrite, TableWrite
 
 
-class BatchWriteBuilder:
+class WriteBuilder(ABC):
     def __init__(self, table):
         from pypaimon.table.file_store_table import FileStoreTable
 
@@ -36,15 +37,33 @@ def overwrite(self, static_partition: Optional[dict] = None):
         self.static_partition = static_partition if static_partition is not None else {}
         return self
 
-    def new_write(self) -> BatchTableWrite:
-        return BatchTableWrite(self.table)
+    def new_write(self) -> TableWrite:
+        """Returns a table write."""
 
-    def new_commit(self) -> BatchTableCommit:
-        commit = BatchTableCommit(self.table, self.commit_user, self.static_partition)
-        return commit
+    def new_commit(self) -> TableCommit:
+        """Returns a table commit."""
 
     def _create_commit_user(self):
         if CoreOptions.COMMIT_USER_PREFIX in self.table.options:
             return f"{self.table.options.get(CoreOptions.COMMIT_USER_PREFIX)}_{uuid.uuid4()}"
         else:
             return str(uuid.uuid4())
+
+
+class BatchWriteBuilder(WriteBuilder):
+
+    def new_write(self) -> BatchTableWrite:
+        return BatchTableWrite(self.table)
+
+    def new_commit(self) -> BatchTableCommit:
+        commit = BatchTableCommit(self.table, self.commit_user, self.static_partition)
+        return commit
+
+
+class StreamWriteBuilder(WriteBuilder):
+    def new_write(self) -> StreamTableWrite:
+        return StreamTableWrite(self.table)
+
+    def new_commit(self) -> StreamTableCommit:
+        commit = StreamTableCommit(self.table, self.commit_user, self.static_partition)
+        return commit
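With the builder hierarchy in place, callers pick the entry point per
workload while the batch surface stays unchanged. A side-by-side sketch,
where table is as in the earlier sketch and pa_table is any pyarrow.Table
matching the table schema:

    # Batch: prepare_commit takes no identifier; the sentinel is applied internally.
    batch_builder = table.new_batch_write_builder()
    batch_write = batch_builder.new_write()
    batch_write.write_arrow(pa_table)
    batch_builder.new_commit().commit(batch_write.prepare_commit())
    batch_write.close()

    # Stream: the caller supplies the checkpoint identifier on both sides.
    stream_builder = table.new_stream_write_builder()
    stream_write = stream_builder.new_write()
    stream_write.write_arrow(pa_table)
    stream_builder.new_commit().commit(stream_write.prepare_commit(0), 0)
    stream_write.close()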