122 changes: 122 additions & 0 deletions paimon-python/pypaimon/tests/py36/ao_read_write_test.py
@@ -16,6 +16,8 @@
limitations under the License.
"""
import logging
from datetime import datetime
from unittest.mock import Mock

import pandas as pd
import pyarrow as pa
@@ -34,6 +36,7 @@
from pypaimon.table.row.row_kind import RowKind
from pypaimon.tests.py36.pyarrow_compat import table_sort_by
from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest
from pypaimon.write.file_store_commit import FileStoreCommit


class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
@@ -119,6 +122,125 @@ def test_overwrite(self):
pd.testing.assert_frame_equal(
actual_df2.reset_index(drop=True), df2.reset_index(drop=True))

def test_mixed_add_and_delete_entries_same_partition(self):
"""Test record_count calculation with mixed ADD/DELETE entries in same partition."""
pa_schema = pa.schema([
('region', pa.string()),
('city', pa.string())
])
schema = Schema.from_pyarrow_schema(pa_schema)
self.rest_catalog.create_table('default.tb', schema, False)
table = self.rest_catalog.get_table('default.tb')
partition_fields = [
DataField(0, "region", AtomicType("STRING")),
DataField(1, "city", AtomicType("STRING"))
]
partition = GenericRow(['East', 'Boston'], partition_fields)

# Create ADD entry
add_file_meta = Mock(spec=DataFileMeta)
add_file_meta.row_count = 200
add_file_meta.file_size = 2048
add_file_meta.creation_time = datetime.now()

add_entry = ManifestEntry(
kind=0, # ADD
partition=partition,
bucket=0,
total_buckets=1,
file=add_file_meta
)

# Create DELETE entry
delete_file_meta = Mock(spec=DataFileMeta)
delete_file_meta.row_count = 80
delete_file_meta.file_size = 800
delete_file_meta.creation_time = datetime.now()

delete_entry = ManifestEntry(
kind=1, # DELETE
partition=partition,
bucket=0,
total_buckets=1,
file=delete_file_meta
)
file_store_commit = FileStoreCommit(None, table, "")
# Test the method with both entries
statistics = file_store_commit._generate_partition_statistics([add_entry, delete_entry])

        # Verify results - both entries should be merged into a single partition statistics object
self.assertEqual(len(statistics), 1)
stat = statistics[0]

# Net record count: +200 + (-80) = 120
self.assertEqual(stat.record_count, 120)
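        # Net file count: +1 + (-1) = 0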
self.assertEqual(stat.file_count, 0)
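        # Net file size in bytes: +2048 + (-800) = 1248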
self.assertEqual(stat.file_size_in_bytes, 1248)

def test_multiple_partitions_with_different_operations(self):
"""Test record_count calculation across multiple partitions."""
pa_schema = pa.schema([
('region', pa.string()),
('city', pa.string())
])
schema = Schema.from_pyarrow_schema(pa_schema)
self.rest_catalog.create_table('default.tb1', schema, False)
table = self.rest_catalog.get_table('default.tb1')
partition_fields = [
DataField(0, "region", AtomicType("STRING")),
DataField(1, "city", AtomicType("STRING"))
]
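        # Partition 1: East/Boston - ADD operation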
partition1 = GenericRow(['East', 'Boston'], partition_fields)
file_meta1 = Mock(spec=DataFileMeta)
file_meta1.row_count = 150
file_meta1.file_size = 1500
file_meta1.creation_time = datetime.now()

entry1 = ManifestEntry(
kind=0, # ADD
partition=partition1,
bucket=0,
total_buckets=1,
file=file_meta1
)

# Partition 2: South/LA - DELETE operation
partition2 = GenericRow(['South', 'LA'], partition_fields)
file_meta2 = Mock(spec=DataFileMeta)
file_meta2.row_count = 75
file_meta2.file_size = 750
file_meta2.creation_time = datetime.now()

entry2 = ManifestEntry(
kind=1, # DELETE
partition=partition2,
bucket=0,
total_buckets=1,
file=file_meta2
)

file_store_commit = FileStoreCommit(None, table, "")
# Test the method with both entries
statistics = file_store_commit._generate_partition_statistics([entry1, entry2])

# Verify results - should have 2 separate partition statistics
self.assertEqual(len(statistics), 2)

# Sort by partition spec for consistent testing
statistics.sort(key=lambda s: (s.spec.get('region', ''), s.spec.get('city', '')))

        # Check East/Boston partition (ADD)
        east_stat = statistics[0]
        self.assertEqual(east_stat.record_count, 150)  # Positive for ADD
        self.assertEqual(east_stat.file_count, 1)
        self.assertEqual(east_stat.file_size_in_bytes, 1500)

# Check South/LA partition (DELETE)
south_stat = statistics[1]
self.assertEqual(south_stat.record_count, -75) # Negative for DELETE
self.assertEqual(south_stat.file_count, -1)
self.assertEqual(south_stat.file_size_in_bytes, -750)

def testParquetAppendOnlyReader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.rest_catalog.create_table('default.test_append_only_parquet', schema, False)
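The new tests hinge on Paimon's manifest entry kind encoding: kind 0 (ADD) contributes a file's metadata positively, kind 1 (DELETE) negatively. A minimal sketch of that sign convention — signed_file_stats is an illustrative helper for this page, not part of the patch:

def signed_file_stats(kind: int, row_count: int, file_size: int):
    """Return (record_delta, file_delta, size_delta) for one manifest entry.

    kind uses Paimon's encoding: 0 = ADD, 1 = DELETE.
    """
    sign = 1 if kind == 0 else -1
    return sign * row_count, sign, sign * file_size

# The mixed ADD/DELETE test above reduces to:
add = signed_file_stats(0, 200, 2048)    # (+200, +1, +2048)
delete = signed_file_stats(1, 80, 800)   # (-80, -1, -800)
net = tuple(a + d for a, d in zip(add, delete))
assert net == (120, 0, 1248)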
122 changes: 122 additions & 0 deletions paimon-python/pypaimon/tests/reader_basic_test.py
@@ -20,6 +20,8 @@
import shutil
import tempfile
import unittest
from datetime import datetime
from unittest.mock import Mock

import pandas as pd
import pyarrow as pa
@@ -34,6 +36,7 @@
from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.write.file_store_commit import FileStoreCommit


class ReaderBasicTest(unittest.TestCase):
@@ -156,6 +159,125 @@ def test_overwrite(self):
pd.testing.assert_frame_equal(
actual_df2.reset_index(drop=True), df2.reset_index(drop=True))

def test_mixed_add_and_delete_entries_same_partition(self):
"""Test record_count calculation with mixed ADD/DELETE entries in same partition."""
pa_schema = pa.schema([
('region', pa.string()),
('city', pa.string())
])
schema = Schema.from_pyarrow_schema(pa_schema)
self.catalog.create_table('default.tb', schema, False)
table = self.catalog.get_table('default.tb')
partition_fields = [
DataField(0, "region", AtomicType("STRING")),
DataField(1, "city", AtomicType("STRING"))
]
partition = GenericRow(['East', 'Boston'], partition_fields)

# Create ADD entry
add_file_meta = Mock(spec=DataFileMeta)
add_file_meta.row_count = 200
add_file_meta.file_size = 2048
add_file_meta.creation_time = datetime.now()

add_entry = ManifestEntry(
kind=0, # ADD
partition=partition,
bucket=0,
total_buckets=1,
file=add_file_meta
)

# Create DELETE entry
delete_file_meta = Mock(spec=DataFileMeta)
delete_file_meta.row_count = 80
delete_file_meta.file_size = 800
delete_file_meta.creation_time = datetime.now()

delete_entry = ManifestEntry(
kind=1, # DELETE
partition=partition,
bucket=0,
total_buckets=1,
file=delete_file_meta
)
file_store_commit = FileStoreCommit(None, table, "")
# Test the method with both entries
statistics = file_store_commit._generate_partition_statistics([add_entry, delete_entry])

        # Verify results - both entries should be merged into a single partition statistics object
self.assertEqual(len(statistics), 1)
stat = statistics[0]

# Net record count: +200 + (-80) = 120
self.assertEqual(stat.record_count, 120)
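        # Net file count: +1 + (-1) = 0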
self.assertEqual(stat.file_count, 0)
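        # Net file size in bytes: +2048 + (-800) = 1248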
self.assertEqual(stat.file_size_in_bytes, 1248)

def test_multiple_partitions_with_different_operations(self):
"""Test record_count calculation across multiple partitions."""
pa_schema = pa.schema([
('region', pa.string()),
('city', pa.string())
])
schema = Schema.from_pyarrow_schema(pa_schema)
self.catalog.create_table('default.tb1', schema, False)
table = self.catalog.get_table('default.tb1')
partition_fields = [
DataField(0, "region", AtomicType("STRING")),
DataField(1, "city", AtomicType("STRING"))
]
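        # Partition 1: East/Boston - ADD operation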
partition1 = GenericRow(['East', 'Boston'], partition_fields)
file_meta1 = Mock(spec=DataFileMeta)
file_meta1.row_count = 150
file_meta1.file_size = 1500
file_meta1.creation_time = datetime.now()

entry1 = ManifestEntry(
kind=0, # ADD
partition=partition1,
bucket=0,
total_buckets=1,
file=file_meta1
)

# Partition 2: South/LA - DELETE operation
partition2 = GenericRow(['South', 'LA'], partition_fields)
file_meta2 = Mock(spec=DataFileMeta)
file_meta2.row_count = 75
file_meta2.file_size = 750
file_meta2.creation_time = datetime.now()

entry2 = ManifestEntry(
kind=1, # DELETE
partition=partition2,
bucket=0,
total_buckets=1,
file=file_meta2
)

file_store_commit = FileStoreCommit(None, table, "")
# Test the method with both entries
statistics = file_store_commit._generate_partition_statistics([entry1, entry2])

# Verify results - should have 2 separate partition statistics
self.assertEqual(len(statistics), 2)

# Sort by partition spec for consistent testing
statistics.sort(key=lambda s: (s.spec.get('region', ''), s.spec.get('city', '')))

        # Check East/Boston partition (ADD)
        east_stat = statistics[0]
        self.assertEqual(east_stat.record_count, 150)  # Positive for ADD
        self.assertEqual(east_stat.file_count, 1)
        self.assertEqual(east_stat.file_size_in_bytes, 1500)

# Check South/LA partition (DELETE)
south_stat = statistics[1]
self.assertEqual(south_stat.record_count, -75) # Negative for DELETE
self.assertEqual(south_stat.file_count, -1)
self.assertEqual(south_stat.file_size_in_bytes, -750)

def testWriteWrongSchema(self):
self.catalog.create_table('default.test_wrong_schema',
Schema.from_pyarrow_schema(self.pa_schema),
6 changes: 3 additions & 3 deletions paimon-python/pypaimon/write/file_store_commit.py
@@ -281,9 +281,9 @@ def _generate_partition_statistics(self, commit_entries: List[ManifestEntry]) ->
# Following Java implementation: PartitionEntry.fromDataFile()
file_meta = entry.file
# Extract actual file metadata (following Java DataFileMeta pattern)
-        record_count = file_meta.row_count
-        file_size_in_bytes = file_meta.file_size
-        file_count = 1
+        record_count = file_meta.row_count if entry.kind == 0 else file_meta.row_count * -1
+        file_size_in_bytes = file_meta.file_size if entry.kind == 0 else file_meta.file_size * -1
+        file_count = 1 if entry.kind == 0 else -1

# Convert creation_time to milliseconds (Java uses epoch millis)
if file_meta.creation_time:
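The change above flips the sign of each per-file delta for DELETE entries before the deltas are merged per partition. A rough standalone sketch of the resulting aggregation — the PartitionStatistics dataclass and the entry shape below are simplified assumptions for illustration, not pypaimon's actual classes:

from dataclasses import dataclass
from types import SimpleNamespace as Entry
from typing import Dict, List, Tuple

@dataclass
class PartitionStatistics:  # simplified stand-in for pypaimon's statistics class
    spec: Dict[str, str]
    record_count: int = 0
    file_count: int = 0
    file_size_in_bytes: int = 0

def generate_partition_statistics(entries) -> List[PartitionStatistics]:
    """Accumulate signed per-file deltas into one statistics object per partition."""
    merged: Dict[Tuple, PartitionStatistics] = {}
    for entry in entries:
        sign = 1 if entry.kind == 0 else -1  # 0 = ADD, 1 = DELETE
        key = tuple(sorted(entry.spec.items()))
        stat = merged.setdefault(key, PartitionStatistics(spec=entry.spec))
        stat.record_count += sign * entry.row_count
        stat.file_count += sign
        stat.file_size_in_bytes += sign * entry.file_size
    return list(merged.values())

# Mirrors the multi-partition test: one ADD in East/Boston, one DELETE in South/LA.
stats = generate_partition_statistics([
    Entry(kind=0, spec={'region': 'East', 'city': 'Boston'}, row_count=150, file_size=1500),
    Entry(kind=1, spec={'region': 'South', 'city': 'LA'}, row_count=75, file_size=750),
])
assert [s.record_count for s in stats] == [150, -75]
assert [s.file_count for s in stats] == [1, -1]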