2 changes: 2 additions & 0 deletions paimon-python/pypaimon/snapshot/snapshot.py
@@ -21,6 +21,8 @@

from pypaimon.common.json_util import json_field

COMMIT_IDENTIFIER = 0x7fffffffffffffff


@dataclass
class Snapshot:
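The new constant is the largest signed 64-bit integer, mirroring Long.MAX_VALUE on the Java side, and the builders changed below pass it as the default identifier for batch commits. A minimal sanity check, for reference only:

# For reference: 0x7fffffffffffffff is the maximum signed 64-bit value
# (Java's Long.MAX_VALUE); this PR uses it as the default commit
# identifier for batch writes.
assert 0x7fffffffffffffff == 2**63 - 1 == 9223372036854775807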
5 changes: 4 additions & 1 deletion paimon-python/pypaimon/table/file_store_table.py
@@ -28,7 +28,7 @@
from pypaimon.schema.table_schema import TableSchema
from pypaimon.table.bucket_mode import BucketMode
from pypaimon.table.table import Table
from pypaimon.write.batch_write_builder import BatchWriteBuilder
from pypaimon.write.write_builder import BatchWriteBuilder, StreamWriteBuilder
from pypaimon.write.row_key_extractor import (DynamicBucketRowKeyExtractor,
FixedBucketRowKeyExtractor,
PostponeBucketRowKeyExtractor,
@@ -98,6 +98,9 @@ def new_read_builder(self) -> 'ReadBuilder':
def new_batch_write_builder(self) -> BatchWriteBuilder:
return BatchWriteBuilder(self)

def new_stream_write_builder(self) -> StreamWriteBuilder:
return StreamWriteBuilder(self)

def create_row_key_extractor(self) -> RowKeyExtractor:
bucket_mode = self.bucket_mode()
if bucket_mode == BucketMode.HASH_FIXED:
6 changes: 5 additions & 1 deletion paimon-python/pypaimon/table/table.py
@@ -19,7 +19,7 @@
from abc import ABC, abstractmethod

from pypaimon.read.read_builder import ReadBuilder
from pypaimon.write.batch_write_builder import BatchWriteBuilder
from pypaimon.write.write_builder import BatchWriteBuilder, StreamWriteBuilder


class Table(ABC):
@@ -32,3 +32,7 @@ def new_read_builder(self) -> ReadBuilder:
@abstractmethod
def new_batch_write_builder(self) -> BatchWriteBuilder:
"""Returns a builder for building batch table write and table commit."""

@abstractmethod
def new_stream_write_builder(self) -> StreamWriteBuilder:
"""Returns a builder for building stream table write and table commit."""
56 changes: 56 additions & 0 deletions paimon-python/pypaimon/tests/py36/ao_simple_test.py
@@ -421,3 +421,59 @@ def test_initialize_oss_fs_pyarrow_lt_7(self):
session_token="TOKEN",
region="cn-hangzhou",
endpoint_override="oss-bucket." + props[OssOptions.OSS_ENDPOINT])

def test_multi_prepare_commit_ao(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.rest_catalog.create_table('default.test_append_only_parquet', schema, False)
table = self.rest_catalog.get_table('default.test_append_only_parquet')
write_builder = table.new_stream_write_builder()

table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
# write 1
data1 = {
'user_id': [1, 2, 3, 4],
'item_id': [1001, 1002, 1003, 1004],
'behavior': ['a', 'b', 'c', None],
'dt': ['p1', 'p1', 'p2', 'p1'],
}
pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema)
table_write.write_arrow(pa_table)
table_write.prepare_commit(0)
# write 2
data2 = {
'user_id': [5, 6, 7, 8],
'item_id': [1005, 1006, 1007, 1008],
'behavior': ['e', 'f', 'g', 'h'],
'dt': ['p2', 'p1', 'p2', 'p2'],
}
pa_table = pa.Table.from_pydict(data2, schema=self.pa_schema)
table_write.write_arrow(pa_table)
table_write.prepare_commit(1)
# write 3
data3 = {
'user_id': [9, 10],
'item_id': [1009, 1010],
'behavior': ['i', 'j'],
'dt': ['p2', 'p1'],
}
pa_table = pa.Table.from_pydict(data3, schema=self.pa_schema)
table_write.write_arrow(pa_table)
cm = table_write.prepare_commit(2)
# commit
table_commit.commit(cm, 2)
table_write.close()
table_commit.close()
self.assertEqual(2, table_write.file_store_write.commit_identifier)

read_builder = table.new_read_builder()
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
actual = table_sort_by(table_read.to_arrow(splits), 'user_id')
expected = pa.Table.from_pydict({
'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
'item_id': [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010],
'behavior': ['a', 'b', 'c', None, 'e', 'f', 'g', 'h', 'i', 'j'],
'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p2', 'p2', 'p1']
}, schema=self.pa_schema)
self.assertEqual(expected, actual)
155 changes: 155 additions & 0 deletions paimon-python/pypaimon/tests/write/table_write_test.py
@@ -0,0 +1,155 @@
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import os
import shutil

import tempfile
import unittest

from pypaimon import CatalogFactory, Schema
import pyarrow as pa


class TableWriteTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.tempdir = tempfile.mkdtemp()
cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
cls.catalog = CatalogFactory.create({
'warehouse': cls.warehouse
})
cls.catalog.create_database('default', True)

cls.pa_schema = pa.schema([
('user_id', pa.int32()),
('item_id', pa.int64()),
('behavior', pa.string()),
('dt', pa.string())
])
cls.expected = pa.Table.from_pydict({
'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
'item_id': [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010],
'behavior': ['a', 'b', 'c', None, 'e', 'f', 'g', 'h', 'i', 'j'],
'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p2', 'p2', 'p1']
}, schema=cls.pa_schema)

@classmethod
def tearDownClass(cls):
shutil.rmtree(cls.tempdir, ignore_errors=True)

def test_multi_prepare_commit_ao(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.catalog.create_table('default.test_append_only_parquet', schema, False)
table = self.catalog.get_table('default.test_append_only_parquet')
write_builder = table.new_stream_write_builder()

table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
# write 1
data1 = {
'user_id': [1, 2, 3, 4],
'item_id': [1001, 1002, 1003, 1004],
'behavior': ['a', 'b', 'c', None],
'dt': ['p1', 'p1', 'p2', 'p1'],
}
pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema)
table_write.write_arrow(pa_table)
table_write.prepare_commit(0)
# write 2
data2 = {
'user_id': [5, 6, 7, 8],
'item_id': [1005, 1006, 1007, 1008],
'behavior': ['e', 'f', 'g', 'h'],
'dt': ['p2', 'p1', 'p2', 'p2'],
}
pa_table = pa.Table.from_pydict(data2, schema=self.pa_schema)
table_write.write_arrow(pa_table)
table_write.prepare_commit(1)
# write 3
data3 = {
'user_id': [9, 10],
'item_id': [1009, 1010],
'behavior': ['i', 'j'],
'dt': ['p2', 'p1'],
}
pa_table = pa.Table.from_pydict(data3, schema=self.pa_schema)
table_write.write_arrow(pa_table)
cm = table_write.prepare_commit(2)
# commit
table_commit.commit(cm, 2)
table_write.close()
table_commit.close()
self.assertEqual(2, table_write.file_store_write.commit_identifier)

read_builder = table.new_read_builder()
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
actual = table_read.to_arrow(splits).sort_by('user_id')
self.assertEqual(self.expected, actual)

def test_multi_prepare_commit_pk(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], primary_keys=['user_id', 'dt'],
options={'bucket': '2'})
self.catalog.create_table('default.test_primary_key_parquet', schema, False)
table = self.catalog.get_table('default.test_primary_key_parquet')
write_builder = table.new_stream_write_builder()

table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
# write 1
data1 = {
'user_id': [1, 2, 3, 4],
'item_id': [1001, 1002, 1003, 1004],
'behavior': ['a', 'b', 'c', None],
'dt': ['p1', 'p1', 'p2', 'p1'],
}
pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema)
table_write.write_arrow(pa_table)
table_write.prepare_commit(0)
# write 2
data2 = {
'user_id': [5, 6, 7, 8],
'item_id': [1005, 1006, 1007, 1008],
'behavior': ['e', 'f', 'g', 'h'],
'dt': ['p2', 'p1', 'p2', 'p2'],
}
pa_table = pa.Table.from_pydict(data2, schema=self.pa_schema)
table_write.write_arrow(pa_table)
table_write.prepare_commit(1)
# write 3
data3 = {
'user_id': [9, 10],
'item_id': [1009, 1010],
'behavior': ['i', 'j'],
'dt': ['p2', 'p1'],
}
pa_table = pa.Table.from_pydict(data3, schema=self.pa_schema)
table_write.write_arrow(pa_table)
cm = table_write.prepare_commit(2)
# commit
table_commit.commit(cm, 2)
table_write.close()
table_commit.close()
self.assertEqual(2, table_write.file_store_write.commit_identifier)

read_builder = table.new_read_builder()
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
actual = table_read.to_arrow(splits).sort_by('user_id')
self.assertEqual(self.expected, actual)
5 changes: 4 additions & 1 deletion paimon-python/pypaimon/write/file_store_write.py
@@ -37,6 +37,7 @@ def __init__(self, table):
self.data_writers: Dict[Tuple, DataWriter] = {}
self.max_seq_numbers: dict = {}
self.write_cols = None
self.commit_identifier = 0

def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
key = (partition, bucket)
@@ -48,6 +49,7 @@ def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
def _create_data_writer(self, partition: Tuple, bucket: int) -> DataWriter:
def max_seq_number():
return self._seq_number_stats(partition).get(bucket, 1)

# Check if table has blob columns
if self._has_blob_columns():
return DataBlobWriter(
@@ -83,7 +85,8 @@ def _has_blob_columns(self) -> bool:
return True
return False

def prepare_commit(self) -> List[CommitMessage]:
def prepare_commit(self, commit_identifier) -> List[CommitMessage]:
self.commit_identifier = commit_identifier
commit_messages = []
for (partition, bucket), writer in self.data_writers.items():
committed_files = writer.prepare_commit()
@@ -16,14 +16,14 @@
# limitations under the License.
################################################################################

import time
from typing import List, Optional

from pypaimon.snapshot.snapshot import COMMIT_IDENTIFIER
from pypaimon.write.commit_message import CommitMessage
from pypaimon.write.file_store_commit import FileStoreCommit


class BatchTableCommit:
class TableCommit:
"""Python implementation of BatchTableCommit for batch writing scenarios."""

def __init__(self, table, commit_user: str, static_partition: Optional[dict]):
@@ -41,15 +41,13 @@ def __init__(self, table, commit_user: str, static_partition: Optional[dict]):
self.file_store_commit = FileStoreCommit(snapshot_commit, table, commit_user)
self.batch_committed = False

def commit(self, commit_messages: List[CommitMessage]):
def _commit(self, commit_messages: List[CommitMessage], commit_identifier: int = COMMIT_IDENTIFIER):
self._check_committed()

non_empty_messages = [msg for msg in commit_messages if not msg.is_empty()]
if not non_empty_messages:
return

commit_identifier = int(time.time() * 1000)

try:
if self.overwrite_partition is not None:
self.file_store_commit.overwrite(
@@ -76,3 +74,14 @@ def _check_committed(self):
if self.batch_committed:
raise RuntimeError("BatchTableCommit only supports one-time committing.")
self.batch_committed = True


class BatchTableCommit(TableCommit):
def commit(self, commit_messages: List[CommitMessage]):
self._commit(commit_messages, COMMIT_IDENTIFIER)


class StreamTableCommit(TableCommit):

def commit(self, commit_messages: List[CommitMessage], commit_identifier: int = COMMIT_IDENTIFIER):
self._commit(commit_messages, commit_identifier)
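Net behavioral change worth noting: the identifier is no longer derived from int(time.time() * 1000). Batch commits now use the fixed COMMIT_IDENTIFIER, while streaming commits take the identifier from the caller; a sketch of the two call patterns, with hypothetical instances:

# batch_commit / stream_commit are hypothetical instances of the classes above.
batch_commit.commit(messages)           # always commits with COMMIT_IDENTIFIER
stream_commit.commit(messages, 42)      # caller supplies the identifier, e.g. 42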
@@ -15,26 +15,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from collections import defaultdict
from typing import List

import pyarrow as pa

from pypaimon.schema.data_types import PyarrowFieldParser
from pypaimon.snapshot.snapshot import COMMIT_IDENTIFIER
from pypaimon.write.commit_message import CommitMessage
from pypaimon.write.file_store_write import FileStoreWrite


class BatchTableWrite:
class TableWrite:
def __init__(self, table):
from pypaimon.table.file_store_table import FileStoreTable

self.table: FileStoreTable = table
self.table_pyarrow_schema = PyarrowFieldParser.from_paimon_schema(self.table.table_schema.fields)
self.file_store_write = FileStoreWrite(self.table)
self.row_key_extractor = self.table.create_row_key_extractor()
self.batch_committed = False

def write_arrow(self, table: pa.Table):
batches_iterator = table.to_batches()
@@ -59,12 +58,6 @@ def write_pandas(self, dataframe):
record_batch = pa.RecordBatch.from_pandas(dataframe, schema=pa_schema)
return self.write_arrow_batch(record_batch)

def prepare_commit(self) -> List[CommitMessage]:
if self.batch_committed:
raise RuntimeError("BatchTableWrite only supports one-time committing.")
self.batch_committed = True
return self.file_store_write.prepare_commit()

def with_write_type(self, write_cols: List[str]):
for col in write_cols:
if col not in self.table_pyarrow_schema.names:
@@ -83,3 +76,21 @@ def _validate_pyarrow_schema(self, data_schema: pa.Schema):
f"Input schema is: {data_schema} "
f"Table schema is: {self.table_pyarrow_schema} "
f"Write cols is: {self.file_store_write.write_cols}")


class BatchTableWrite(TableWrite):
def __init__(self, table):
super().__init__(table)
self.batch_committed = False

def prepare_commit(self) -> List[CommitMessage]:
if self.batch_committed:
raise RuntimeError("BatchTableWrite only supports one-time committing.")
self.batch_committed = True
return self.file_store_write.prepare_commit(COMMIT_IDENTIFIER)


class StreamTableWrite(TableWrite):

def prepare_commit(self, commit_identifier: int = COMMIT_IDENTIFIER) -> List[CommitMessage]:
return self.file_store_write.prepare_commit(commit_identifier)