diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index cedc27f284a2..aec8bc7ed0bd 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -15,11 +15,10 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ +import fastavro from io import BytesIO from typing import List -import fastavro - from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA, ManifestEntry) @@ -61,11 +60,15 @@ def read(self, manifest_file_name: str, shard_filter=None) -> List[ManifestEntry null_counts=key_dict['_NULL_COUNTS'], ) value_dict = dict(file_dict['_VALUE_STATS']) + if file_dict.get('_VALUE_STATS_COLS') is None: + fields = self.table.table_schema.fields + elif not file_dict.get('_VALUE_STATS_COLS'): + fields = [] + else: + fields = [self.table.field_dict[col] for col in file_dict['_VALUE_STATS_COLS']] value_stats = SimpleStats( - min_values=GenericRowDeserializer.from_bytes(value_dict['_MIN_VALUES'], - self.table.table_schema.fields), - max_values=GenericRowDeserializer.from_bytes(value_dict['_MAX_VALUES'], - self.table.table_schema.fields), + min_values=GenericRowDeserializer.from_bytes(value_dict['_MIN_VALUES'], fields), + max_values=GenericRowDeserializer.from_bytes(value_dict['_MAX_VALUES'], fields), null_counts=value_dict['_NULL_COUNTS'], ) file_meta = DataFileMeta( @@ -85,6 +88,7 @@ def read(self, manifest_file_name: str, shard_filter=None) -> List[ManifestEntry delete_row_count=file_dict['_DELETE_ROW_COUNT'], embedded_index=file_dict['_EMBEDDED_FILE_INDEX'], file_source=file_dict['_FILE_SOURCE'], + value_stats_cols=file_dict.get('_VALUE_STATS_COLS'), ) entry = ManifestEntry( kind=record['_KIND'], @@ -132,6 +136,7 @@ def write(self, file_name, entries: List[ManifestEntry]): "_DELETE_ROW_COUNT": entry.file.delete_row_count, "_EMBEDDED_FILE_INDEX": entry.file.embedded_index, "_FILE_SOURCE": entry.file.file_source, + "_VALUE_STATS_COLS": entry.file.value_stats_cols, } } avro_records.append(avro_record) diff --git a/paimon-python/pypaimon/manifest/schema/data_file_meta.py b/paimon-python/pypaimon/manifest/schema/data_file_meta.py index a4eddabc55fd..82dfb669186c 100644 --- a/paimon-python/pypaimon/manifest/schema/data_file_meta.py +++ b/paimon-python/pypaimon/manifest/schema/data_file_meta.py @@ -84,5 +84,8 @@ def set_file_path(self, table_path: Path, partition: GenericRow, bucket: int): {"name": "_DELETE_ROW_COUNT", "type": ["null", "long"], "default": None}, {"name": "_EMBEDDED_FILE_INDEX", "type": ["null", "bytes"], "default": None}, {"name": "_FILE_SOURCE", "type": ["null", "int"], "default": None}, + {"name": "_VALUE_STATS_COLS", + "type": ["null", {"type": "array", "items": "string"}], + "default": None}, ] } diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py index bcde56a5b28a..e0d7c7e6033a 100644 --- a/paimon-python/pypaimon/table/file_store_table.py +++ b/paimon-python/pypaimon/table/file_store_table.py @@ -46,6 +46,7 @@ def __init__(self, file_io: FileIO, identifier: Identifier, table_path: Path, self.table_schema = table_schema self.fields = table_schema.fields + self.field_dict = {field.name: field for field in self.fields} self.primary_keys = table_schema.primary_keys 
self.partition_keys = table_schema.partition_keys self.options = table_schema.options diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py index 01a635b54892..8f3201d056f4 100644 --- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py +++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py @@ -19,18 +19,19 @@ import pandas as pd import pyarrow as pa -from pypaimon.schema.data_types import DataField, AtomicType - -from pypaimon.table.row.row_kind import RowKind - -from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer, GenericRowDeserializer - from pypaimon.api.options import Options from pypaimon.catalog.catalog_context import CatalogContext from pypaimon.catalog.catalog_factory import CatalogFactory from pypaimon.catalog.rest.rest_catalog import RESTCatalog from pypaimon.common.identifier import Identifier +from pypaimon.manifest.manifest_file_manager import ManifestFileManager +from pypaimon.manifest.schema.data_file_meta import DataFileMeta +from pypaimon.manifest.schema.manifest_entry import ManifestEntry +from pypaimon.manifest.schema.simple_stats import SimpleStats +from pypaimon.schema.data_types import DataField, AtomicType from pypaimon.schema.schema import Schema +from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer, GenericRowDeserializer +from pypaimon.table.row.row_kind import RowKind from pypaimon.tests.py36.pyarrow_compat import table_sort_by from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest @@ -394,3 +395,164 @@ def test_to_bytes_with_long_string(self): self.assertEqual(deserialized_row.values[0], long_string) self.assertEqual(deserialized_row.row_kind, RowKind.INSERT) + + def test_value_stats_cols_logic(self): + """Test _VALUE_STATS_COLS logic in ManifestFileManager.""" + self.rest_catalog.create_database("test_db", False) + + # Define schema with multiple fields + pa_schema = pa.schema([ + ('id', pa.int64()), + ('name', pa.string()), + ('price', pa.float64()), + ('category', pa.string()) + ]) + schema = Schema.from_pyarrow_schema(pa_schema) + self.rest_catalog.create_table("test_db.test_value_stats_cols", schema, False) + table = self.rest_catalog.get_table("test_db.test_value_stats_cols") + + # Create a ManifestFileManager + manifest_manager = ManifestFileManager(table) + + # Test case 1: _VALUE_STATS_COLS is None (should use all table fields) + self._test_value_stats_cols_case( + manifest_manager, + table, + value_stats_cols=None, + expected_fields_count=4, # All 4 fields + test_name="none_case" + ) + + # Test case 2: _VALUE_STATS_COLS is empty list (should use empty fields) + self._test_value_stats_cols_case( + manifest_manager, + table, + value_stats_cols=[], + expected_fields_count=0, # No fields + test_name="empty_case" + ) + + # Test case 3: _VALUE_STATS_COLS has specific columns + self._test_value_stats_cols_case( + manifest_manager, + table, + value_stats_cols=['id', 'name'], + expected_fields_count=2, # Only 2 specified fields + test_name="specific_case" + ) + + def _test_value_stats_cols_case(self, manifest_manager, table, value_stats_cols, expected_fields_count, test_name): + """Helper method to test a specific _VALUE_STATS_COLS case.""" + + # Create test data based on expected_fields_count + if expected_fields_count == 0: + # Empty fields case + test_fields = [] + min_values = [] + max_values = [] + null_counts = [] + elif expected_fields_count == 2: + # Specific fields case (id, name) + test_fields = [ + 
DataField(0, "id", AtomicType("BIGINT")), + DataField(1, "name", AtomicType("STRING")) + ] + min_values = [1, "apple"] + max_values = [100, "zebra"] + null_counts = [0, 0] + else: + # All fields case + test_fields = table.table_schema.fields + min_values = [1, "apple", 10.5, "electronics"] + max_values = [100, "zebra", 999.9, "toys"] + null_counts = [0, 0, 0, 0] + + # Create BinaryRows for min/max values + min_binary_row = GenericRow(min_values, test_fields) if test_fields else GenericRow([], []) + max_binary_row = GenericRow(max_values, test_fields) if test_fields else GenericRow([], []) + + # Create value_stats + value_stats = SimpleStats( + min_values=min_binary_row, + max_values=max_binary_row, + null_counts=null_counts + ) + + # Create key_stats (empty for simplicity) + empty_binary_row = GenericRow([], []) + key_stats = SimpleStats( + min_values=empty_binary_row, + max_values=empty_binary_row, + null_counts=[] + ) + + # Create DataFileMeta with value_stats_cols + file_meta = DataFileMeta( + file_name=f"test-file-{test_name}.parquet", + file_size=1024, + row_count=100, + min_key=empty_binary_row, + max_key=empty_binary_row, + key_stats=key_stats, + value_stats=value_stats, + min_sequence_number=1, + max_sequence_number=100, + schema_id=0, + level=0, + extra_files=[], + creation_time=1234567890, + delete_row_count=0, + embedded_index=None, + file_source=None, + value_stats_cols=value_stats_cols, # This is the key field we're testing + external_path=None + ) + + # Create ManifestEntry + entry = ManifestEntry( + kind=0, # Normal entry + partition=empty_binary_row, + bucket=0, + total_buckets=1, + file=file_meta + ) + + # Write the manifest entry + manifest_file_name = f"manifest-test-{test_name}" + manifest_manager.write(manifest_file_name, [entry]) + + # Read the manifest entry back + entries = manifest_manager.read(manifest_file_name) + + # Verify we have exactly one entry + self.assertEqual(len(entries), 1) + + # Get the entry + read_entry = entries[0] + + # Verify value_stats_cols is preserved correctly + self.assertEqual(read_entry.file.value_stats_cols, value_stats_cols) + + # Verify value_stats structure based on the logic + if value_stats_cols is None: + # Should use all table fields - verify we have data for all fields + self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count) + self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count) + self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count) + elif not value_stats_cols: # Empty list + # Should use empty fields - verify we have no field data + self.assertEqual(len(read_entry.file.value_stats.min_values.values), 0) + self.assertEqual(len(read_entry.file.value_stats.max_values.values), 0) + self.assertEqual(len(read_entry.file.value_stats.null_counts), 0) + else: + # Should use specified fields - verify we have data for specified fields only + self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count) + self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count) + self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count) + + # Verify the actual values match what we expect + if expected_fields_count > 0: + self.assertEqual(read_entry.file.value_stats.min_values.values, min_values) + self.assertEqual(read_entry.file.value_stats.max_values.values, max_values) + + self.assertEqual(read_entry.file.value_stats.null_counts, null_counts) diff --git 
a/paimon-python/pypaimon/tests/reader_basic_test.py b/paimon-python/pypaimon/tests/reader_basic_test.py index 8b4ed05dd603..8354ed206ac4 100644 --- a/paimon-python/pypaimon/tests/reader_basic_test.py +++ b/paimon-python/pypaimon/tests/reader_basic_test.py @@ -23,14 +23,17 @@ import pandas as pd import pyarrow as pa -from pypaimon.table.row.row_kind import RowKind -from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer, GenericRowDeserializer +from pypaimon.table.row.generic_row import GenericRow from pypaimon.schema.data_types import DataField, AtomicType from pypaimon.catalog.catalog_factory import CatalogFactory from pypaimon.schema.schema import Schema +from pypaimon.manifest.manifest_file_manager import ManifestFileManager +from pypaimon.manifest.schema.simple_stats import SimpleStats +from pypaimon.manifest.schema.data_file_meta import DataFileMeta +from pypaimon.manifest.schema.manifest_entry import ManifestEntry class ReaderBasicTest(unittest.TestCase): @@ -198,27 +201,167 @@ def testReaderDuckDB(self): expect = pd.DataFrame(self.raw_data) pd.testing.assert_frame_equal(actual.reset_index(drop=True), expect.reset_index(drop=True)) - def test_to_bytes_with_long_string(self): - """Test serialization of strings longer than 7 bytes which require variable part storage.""" - # Create fields with a long string value - fields = [ - DataField(0, "long_string", AtomicType("STRING")), - ] - - # String longer than 7 bytes will be stored in variable part - long_string = "This is a long string that exceeds 7 bytes" - values = [long_string] - - binary_row = GenericRow(values, fields, RowKind.INSERT) - serialized_bytes = GenericRowSerializer.to_bytes(binary_row) - - # Verify the last 6 bytes are 0 - # This is because the variable part data is rounded to the nearest word (8 bytes) - # The last 6 bytes check is to ensure proper padding - self.assertEqual(serialized_bytes[-6:], b'\x00\x00\x00\x00\x00\x00') - self.assertEqual(serialized_bytes[20:62].decode('utf-8'), long_string) - # Deserialize to verify - deserialized_row = GenericRowDeserializer.from_bytes(serialized_bytes, fields) - - self.assertEqual(deserialized_row.values[0], long_string) - self.assertEqual(deserialized_row.row_kind, RowKind.INSERT) + def test_value_stats_cols_logic(self): + """Test _VALUE_STATS_COLS logic in ManifestFileManager.""" + # Create a catalog and table + catalog = CatalogFactory.create({ + "warehouse": self.warehouse + }) + catalog.create_database("test_db", False) + + # Define schema with multiple fields + pa_schema = pa.schema([ + ('id', pa.int64()), + ('name', pa.string()), + ('price', pa.float64()), + ('category', pa.string()) + ]) + schema = Schema.from_pyarrow_schema(pa_schema) + catalog.create_table("test_db.test_value_stats_cols", schema, False) + table = catalog.get_table("test_db.test_value_stats_cols") + + # Create a ManifestFileManager + manifest_manager = ManifestFileManager(table) + + # Test case 1: _VALUE_STATS_COLS is None (should use all table fields) + self._test_value_stats_cols_case( + manifest_manager, + table, + value_stats_cols=None, + expected_fields_count=4, # All 4 fields + test_name="none_case" + ) + + # Test case 2: _VALUE_STATS_COLS is empty list (should use empty fields) + self._test_value_stats_cols_case( + manifest_manager, + table, + value_stats_cols=[], + expected_fields_count=0, # No fields + test_name="empty_case" + ) + + # Test case 3: _VALUE_STATS_COLS has specific columns + self._test_value_stats_cols_case( + manifest_manager, + table, + value_stats_cols=['id', 
'name'], + expected_fields_count=2, # Only 2 specified fields + test_name="specific_case" + ) + + def _test_value_stats_cols_case(self, manifest_manager, table, value_stats_cols, expected_fields_count, test_name): + """Helper method to test a specific _VALUE_STATS_COLS case.""" + + # Create test data based on expected_fields_count + if expected_fields_count == 0: + # Empty fields case + test_fields = [] + min_values = [] + max_values = [] + null_counts = [] + elif expected_fields_count == 2: + # Specific fields case (id, name) + test_fields = [ + DataField(0, "id", AtomicType("BIGINT")), + DataField(1, "name", AtomicType("STRING")) + ] + min_values = [1, "apple"] + max_values = [100, "zebra"] + null_counts = [0, 0] + else: + # All fields case + test_fields = table.table_schema.fields + min_values = [1, "apple", 10.5, "electronics"] + max_values = [100, "zebra", 999.9, "toys"] + null_counts = [0, 0, 0, 0] + + # Create BinaryRows for min/max values + min_binary_row = GenericRow(min_values, test_fields) if test_fields else GenericRow([], []) + max_binary_row = GenericRow(max_values, test_fields) if test_fields else GenericRow([], []) + + # Create value_stats + value_stats = SimpleStats( + min_values=min_binary_row, + max_values=max_binary_row, + null_counts=null_counts + ) + + # Create key_stats (empty for simplicity) + empty_binary_row = GenericRow([], []) + key_stats = SimpleStats( + min_values=empty_binary_row, + max_values=empty_binary_row, + null_counts=[] + ) + + # Create DataFileMeta with value_stats_cols + file_meta = DataFileMeta( + file_name=f"test-file-{test_name}.parquet", + file_size=1024, + row_count=100, + min_key=empty_binary_row, + max_key=empty_binary_row, + key_stats=key_stats, + value_stats=value_stats, + min_sequence_number=1, + max_sequence_number=100, + schema_id=0, + level=0, + extra_files=[], + creation_time=1234567890, + delete_row_count=0, + embedded_index=None, + file_source=None, + value_stats_cols=value_stats_cols, # This is the key field we're testing + external_path=None + ) + + # Create ManifestEntry + entry = ManifestEntry( + kind=0, # Normal entry + partition=empty_binary_row, + bucket=0, + total_buckets=1, + file=file_meta + ) + + # Write the manifest entry + manifest_file_name = f"manifest-test-{test_name}" + manifest_manager.write(manifest_file_name, [entry]) + + # Read the manifest entry back + entries = manifest_manager.read(manifest_file_name) + + # Verify we have exactly one entry + self.assertEqual(len(entries), 1) + + # Get the entry + read_entry = entries[0] + + # Verify value_stats_cols is preserved correctly + self.assertEqual(read_entry.file.value_stats_cols, value_stats_cols) + + # Verify value_stats structure based on the logic + if value_stats_cols is None: + # Should use all table fields - verify we have data for all fields + self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count) + self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count) + self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count) + elif not value_stats_cols: # Empty list + # Should use empty fields - verify we have no field data + self.assertEqual(len(read_entry.file.value_stats.min_values.values), 0) + self.assertEqual(len(read_entry.file.value_stats.max_values.values), 0) + self.assertEqual(len(read_entry.file.value_stats.null_counts), 0) + else: + # Should use specified fields - verify we have data for specified fields only + 
self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count) + self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count) + self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count) + + # Verify the actual values match what we expect + if expected_fields_count > 0: + self.assertEqual(read_entry.file.value_stats.min_values.values, min_values) + self.assertEqual(read_entry.file.value_stats.max_values.values, max_values) + + self.assertEqual(read_entry.file.value_stats.null_counts, null_counts) diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index a7eb9cb2b8f4..787dc706248b 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -15,15 +15,14 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ +import pyarrow as pa +import pyarrow.compute as pc import uuid from abc import ABC, abstractmethod from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Tuple -import pyarrow as pa -import pyarrow.compute as pc - from pypaimon.common.core_options import CoreOptions from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.manifest.schema.simple_stats import SimpleStats @@ -166,6 +165,7 @@ def _write_data_to_file(self, data: pa.RecordBatch): extra_files=[], creation_time=datetime.now(), delete_row_count=0, + value_stats_cols=None, # None means all columns have statistics file_path=str(file_path), ))
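
Note on the behavioral core of this patch: `ManifestFileManager.read()` now interprets `_VALUE_STATS_COLS` three ways — `None` means the stored min/max stats cover every table column (the legacy layout), an empty list means column stats were dropped entirely, and a non-empty list means stats were trimmed to just those columns, resolved through the `field_dict` mapping the patch adds to `FileStoreTable`. The sketch below restates that contract outside the diff; `resolve_stats_fields` and the plain-string stand-ins for `DataField` are illustrative assumptions, not the real pypaimon API.

```python
# Minimal sketch of the tri-state _VALUE_STATS_COLS contract from this PR.
# Strings stand in for pypaimon's DataField objects; `field_dict` mirrors
# the name->field mapping added to FileStoreTable in the diff.
from typing import Dict, List, Optional


def resolve_stats_fields(value_stats_cols: Optional[List[str]],
                         table_fields: List[str],
                         field_dict: Dict[str, str]) -> List[str]:
    """Choose the fields to deserialize _MIN_VALUES/_MAX_VALUES against."""
    if value_stats_cols is None:
        # Legacy manifests: stats exist for the full row schema.
        return table_fields
    if not value_stats_cols:
        # Stats were explicitly dropped: deserialize against no fields.
        return []
    # Trimmed stats: look each named column up in the table's field dict.
    return [field_dict[col] for col in value_stats_cols]


if __name__ == "__main__":
    fields = ["id", "name", "price", "category"]
    by_name = {f: f for f in fields}
    assert resolve_stats_fields(None, fields, by_name) == fields
    assert resolve_stats_fields([], fields, by_name) == []
    assert resolve_stats_fields(["id", "name"], fields, by_name) == ["id", "name"]
```

The `is None` check must come before the truthiness check: a manifest that explicitly recorded an empty column list has to deserialize against no fields rather than silently fall back to the full table schema, which is why the diff distinguishes the two cases instead of collapsing them into a single `if not value_stats_cols`. The three test cases (`None`, `[]`, `['id', 'name']`) exercise exactly these branches.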