76 changes: 36 additions & 40 deletions paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -24,9 +24,8 @@
from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
ManifestEntry)
from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.table.row.binary_row import (BinaryRow, BinaryRowDeserializer,
from pypaimon.table.row.binary_row import (BinaryRowDeserializer,
BinaryRowSerializer)
from pypaimon.write.commit_message import CommitMessage


class ManifestFileManager:
@@ -99,46 +98,43 @@ def read(self, manifest_file_name: str, shard_filter=None) -> List[ManifestEntry]:
entries.append(entry)
return entries

def write(self, file_name, commit_messages: List[CommitMessage]):
def write(self, file_name, entries: List[ManifestEntry]):
avro_records = []
for message in commit_messages:
partition_bytes = BinaryRowSerializer.to_bytes(
BinaryRow(list(message.partition), self.table.table_schema.get_partition_key_fields()))
for file in message.new_files:
avro_record = {
"_VERSION": 2,
"_KIND": 0,
"_PARTITION": partition_bytes,
"_BUCKET": message.bucket,
"_TOTAL_BUCKETS": self.table.total_buckets,
"_FILE": {
"_FILE_NAME": file.file_name,
"_FILE_SIZE": file.file_size,
"_ROW_COUNT": file.row_count,
"_MIN_KEY": BinaryRowSerializer.to_bytes(file.min_key),
"_MAX_KEY": BinaryRowSerializer.to_bytes(file.max_key),
"_KEY_STATS": {
"_MIN_VALUES": BinaryRowSerializer.to_bytes(file.key_stats.min_values),
"_MAX_VALUES": BinaryRowSerializer.to_bytes(file.key_stats.max_values),
"_NULL_COUNTS": file.key_stats.null_counts,
},
"_VALUE_STATS": {
"_MIN_VALUES": BinaryRowSerializer.to_bytes(file.value_stats.min_values),
"_MAX_VALUES": BinaryRowSerializer.to_bytes(file.value_stats.max_values),
"_NULL_COUNTS": file.value_stats.null_counts,
},
"_MIN_SEQUENCE_NUMBER": file.min_sequence_number,
"_MAX_SEQUENCE_NUMBER": file.max_sequence_number,
"_SCHEMA_ID": file.schema_id,
"_LEVEL": file.level,
"_EXTRA_FILES": file.extra_files,
"_CREATION_TIME": file.creation_time,
"_DELETE_ROW_COUNT": file.delete_row_count,
"_EMBEDDED_FILE_INDEX": file.embedded_index,
"_FILE_SOURCE": file.file_source,
}
for entry in entries:
avro_record = {
"_VERSION": 2,
"_KIND": entry.kind,
"_PARTITION": BinaryRowSerializer.to_bytes(entry.partition),
"_BUCKET": entry.bucket,
"_TOTAL_BUCKETS": entry.bucket,
"_FILE": {
"_FILE_NAME": entry.file.file_name,
"_FILE_SIZE": entry.file.file_size,
"_ROW_COUNT": entry.file.row_count,
"_MIN_KEY": BinaryRowSerializer.to_bytes(entry.file.min_key),
"_MAX_KEY": BinaryRowSerializer.to_bytes(entry.file.max_key),
"_KEY_STATS": {
"_MIN_VALUES": BinaryRowSerializer.to_bytes(entry.file.key_stats.min_values),
"_MAX_VALUES": BinaryRowSerializer.to_bytes(entry.file.key_stats.max_values),
"_NULL_COUNTS": entry.file.key_stats.null_counts,
},
"_VALUE_STATS": {
"_MIN_VALUES": BinaryRowSerializer.to_bytes(entry.file.value_stats.min_values),
"_MAX_VALUES": BinaryRowSerializer.to_bytes(entry.file.value_stats.max_values),
"_NULL_COUNTS": entry.file.value_stats.null_counts,
},
"_MIN_SEQUENCE_NUMBER": entry.file.min_sequence_number,
"_MAX_SEQUENCE_NUMBER": entry.file.max_sequence_number,
"_SCHEMA_ID": entry.file.schema_id,
"_LEVEL": entry.file.level,
"_EXTRA_FILES": entry.file.extra_files,
"_CREATION_TIME": entry.file.creation_time,
"_DELETE_ROW_COUNT": entry.file.delete_row_count,
"_EMBEDDED_FILE_INDEX": entry.file.embedded_index,
"_FILE_SOURCE": entry.file.file_source,
}
avro_records.append(avro_record)
}
avro_records.append(avro_record)

manifest_path = self.manifest_path / file_name
try:
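With write() now taking ManifestEntry objects directly, converting a CommitMessage into entries becomes the caller's responsibility (e.g. in the commit path). A minimal sketch of that conversion, assuming ManifestEntry is constructed from the kind, partition, bucket, total_buckets and file fields implied by MANIFEST_ENTRY_SCHEMA; the exact constructor is not shown in this diff:

```python
# Caller-side sketch (hypothetical helper, not part of this diff): build the
# ManifestEntry list that the new write(file_name, entries) signature expects
# from a CommitMessage. Constructor/field names are assumptions based on
# MANIFEST_ENTRY_SCHEMA and the attribute accesses elsewhere in this PR.
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.table.row.binary_row import BinaryRow


def entries_from_commit_message(table, message):
    partition = BinaryRow(list(message.partition),
                          table.table_schema.get_partition_key_fields())
    return [
        ManifestEntry(
            kind=0,                       # 0 = ADD, 1 = DELETE
            partition=partition,
            bucket=message.bucket,
            total_buckets=table.total_buckets,
            file=file,
        )
        for file in message.new_files
    ]


# manifest_file_manager.write(file_name, entries_from_commit_message(table, msg))
```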
5 changes: 5 additions & 0 deletions paimon-python/pypaimon/read/plan.py
@@ -19,13 +19,18 @@
from dataclasses import dataclass
from typing import List

from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.read.split import Split


@dataclass
class Plan:
"""Implementation of Plan for native Python reading."""
_files: List[ManifestEntry]
_splits: List[Split]

def files(self) -> List[ManifestEntry]:
return self._files

def splits(self) -> List[Split]:
return self._splits
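Exposing the resolved manifest entries next to the splits lets callers see exactly which data files a scan planned, which is what an overwrite commit needs in order to record DELETE entries for existing files. A small usage sketch; attribute names follow those used elsewhere in this PR, and the surrounding table/read-builder calls mirror the tests below:

```python
plan = table.new_read_builder().new_scan().plan()

# splits() still drives the actual read.
splits = plan.splits()

# files() additionally exposes the planned manifest entries, e.g. to collect
# (partition values, bucket, file name) keys for files an overwrite must delete.
existing_keys = {
    (tuple(entry.partition.values), entry.bucket, entry.file.file_name)
    for entry in plan.files()
}
```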
17 changes: 13 additions & 4 deletions paimon-python/pypaimon/read/table_scan.py
@@ -73,16 +73,25 @@ def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int],
def plan(self) -> Plan:
latest_snapshot = self.snapshot_manager.get_latest_snapshot()
if not latest_snapshot:
return Plan([])
return Plan([], [])
manifest_files = self.manifest_list_manager.read_all(latest_snapshot)

file_entries = []
deleted_entries = set()
added_entries = []
# TODO: filter manifest files by predicate
for manifest_file in manifest_files:
manifest_entries = self.manifest_file_manager.read(manifest_file.file_name,
lambda row: self._bucket_filter(row))
for entry in manifest_entries:
if entry.kind == 0:
file_entries.append(entry)
added_entries.append(entry)
else:
deleted_entries.add((tuple(entry.partition.values), entry.bucket, entry.file.file_name))

file_entries = [
entry for entry in added_entries
if (tuple(entry.partition.values), entry.bucket, entry.file.file_name) not in deleted_entries
]

if self.predicate:
file_entries = self._filter_by_predicate(file_entries)
@@ -100,7 +109,7 @@ def plan(self) -> Plan:

splits = self._apply_push_down_limit(splits)

return Plan(splits)
return Plan(file_entries, splits)

def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan':
self.idx_of_this_subtask = idx_of_this_subtask
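The planning change above reconciles ADD and DELETE manifest entries instead of returning every ADD: a file survives only if no manifest records a DELETE for the same (partition, bucket, file name) key. A self-contained sketch of that filtering with illustrative stand-in entries; the namedtuple is purely for demonstration, not the real ManifestEntry:

```python
from collections import namedtuple

# kind: 0 = ADD, 1 = DELETE (matching the entry.kind check in plan()).
Entry = namedtuple("Entry", ["kind", "partition", "bucket", "file_name"])


def live_entries(entries):
    deleted = {(e.partition, e.bucket, e.file_name)
               for e in entries if e.kind != 0}
    return [e for e in entries
            if e.kind == 0
            and (e.partition, e.bucket, e.file_name) not in deleted]


entries = [
    Entry(0, (1,), 0, "data-a.parquet"),
    Entry(0, (2,), 0, "data-b.parquet"),
    Entry(1, (1,), 0, "data-a.parquet"),  # overwrite of partition f0=1 drops data-a
]
assert [e.file_name for e in live_entries(entries)] == ["data-b.parquet"]
```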
81 changes: 81 additions & 0 deletions paimon-python/pypaimon/tests/py36/ao_read_write_test.py
@@ -32,6 +32,87 @@

class RESTTableReadWritePy36Test(RESTCatalogBaseTest):

def test_overwrite(self):
simple_pa_schema = pa.schema([
('f0', pa.int32()),
('f1', pa.string())
])
schema = Schema.from_pyarrow_schema(simple_pa_schema, partition_keys=['f0'],
options={'dynamic-partition-overwrite': 'false'})
self.rest_catalog.create_table('default.test_overwrite', schema, False)
table = self.rest_catalog.get_table('default.test_overwrite')
read_builder = table.new_read_builder()

# test normal write
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

df0 = pd.DataFrame({
'f0': [1, 2],
'f1': ['apple', 'banana'],
})

table_write.write_pandas(df0)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
actual_df0 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
df0['f0'] = df0['f0'].astype('int32')
pd.testing.assert_frame_equal(
actual_df0.reset_index(drop=True), df0.reset_index(drop=True))

# test partially overwrite
write_builder = table.new_batch_write_builder().overwrite({'f0': 1})
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

df1 = pd.DataFrame({
'f0': [1],
'f1': ['watermelon'],
})

table_write.write_pandas(df1)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
actual_df1 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
expected_df1 = pd.DataFrame({
'f0': [1, 2],
'f1': ['watermelon', 'banana']
})
expected_df1['f0'] = expected_df1['f0'].astype('int32')
pd.testing.assert_frame_equal(
actual_df1.reset_index(drop=True), expected_df1.reset_index(drop=True))

# test fully overwrite
write_builder = table.new_batch_write_builder().overwrite()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

df2 = pd.DataFrame({
'f0': [3],
'f1': ['Neo'],
})

table_write.write_pandas(df2)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
actual_df2 = table_read.to_pandas(table_scan.plan().splits())
df2['f0'] = df2['f0'].astype('int32')
pd.testing.assert_frame_equal(
actual_df2.reset_index(drop=True), df2.reset_index(drop=True))

def testParquetAppendOnlyReader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.rest_catalog.create_table('default.test_append_only_parquet', schema, False)
81 changes: 79 additions & 2 deletions paimon-python/pypaimon/tests/reader_basic_test.py
@@ -68,8 +68,85 @@ def tearDownClass(cls):
shutil.rmtree(cls.tempdir, ignore_errors=True)

def test_overwrite(self):
pass
# TODO: support overwrite
simple_pa_schema = pa.schema([
('f0', pa.int32()),
('f1', pa.string())
])
schema = Schema.from_pyarrow_schema(simple_pa_schema, partition_keys=['f0'],
options={'dynamic-partition-overwrite': 'false'})
self.catalog.create_table('default.test_overwrite', schema, False)
table = self.catalog.get_table('default.test_overwrite')
read_builder = table.new_read_builder()

# test normal write
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

df0 = pd.DataFrame({
'f0': [1, 2],
'f1': ['apple', 'banana'],
})

table_write.write_pandas(df0)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
actual_df0 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
df0['f0'] = df0['f0'].astype('int32')
pd.testing.assert_frame_equal(
actual_df0.reset_index(drop=True), df0.reset_index(drop=True))

# test partially overwrite
write_builder = table.new_batch_write_builder().overwrite({'f0': 1})
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

df1 = pd.DataFrame({
'f0': [1],
'f1': ['watermelon'],
})

table_write.write_pandas(df1)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
actual_df1 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
expected_df1 = pd.DataFrame({
'f0': [1, 2],
'f1': ['watermelon', 'banana']
})
expected_df1['f0'] = expected_df1['f0'].astype('int32')
pd.testing.assert_frame_equal(
actual_df1.reset_index(drop=True), expected_df1.reset_index(drop=True))

# test fully overwrite
write_builder = table.new_batch_write_builder().overwrite()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

df2 = pd.DataFrame({
'f0': [3],
'f1': ['Neo'],
})

table_write.write_pandas(df2)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
actual_df2 = table_read.to_pandas(table_scan.plan().splits())
df2['f0'] = df2['f0'].astype('int32')
pd.testing.assert_frame_equal(
actual_df2.reset_index(drop=True), df2.reset_index(drop=True))

def testWriteWrongSchema(self):
self.catalog.create_table('default.test_wrong_schema',