Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions paimon-python/pypaimon/manifest/manifest_file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import fastavro
from io import BytesIO
from typing import List

import fastavro

from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
ManifestEntry)
Expand Down Expand Up @@ -61,11 +60,15 @@ def read(self, manifest_file_name: str, shard_filter=None) -> List[ManifestEntry
null_counts=key_dict['_NULL_COUNTS'],
)
value_dict = dict(file_dict['_VALUE_STATS'])
if file_dict.get('_VALUE_STATS_COLS') is None:
fields = self.table.table_schema.fields
elif not file_dict.get('_VALUE_STATS_COLS'):
fields = []
else:
fields = [self.table.field_dict[col] for col in file_dict['_VALUE_STATS_COLS']]
value_stats = SimpleStats(
min_values=GenericRowDeserializer.from_bytes(value_dict['_MIN_VALUES'],
self.table.table_schema.fields),
max_values=GenericRowDeserializer.from_bytes(value_dict['_MAX_VALUES'],
self.table.table_schema.fields),
min_values=GenericRowDeserializer.from_bytes(value_dict['_MIN_VALUES'], fields),
max_values=GenericRowDeserializer.from_bytes(value_dict['_MAX_VALUES'], fields),
null_counts=value_dict['_NULL_COUNTS'],
)
file_meta = DataFileMeta(
Expand All @@ -85,6 +88,7 @@ def read(self, manifest_file_name: str, shard_filter=None) -> List[ManifestEntry
delete_row_count=file_dict['_DELETE_ROW_COUNT'],
embedded_index=file_dict['_EMBEDDED_FILE_INDEX'],
file_source=file_dict['_FILE_SOURCE'],
value_stats_cols=file_dict.get('_VALUE_STATS_COLS'),
)
entry = ManifestEntry(
kind=record['_KIND'],
Expand Down Expand Up @@ -132,6 +136,7 @@ def write(self, file_name, entries: List[ManifestEntry]):
"_DELETE_ROW_COUNT": entry.file.delete_row_count,
"_EMBEDDED_FILE_INDEX": entry.file.embedded_index,
"_FILE_SOURCE": entry.file.file_source,
"_VALUE_STATS_COLS": entry.file.value_stats_cols,
}
}
avro_records.append(avro_record)
Expand Down
3 changes: 3 additions & 0 deletions paimon-python/pypaimon/manifest/schema/data_file_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,8 @@ def set_file_path(self, table_path: Path, partition: GenericRow, bucket: int):
{"name": "_DELETE_ROW_COUNT", "type": ["null", "long"], "default": None},
{"name": "_EMBEDDED_FILE_INDEX", "type": ["null", "bytes"], "default": None},
{"name": "_FILE_SOURCE", "type": ["null", "int"], "default": None},
{"name": "_VALUE_STATS_COLS",
"type": ["null", {"type": "array", "items": "string"}],
"default": None},
]
}
1 change: 1 addition & 0 deletions paimon-python/pypaimon/table/file_store_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def __init__(self, file_io: FileIO, identifier: Identifier, table_path: Path,

self.table_schema = table_schema
self.fields = table_schema.fields
self.field_dict = {field.name: field for field in self.fields}
self.primary_keys = table_schema.primary_keys
self.partition_keys = table_schema.partition_keys
self.options = table_schema.options
Expand Down
174 changes: 168 additions & 6 deletions paimon-python/pypaimon/tests/py36/ao_read_write_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@

import pandas as pd
import pyarrow as pa
from pypaimon.schema.data_types import DataField, AtomicType

from pypaimon.table.row.row_kind import RowKind

from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer, GenericRowDeserializer

from pypaimon.api.options import Options
from pypaimon.catalog.catalog_context import CatalogContext
from pypaimon.catalog.catalog_factory import CatalogFactory
from pypaimon.catalog.rest.rest_catalog import RESTCatalog
from pypaimon.common.identifier import Identifier
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.schema.data_types import DataField, AtomicType
from pypaimon.schema.schema import Schema
from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer, GenericRowDeserializer
from pypaimon.table.row.row_kind import RowKind
from pypaimon.tests.py36.pyarrow_compat import table_sort_by
from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest

Expand Down Expand Up @@ -394,3 +395,164 @@ def test_to_bytes_with_long_string(self):

self.assertEqual(deserialized_row.values[0], long_string)
self.assertEqual(deserialized_row.row_kind, RowKind.INSERT)

def test_value_stats_cols_logic(self):
"""Test _VALUE_STATS_COLS logic in ManifestFileManager."""
self.rest_catalog.create_database("test_db", False)

# Define schema with multiple fields
pa_schema = pa.schema([
('id', pa.int64()),
('name', pa.string()),
('price', pa.float64()),
('category', pa.string())
])
schema = Schema.from_pyarrow_schema(pa_schema)
self.rest_catalog.create_table("test_db.test_value_stats_cols", schema, False)
table = self.rest_catalog.get_table("test_db.test_value_stats_cols")

# Create a ManifestFileManager
manifest_manager = ManifestFileManager(table)

# Test case 1: _VALUE_STATS_COLS is None (should use all table fields)
self._test_value_stats_cols_case(
manifest_manager,
table,
value_stats_cols=None,
expected_fields_count=4, # All 4 fields
test_name="none_case"
)

# Test case 2: _VALUE_STATS_COLS is empty list (should use empty fields)
self._test_value_stats_cols_case(
manifest_manager,
table,
value_stats_cols=[],
expected_fields_count=0, # No fields
test_name="empty_case"
)

# Test case 3: _VALUE_STATS_COLS has specific columns
self._test_value_stats_cols_case(
manifest_manager,
table,
value_stats_cols=['id', 'name'],
expected_fields_count=2, # Only 2 specified fields
test_name="specific_case"
)

def _test_value_stats_cols_case(self, manifest_manager, table, value_stats_cols, expected_fields_count, test_name):
"""Helper method to test a specific _VALUE_STATS_COLS case."""

# Create test data based on expected_fields_count
if expected_fields_count == 0:
# Empty fields case
test_fields = []
min_values = []
max_values = []
null_counts = []
elif expected_fields_count == 2:
# Specific fields case (id, name)
test_fields = [
DataField(0, "id", AtomicType("BIGINT")),
DataField(1, "name", AtomicType("STRING"))
]
min_values = [1, "apple"]
max_values = [100, "zebra"]
null_counts = [0, 0]
else:
# All fields case
test_fields = table.table_schema.fields
min_values = [1, "apple", 10.5, "electronics"]
max_values = [100, "zebra", 999.9, "toys"]
null_counts = [0, 0, 0, 0]

# Create BinaryRows for min/max values
min_binary_row = GenericRow(min_values, test_fields) if test_fields else GenericRow([], [])
max_binary_row = GenericRow(max_values, test_fields) if test_fields else GenericRow([], [])

# Create value_stats
value_stats = SimpleStats(
min_values=min_binary_row,
max_values=max_binary_row,
null_counts=null_counts
)

# Create key_stats (empty for simplicity)
empty_binary_row = GenericRow([], [])
key_stats = SimpleStats(
min_values=empty_binary_row,
max_values=empty_binary_row,
null_counts=[]
)

# Create DataFileMeta with value_stats_cols
file_meta = DataFileMeta(
file_name=f"test-file-{test_name}.parquet",
file_size=1024,
row_count=100,
min_key=empty_binary_row,
max_key=empty_binary_row,
key_stats=key_stats,
value_stats=value_stats,
min_sequence_number=1,
max_sequence_number=100,
schema_id=0,
level=0,
extra_files=[],
creation_time=1234567890,
delete_row_count=0,
embedded_index=None,
file_source=None,
value_stats_cols=value_stats_cols, # This is the key field we're testing
external_path=None
)

# Create ManifestEntry
entry = ManifestEntry(
kind=0, # Normal entry
partition=empty_binary_row,
bucket=0,
total_buckets=1,
file=file_meta
)

# Write the manifest entry
manifest_file_name = f"manifest-test-{test_name}"
manifest_manager.write(manifest_file_name, [entry])

# Read the manifest entry back
entries = manifest_manager.read(manifest_file_name)

# Verify we have exactly one entry
self.assertEqual(len(entries), 1)

# Get the entry
read_entry = entries[0]

# Verify value_stats_cols is preserved correctly
self.assertEqual(read_entry.file.value_stats_cols, value_stats_cols)

# Verify value_stats structure based on the logic
if value_stats_cols is None:
# Should use all table fields - verify we have data for all fields
self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count)
self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count)
self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count)
elif not value_stats_cols: # Empty list
# Should use empty fields - verify we have no field data
self.assertEqual(len(read_entry.file.value_stats.min_values.values), 0)
self.assertEqual(len(read_entry.file.value_stats.max_values.values), 0)
self.assertEqual(len(read_entry.file.value_stats.null_counts), 0)
else:
# Should use specified fields - verify we have data for specified fields only
self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count)
self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count)
self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count)

# Verify the actual values match what we expect
if expected_fields_count > 0:
self.assertEqual(read_entry.file.value_stats.min_values.values, min_values)
self.assertEqual(read_entry.file.value_stats.max_values.values, max_values)

self.assertEqual(read_entry.file.value_stats.null_counts, null_counts)
Loading