From a8954bb5bd51f50fe0e7bbce06a7e31c5ad756f0 Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 10 Sep 2025 10:49:28 +0800 Subject: [PATCH 1/7] null_value_stats --- .../pypaimon/tests/py36/ao_read_write_test.py | 106 +++++++++++++++++- .../pypaimon/tests/reader_basic_test.py | 106 +++++++++++++++++- 2 files changed, 208 insertions(+), 4 deletions(-) diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py index 01a635b54892..d9ae3f87a63f 100644 --- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py +++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py @@ -33,8 +33,11 @@ from pypaimon.schema.schema import Schema from pypaimon.tests.py36.pyarrow_compat import table_sort_by from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest - - +from pypaimon.manifest.manifest_file_manager import ManifestFileManager +from pypaimon.table.row.binary_row import BinaryRow +from pypaimon.manifest.schema.simple_stats import SimpleStats +from pypaimon.manifest.schema.data_file_meta import DataFileMeta +from pypaimon.manifest.schema.manifest_entry import ManifestEntry class RESTTableReadWritePy36Test(RESTCatalogBaseTest): def test_overwrite(self): @@ -394,3 +397,102 @@ def test_to_bytes_with_long_string(self): self.assertEqual(deserialized_row.values[0], long_string) self.assertEqual(deserialized_row.row_kind, RowKind.INSERT) + + def test_manifest_entry_kind_1(self): + """Test ManifestEntry with _KIND=1, which should create empty BinaryRow for value_stats.""" + # Create a catalog and table + catalog = CatalogFactory.create({ + "warehouse": self.warehouse + }) + catalog.create_database("test_db", False) + + # Define schema + pa_schema = pa.schema([ + ('id', pa.int64()), + ('name', pa.string()), + ('price', pa.float64()) + ]) + schema = Schema.from_pyarrow_schema(pa_schema) + catalog.create_table("test_db.test_table", schema, False) + table = catalog.get_table("test_db.test_table") + + # Create a ManifestFileManager + manifest_manager = ManifestFileManager(table) + + # Create test data with _KIND=1 + # For _KIND=1, value_stats should have empty BinaryRow for min_values and max_values + partition_fields = table.table_schema.get_partition_key_fields() + primary_key_fields = table.table_schema.get_trimmed_primary_key_fields() + all_fields = table.table_schema.fields + + # Create empty BinaryRows for _KIND=1 case + empty_binary_row = BinaryRow([], []) + + # Create value_stats with empty BinaryRows for _KIND=1 + value_stats = SimpleStats( + min_values=empty_binary_row, # Empty for _KIND=1 + max_values=empty_binary_row, # Empty for _KIND=1 + null_counts=[0, 0, 0] # Null counts for each field + ) + + # Create key_stats with actual data + # For this test, we'll use empty rows for key stats as well to keep it simple + key_stats = SimpleStats( + min_values=empty_binary_row, + max_values=empty_binary_row, + null_counts=[0, 0, 0] + ) + + # Create a DataFileMeta + file_meta = DataFileMeta( + file_name="test-file.parquet", + file_size=1024, + row_count=100, + min_key=empty_binary_row, + max_key=empty_binary_row, + key_stats=key_stats, + value_stats=value_stats, + min_sequence_number=1, + max_sequence_number=100, + schema_id=0, + level=0, + extra_files=[], + creation_time=1234567890, + delete_row_count=0, + embedded_index=None, + file_source=None + ) + + # Create a ManifestEntry with _KIND=1 + entry = ManifestEntry( + kind=1, # _KIND=1 + partition=empty_binary_row, + bucket=0, + total_buckets=1, + file=file_meta + ) + + # Write the manifest entry + manifest_file_name = "manifest-test-kind-1" + manifest_manager.write(manifest_file_name, [entry]) + + # Read the manifest entry back + entries = manifest_manager.read(manifest_file_name) + + # Verify we have exactly one entry + self.assertEqual(len(entries), 1) + + # Get the entry + read_entry = entries[0] + + # Verify _KIND is 1 + self.assertEqual(read_entry.kind, 1) + + # Verify value_stats has empty BinaryRows for min_values and max_values when _KIND=1 + self.assertIsInstance(read_entry.file.value_stats.min_values, BinaryRow) + self.assertIsInstance(read_entry.file.value_stats.max_values, BinaryRow) + self.assertEqual(len(read_entry.file.value_stats.min_values.values), 0) + self.assertEqual(len(read_entry.file.value_stats.max_values.values), 0) + + # Verify null_counts are preserved + self.assertEqual(read_entry.file.value_stats.null_counts, [0, 0, 0]) \ No newline at end of file diff --git a/paimon-python/pypaimon/tests/reader_basic_test.py b/paimon-python/pypaimon/tests/reader_basic_test.py index 8b4ed05dd603..a90f7977c5b2 100644 --- a/paimon-python/pypaimon/tests/reader_basic_test.py +++ b/paimon-python/pypaimon/tests/reader_basic_test.py @@ -31,8 +31,11 @@ from pypaimon.catalog.catalog_factory import CatalogFactory from pypaimon.schema.schema import Schema - - +from pypaimon.manifest.manifest_file_manager import ManifestFileManager +from pypaimon.table.row.binary_row import BinaryRow +from pypaimon.manifest.schema.simple_stats import SimpleStats +from pypaimon.manifest.schema.data_file_meta import DataFileMeta +from pypaimon.manifest.schema.manifest_entry import ManifestEntry class ReaderBasicTest(unittest.TestCase): @classmethod def setUpClass(cls): @@ -222,3 +225,102 @@ def test_to_bytes_with_long_string(self): self.assertEqual(deserialized_row.values[0], long_string) self.assertEqual(deserialized_row.row_kind, RowKind.INSERT) + + def test_manifest_entry_kind_1(self): + """Test ManifestEntry with _KIND=1, which should create empty BinaryRow for value_stats.""" + # Create a catalog and table + catalog = CatalogFactory.create({ + "warehouse": self.warehouse + }) + catalog.create_database("test_db", False) + + # Define schema + pa_schema = pa.schema([ + ('id', pa.int64()), + ('name', pa.string()), + ('price', pa.float64()) + ]) + schema = Schema.from_pyarrow_schema(pa_schema) + catalog.create_table("test_db.test_table", schema, False) + table = catalog.get_table("test_db.test_table") + + # Create a ManifestFileManager + manifest_manager = ManifestFileManager(table) + + # Create test data with _KIND=1 + # For _KIND=1, value_stats should have empty BinaryRow for min_values and max_values + partition_fields = table.table_schema.get_partition_key_fields() + primary_key_fields = table.table_schema.get_trimmed_primary_key_fields() + all_fields = table.table_schema.fields + + # Create empty BinaryRows for _KIND=1 case + empty_binary_row = BinaryRow([], []) + + # Create value_stats with empty BinaryRows for _KIND=1 + value_stats = SimpleStats( + min_values=empty_binary_row, # Empty for _KIND=1 + max_values=empty_binary_row, # Empty for _KIND=1 + null_counts=[0, 0, 0] # Null counts for each field + ) + + # Create key_stats with actual data + # For this test, we'll use empty rows for key stats as well to keep it simple + key_stats = SimpleStats( + min_values=empty_binary_row, + max_values=empty_binary_row, + null_counts=[0, 0, 0] + ) + + # Create a DataFileMeta + file_meta = DataFileMeta( + file_name="test-file.parquet", + file_size=1024, + row_count=100, + min_key=empty_binary_row, + max_key=empty_binary_row, + key_stats=key_stats, + value_stats=value_stats, + min_sequence_number=1, + max_sequence_number=100, + schema_id=0, + level=0, + extra_files=[], + creation_time=1234567890, + delete_row_count=0, + embedded_index=None, + file_source=None + ) + + # Create a ManifestEntry with _KIND=1 + entry = ManifestEntry( + kind=1, # _KIND=1 + partition=empty_binary_row, + bucket=0, + total_buckets=1, + file=file_meta + ) + + # Write the manifest entry + manifest_file_name = "manifest-test-kind-1" + manifest_manager.write(manifest_file_name, [entry]) + + # Read the manifest entry back + entries = manifest_manager.read(manifest_file_name) + + # Verify we have exactly one entry + self.assertEqual(len(entries), 1) + + # Get the entry + read_entry = entries[0] + + # Verify _KIND is 1 + self.assertEqual(read_entry.kind, 1) + + # Verify value_stats has empty BinaryRows for min_values and max_values when _KIND=1 + self.assertIsInstance(read_entry.file.value_stats.min_values, BinaryRow) + self.assertIsInstance(read_entry.file.value_stats.max_values, BinaryRow) + self.assertEqual(len(read_entry.file.value_stats.min_values.values), 0) + self.assertEqual(len(read_entry.file.value_stats.max_values.values), 0) + + # Verify null_counts are preserved + self.assertEqual(read_entry.file.value_stats.null_counts, [0, 0, 0]) \ No newline at end of file From 6e8259ebadad3a37f7809f4e9b740bd5e52e627f Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 10 Sep 2025 10:51:42 +0800 Subject: [PATCH 2/7] fix --- paimon-python/pypaimon/tests/py36/ao_read_write_test.py | 4 ---- paimon-python/pypaimon/tests/reader_basic_test.py | 4 ---- 2 files changed, 8 deletions(-) diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py index d9ae3f87a63f..9f19e51d2e91 100644 --- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py +++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py @@ -421,10 +421,6 @@ def test_manifest_entry_kind_1(self): # Create test data with _KIND=1 # For _KIND=1, value_stats should have empty BinaryRow for min_values and max_values - partition_fields = table.table_schema.get_partition_key_fields() - primary_key_fields = table.table_schema.get_trimmed_primary_key_fields() - all_fields = table.table_schema.fields - # Create empty BinaryRows for _KIND=1 case empty_binary_row = BinaryRow([], []) diff --git a/paimon-python/pypaimon/tests/reader_basic_test.py b/paimon-python/pypaimon/tests/reader_basic_test.py index a90f7977c5b2..944231e59ce7 100644 --- a/paimon-python/pypaimon/tests/reader_basic_test.py +++ b/paimon-python/pypaimon/tests/reader_basic_test.py @@ -249,10 +249,6 @@ def test_manifest_entry_kind_1(self): # Create test data with _KIND=1 # For _KIND=1, value_stats should have empty BinaryRow for min_values and max_values - partition_fields = table.table_schema.get_partition_key_fields() - primary_key_fields = table.table_schema.get_trimmed_primary_key_fields() - all_fields = table.table_schema.fields - # Create empty BinaryRows for _KIND=1 case empty_binary_row = BinaryRow([], []) From ceeda5c188b2b735ec9d663fb6347f1b6c0d31bc Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 10 Sep 2025 10:52:47 +0800 Subject: [PATCH 3/7] fix --- paimon-python/pypaimon/tests/py36/ao_read_write_test.py | 5 +++-- paimon-python/pypaimon/tests/reader_basic_test.py | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py index 9f19e51d2e91..45ad27ec078c 100644 --- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py +++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py @@ -34,10 +34,11 @@ from pypaimon.tests.py36.pyarrow_compat import table_sort_by from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest from pypaimon.manifest.manifest_file_manager import ManifestFileManager -from pypaimon.table.row.binary_row import BinaryRow from pypaimon.manifest.schema.simple_stats import SimpleStats from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.manifest.schema.manifest_entry import ManifestEntry + + class RESTTableReadWritePy36Test(RESTCatalogBaseTest): def test_overwrite(self): @@ -491,4 +492,4 @@ def test_manifest_entry_kind_1(self): self.assertEqual(len(read_entry.file.value_stats.max_values.values), 0) # Verify null_counts are preserved - self.assertEqual(read_entry.file.value_stats.null_counts, [0, 0, 0]) \ No newline at end of file + self.assertEqual(read_entry.file.value_stats.null_counts, [0, 0, 0]) diff --git a/paimon-python/pypaimon/tests/reader_basic_test.py b/paimon-python/pypaimon/tests/reader_basic_test.py index 944231e59ce7..adb9ca97e4ea 100644 --- a/paimon-python/pypaimon/tests/reader_basic_test.py +++ b/paimon-python/pypaimon/tests/reader_basic_test.py @@ -32,10 +32,11 @@ from pypaimon.catalog.catalog_factory import CatalogFactory from pypaimon.schema.schema import Schema from pypaimon.manifest.manifest_file_manager import ManifestFileManager -from pypaimon.table.row.binary_row import BinaryRow from pypaimon.manifest.schema.simple_stats import SimpleStats from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.manifest.schema.manifest_entry import ManifestEntry + + class ReaderBasicTest(unittest.TestCase): @classmethod def setUpClass(cls): @@ -319,4 +320,4 @@ def test_manifest_entry_kind_1(self): self.assertEqual(len(read_entry.file.value_stats.max_values.values), 0) # Verify null_counts are preserved - self.assertEqual(read_entry.file.value_stats.null_counts, [0, 0, 0]) \ No newline at end of file + self.assertEqual(read_entry.file.value_stats.null_counts, [0, 0, 0]) From b66eb8387a5d6c4d61c00e89e30689ff63706d33 Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 10 Sep 2025 19:55:01 +0800 Subject: [PATCH 4/7] statsCols --- .../pypaimon/manifest/manifest_file_manager.py | 12 ++++++++---- .../pypaimon/manifest/schema/data_file_meta.py | 3 +++ paimon-python/pypaimon/write/writer/data_writer.py | 6 +++--- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index cedc27f284a2..7a944b1bb976 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -61,11 +61,13 @@ def read(self, manifest_file_name: str, shard_filter=None) -> List[ManifestEntry null_counts=key_dict['_NULL_COUNTS'], ) value_dict = dict(file_dict['_VALUE_STATS']) + if file_dict.get('_VALUE_STATS_COLS') is None: + fields = self.table.table_schema.fields + elif not file_dict.get('_VALUE_STATS_COLS'): + fields = [] value_stats = SimpleStats( - min_values=GenericRowDeserializer.from_bytes(value_dict['_MIN_VALUES'], - self.table.table_schema.fields), - max_values=GenericRowDeserializer.from_bytes(value_dict['_MAX_VALUES'], - self.table.table_schema.fields), + min_values=BinaryRowDeserializer.from_bytes(value_dict['_MIN_VALUES'], fields), + max_values=BinaryRowDeserializer.from_bytes(value_dict['_MAX_VALUES'], fields), null_counts=value_dict['_NULL_COUNTS'], ) file_meta = DataFileMeta( @@ -85,6 +87,7 @@ def read(self, manifest_file_name: str, shard_filter=None) -> List[ManifestEntry delete_row_count=file_dict['_DELETE_ROW_COUNT'], embedded_index=file_dict['_EMBEDDED_FILE_INDEX'], file_source=file_dict['_FILE_SOURCE'], + value_stats_cols=file_dict.get('_VALUE_STATS_COLS'), ) entry = ManifestEntry( kind=record['_KIND'], @@ -132,6 +135,7 @@ def write(self, file_name, entries: List[ManifestEntry]): "_DELETE_ROW_COUNT": entry.file.delete_row_count, "_EMBEDDED_FILE_INDEX": entry.file.embedded_index, "_FILE_SOURCE": entry.file.file_source, + "_VALUE_STATS_COLS": entry.file.value_stats_cols, } } avro_records.append(avro_record) diff --git a/paimon-python/pypaimon/manifest/schema/data_file_meta.py b/paimon-python/pypaimon/manifest/schema/data_file_meta.py index a4eddabc55fd..82dfb669186c 100644 --- a/paimon-python/pypaimon/manifest/schema/data_file_meta.py +++ b/paimon-python/pypaimon/manifest/schema/data_file_meta.py @@ -84,5 +84,8 @@ def set_file_path(self, table_path: Path, partition: GenericRow, bucket: int): {"name": "_DELETE_ROW_COUNT", "type": ["null", "long"], "default": None}, {"name": "_EMBEDDED_FILE_INDEX", "type": ["null", "bytes"], "default": None}, {"name": "_FILE_SOURCE", "type": ["null", "int"], "default": None}, + {"name": "_VALUE_STATS_COLS", + "type": ["null", {"type": "array", "items": "string"}], + "default": None}, ] } diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index a7eb9cb2b8f4..787dc706248b 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -15,15 +15,14 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ +import pyarrow as pa +import pyarrow.compute as pc import uuid from abc import ABC, abstractmethod from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Tuple -import pyarrow as pa -import pyarrow.compute as pc - from pypaimon.common.core_options import CoreOptions from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.manifest.schema.simple_stats import SimpleStats @@ -166,6 +165,7 @@ def _write_data_to_file(self, data: pa.RecordBatch): extra_files=[], creation_time=datetime.now(), delete_row_count=0, + value_stats_cols=None, # None means all columns have statistics file_path=str(file_path), )) From 630af845000cc9494ffe52c2c055f44360ff130e Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 10 Sep 2025 19:58:11 +0800 Subject: [PATCH 5/7] fix --- paimon-python/pypaimon/manifest/manifest_file_manager.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index 7a944b1bb976..d7a9899cb86a 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -15,11 +15,10 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ +import fastavro from io import BytesIO from typing import List -import fastavro - from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA, ManifestEntry) @@ -66,8 +65,8 @@ def read(self, manifest_file_name: str, shard_filter=None) -> List[ManifestEntry elif not file_dict.get('_VALUE_STATS_COLS'): fields = [] value_stats = SimpleStats( - min_values=BinaryRowDeserializer.from_bytes(value_dict['_MIN_VALUES'], fields), - max_values=BinaryRowDeserializer.from_bytes(value_dict['_MAX_VALUES'], fields), + min_values=GenericRowDeserializer.from_bytes(value_dict['_MIN_VALUES'], fields), + max_values=GenericRowDeserializer.from_bytes(value_dict['_MAX_VALUES'], fields), null_counts=value_dict['_NULL_COUNTS'], ) file_meta = DataFileMeta( From dccf3d4f58c828569420b1a52526cda64b1325ad Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 10 Sep 2025 20:45:51 +0800 Subject: [PATCH 6/7] fix --- .../manifest/manifest_file_manager.py | 2 + .../pypaimon/tests/py36/ao_read_write_test.py | 164 ++++++++++++----- .../pypaimon/tests/reader_basic_test.py | 169 +++++++++++------- 3 files changed, 225 insertions(+), 110 deletions(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index d7a9899cb86a..2a0c0cc95381 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -64,6 +64,8 @@ def read(self, manifest_file_name: str, shard_filter=None) -> List[ManifestEntry fields = self.table.table_schema.fields elif not file_dict.get('_VALUE_STATS_COLS'): fields = [] + else: + raise RuntimeError("_VALUE_STATS_COLS should be None or empty. We don't support other values now.") value_stats = SimpleStats( min_values=GenericRowDeserializer.from_bytes(value_dict['_MIN_VALUES'], fields), max_values=GenericRowDeserializer.from_bytes(value_dict['_MAX_VALUES'], fields), diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py index 45ad27ec078c..8870384f159b 100644 --- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py +++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py @@ -19,24 +19,21 @@ import pandas as pd import pyarrow as pa -from pypaimon.schema.data_types import DataField, AtomicType - -from pypaimon.table.row.row_kind import RowKind - -from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer, GenericRowDeserializer - from pypaimon.api.options import Options from pypaimon.catalog.catalog_context import CatalogContext from pypaimon.catalog.catalog_factory import CatalogFactory from pypaimon.catalog.rest.rest_catalog import RESTCatalog from pypaimon.common.identifier import Identifier -from pypaimon.schema.schema import Schema -from pypaimon.tests.py36.pyarrow_compat import table_sort_by -from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest from pypaimon.manifest.manifest_file_manager import ManifestFileManager -from pypaimon.manifest.schema.simple_stats import SimpleStats from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.manifest.schema.manifest_entry import ManifestEntry +from pypaimon.manifest.schema.simple_stats import SimpleStats +from pypaimon.schema.data_types import DataField, AtomicType +from pypaimon.schema.schema import Schema +from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer, GenericRowDeserializer +from pypaimon.table.row.row_kind import RowKind +from pypaimon.tests.py36.pyarrow_compat import table_sort_by +from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest class RESTTableReadWritePy36Test(RESTCatalogBaseTest): @@ -399,50 +396,102 @@ def test_to_bytes_with_long_string(self): self.assertEqual(deserialized_row.values[0], long_string) self.assertEqual(deserialized_row.row_kind, RowKind.INSERT) - def test_manifest_entry_kind_1(self): - """Test ManifestEntry with _KIND=1, which should create empty BinaryRow for value_stats.""" - # Create a catalog and table - catalog = CatalogFactory.create({ - "warehouse": self.warehouse - }) - catalog.create_database("test_db", False) + def test_value_stats_cols_logic(self): + """Test _VALUE_STATS_COLS logic in ManifestFileManager.""" + self.rest_catalog.create_database("test_db", False) - # Define schema + # Define schema with multiple fields pa_schema = pa.schema([ ('id', pa.int64()), ('name', pa.string()), - ('price', pa.float64()) + ('price', pa.float64()), + ('category', pa.string()) ]) schema = Schema.from_pyarrow_schema(pa_schema) - catalog.create_table("test_db.test_table", schema, False) - table = catalog.get_table("test_db.test_table") + self.rest_catalog.create_table("test_db.test_value_stats_cols", schema, False) + table = self.rest_catalog.get_table("test_db.test_value_stats_cols") # Create a ManifestFileManager manifest_manager = ManifestFileManager(table) - # Create test data with _KIND=1 - # For _KIND=1, value_stats should have empty BinaryRow for min_values and max_values - # Create empty BinaryRows for _KIND=1 case - empty_binary_row = BinaryRow([], []) + # Test case 1: _VALUE_STATS_COLS is None (should use all table fields) + self._test_value_stats_cols_case( + manifest_manager, + table, + value_stats_cols=None, + expected_fields_count=4, # All 4 fields + test_name="none_case" + ) + + # Test case 2: _VALUE_STATS_COLS is empty list (should use empty fields) + self._test_value_stats_cols_case( + manifest_manager, + table, + value_stats_cols=[], + expected_fields_count=0, # No fields + test_name="empty_case" + ) - # Create value_stats with empty BinaryRows for _KIND=1 + # Test case 3: _VALUE_STATS_COLS has specific columns (should raise RuntimeError) + with self.assertRaises(RuntimeError) as context: + self._test_value_stats_cols_case( + manifest_manager, + table, + value_stats_cols=['id', 'name'], + expected_fields_count=2, # Only 2 specified fields + test_name="specific_case" + ) + self.assertEqual("_VALUE_STATS_COLS should be None or empty. We don't support other values now.", + str(context.exception)) + + def _test_value_stats_cols_case(self, manifest_manager, table, value_stats_cols, expected_fields_count, test_name): + """Helper method to test a specific _VALUE_STATS_COLS case.""" + + # Create test data based on expected_fields_count + if expected_fields_count == 0: + # Empty fields case + test_fields = [] + min_values = [] + max_values = [] + null_counts = [] + elif expected_fields_count == 2: + # Specific fields case (id, name) + test_fields = [ + DataField(0, "id", AtomicType("BIGINT")), + DataField(1, "name", AtomicType("STRING")) + ] + min_values = [1, "apple"] + max_values = [100, "zebra"] + null_counts = [0, 0] + else: + # All fields case + test_fields = table.table_schema.fields + min_values = [1, "apple", 10.5, "electronics"] + max_values = [100, "zebra", 999.9, "toys"] + null_counts = [0, 0, 0, 0] + + # Create BinaryRows for min/max values + min_binary_row = GenericRow(min_values, test_fields) if test_fields else GenericRow([], []) + max_binary_row = GenericRow(max_values, test_fields) if test_fields else GenericRow([], []) + + # Create value_stats value_stats = SimpleStats( - min_values=empty_binary_row, # Empty for _KIND=1 - max_values=empty_binary_row, # Empty for _KIND=1 - null_counts=[0, 0, 0] # Null counts for each field + min_values=min_binary_row, + max_values=max_binary_row, + null_counts=null_counts ) - # Create key_stats with actual data - # For this test, we'll use empty rows for key stats as well to keep it simple + # Create key_stats (empty for simplicity) + empty_binary_row = GenericRow([], []) key_stats = SimpleStats( min_values=empty_binary_row, max_values=empty_binary_row, - null_counts=[0, 0, 0] + null_counts=[] ) - # Create a DataFileMeta + # Create DataFileMeta with value_stats_cols file_meta = DataFileMeta( - file_name="test-file.parquet", + file_name=f"test-file-{test_name}.parquet", file_size=1024, row_count=100, min_key=empty_binary_row, @@ -457,12 +506,14 @@ def test_manifest_entry_kind_1(self): creation_time=1234567890, delete_row_count=0, embedded_index=None, - file_source=None + file_source=None, + value_stats_cols=value_stats_cols, # This is the key field we're testing + external_path=None ) - # Create a ManifestEntry with _KIND=1 + # Create ManifestEntry entry = ManifestEntry( - kind=1, # _KIND=1 + kind=0, # Normal entry partition=empty_binary_row, bucket=0, total_buckets=1, @@ -470,7 +521,7 @@ def test_manifest_entry_kind_1(self): ) # Write the manifest entry - manifest_file_name = "manifest-test-kind-1" + manifest_file_name = f"manifest-test-{test_name}" manifest_manager.write(manifest_file_name, [entry]) # Read the manifest entry back @@ -482,14 +533,29 @@ def test_manifest_entry_kind_1(self): # Get the entry read_entry = entries[0] - # Verify _KIND is 1 - self.assertEqual(read_entry.kind, 1) - - # Verify value_stats has empty BinaryRows for min_values and max_values when _KIND=1 - self.assertIsInstance(read_entry.file.value_stats.min_values, BinaryRow) - self.assertIsInstance(read_entry.file.value_stats.max_values, BinaryRow) - self.assertEqual(len(read_entry.file.value_stats.min_values.values), 0) - self.assertEqual(len(read_entry.file.value_stats.max_values.values), 0) - - # Verify null_counts are preserved - self.assertEqual(read_entry.file.value_stats.null_counts, [0, 0, 0]) + # Verify value_stats_cols is preserved correctly + self.assertEqual(read_entry.file.value_stats_cols, value_stats_cols) + + # Verify value_stats structure based on the logic + if value_stats_cols is None: + # Should use all table fields - verify we have data for all fields + self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count) + self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count) + self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count) + elif not value_stats_cols: # Empty list + # Should use empty fields - verify we have no field data + self.assertEqual(len(read_entry.file.value_stats.min_values.values), 0) + self.assertEqual(len(read_entry.file.value_stats.max_values.values), 0) + self.assertEqual(len(read_entry.file.value_stats.null_counts), 0) + else: + # Should use specified fields - verify we have data for specified fields only + self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count) + self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count) + self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count) + + # Verify the actual values match what we expect + if expected_fields_count > 0: + self.assertEqual(read_entry.file.value_stats.min_values.values, min_values) + self.assertEqual(read_entry.file.value_stats.max_values.values, max_values) + + self.assertEqual(read_entry.file.value_stats.null_counts, null_counts) diff --git a/paimon-python/pypaimon/tests/reader_basic_test.py b/paimon-python/pypaimon/tests/reader_basic_test.py index adb9ca97e4ea..b3e3015a07c4 100644 --- a/paimon-python/pypaimon/tests/reader_basic_test.py +++ b/paimon-python/pypaimon/tests/reader_basic_test.py @@ -23,9 +23,8 @@ import pandas as pd import pyarrow as pa -from pypaimon.table.row.row_kind import RowKind -from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer, GenericRowDeserializer +from pypaimon.table.row.generic_row import GenericRow from pypaimon.schema.data_types import DataField, AtomicType @@ -202,75 +201,106 @@ def testReaderDuckDB(self): expect = pd.DataFrame(self.raw_data) pd.testing.assert_frame_equal(actual.reset_index(drop=True), expect.reset_index(drop=True)) - def test_to_bytes_with_long_string(self): - """Test serialization of strings longer than 7 bytes which require variable part storage.""" - # Create fields with a long string value - fields = [ - DataField(0, "long_string", AtomicType("STRING")), - ] - - # String longer than 7 bytes will be stored in variable part - long_string = "This is a long string that exceeds 7 bytes" - values = [long_string] - - binary_row = GenericRow(values, fields, RowKind.INSERT) - serialized_bytes = GenericRowSerializer.to_bytes(binary_row) - - # Verify the last 6 bytes are 0 - # This is because the variable part data is rounded to the nearest word (8 bytes) - # The last 6 bytes check is to ensure proper padding - self.assertEqual(serialized_bytes[-6:], b'\x00\x00\x00\x00\x00\x00') - self.assertEqual(serialized_bytes[20:62].decode('utf-8'), long_string) - # Deserialize to verify - deserialized_row = GenericRowDeserializer.from_bytes(serialized_bytes, fields) - - self.assertEqual(deserialized_row.values[0], long_string) - self.assertEqual(deserialized_row.row_kind, RowKind.INSERT) - - def test_manifest_entry_kind_1(self): - """Test ManifestEntry with _KIND=1, which should create empty BinaryRow for value_stats.""" + def test_value_stats_cols_logic(self): + """Test _VALUE_STATS_COLS logic in ManifestFileManager.""" # Create a catalog and table catalog = CatalogFactory.create({ "warehouse": self.warehouse }) catalog.create_database("test_db", False) - # Define schema + # Define schema with multiple fields pa_schema = pa.schema([ ('id', pa.int64()), ('name', pa.string()), - ('price', pa.float64()) + ('price', pa.float64()), + ('category', pa.string()) ]) schema = Schema.from_pyarrow_schema(pa_schema) - catalog.create_table("test_db.test_table", schema, False) - table = catalog.get_table("test_db.test_table") + catalog.create_table("test_db.test_value_stats_cols", schema, False) + table = catalog.get_table("test_db.test_value_stats_cols") # Create a ManifestFileManager manifest_manager = ManifestFileManager(table) - # Create test data with _KIND=1 - # For _KIND=1, value_stats should have empty BinaryRow for min_values and max_values - # Create empty BinaryRows for _KIND=1 case - empty_binary_row = BinaryRow([], []) + # Test case 1: _VALUE_STATS_COLS is None (should use all table fields) + self._test_value_stats_cols_case( + manifest_manager, + table, + value_stats_cols=None, + expected_fields_count=4, # All 4 fields + test_name="none_case" + ) + + # Test case 2: _VALUE_STATS_COLS is empty list (should use empty fields) + self._test_value_stats_cols_case( + manifest_manager, + table, + value_stats_cols=[], + expected_fields_count=0, # No fields + test_name="empty_case" + ) - # Create value_stats with empty BinaryRows for _KIND=1 + # Test case 3: _VALUE_STATS_COLS has specific columns (should raise RuntimeError) + with self.assertRaises(RuntimeError) as context: + self._test_value_stats_cols_case( + manifest_manager, + table, + value_stats_cols=['id', 'name'], + expected_fields_count=2, # Only 2 specified fields + test_name="specific_case" + ) + self.assertEqual("_VALUE_STATS_COLS should be None or empty. We don't support other values now.", + str(context.exception)) + + def _test_value_stats_cols_case(self, manifest_manager, table, value_stats_cols, expected_fields_count, test_name): + """Helper method to test a specific _VALUE_STATS_COLS case.""" + + # Create test data based on expected_fields_count + if expected_fields_count == 0: + # Empty fields case + test_fields = [] + min_values = [] + max_values = [] + null_counts = [] + elif expected_fields_count == 2: + # Specific fields case (id, name) + test_fields = [ + DataField(0, "id", AtomicType("BIGINT")), + DataField(1, "name", AtomicType("STRING")) + ] + min_values = [1, "apple"] + max_values = [100, "zebra"] + null_counts = [0, 0] + else: + # All fields case + test_fields = table.table_schema.fields + min_values = [1, "apple", 10.5, "electronics"] + max_values = [100, "zebra", 999.9, "toys"] + null_counts = [0, 0, 0, 0] + + # Create BinaryRows for min/max values + min_binary_row = GenericRow(min_values, test_fields) if test_fields else GenericRow([], []) + max_binary_row = GenericRow(max_values, test_fields) if test_fields else GenericRow([], []) + + # Create value_stats value_stats = SimpleStats( - min_values=empty_binary_row, # Empty for _KIND=1 - max_values=empty_binary_row, # Empty for _KIND=1 - null_counts=[0, 0, 0] # Null counts for each field + min_values=min_binary_row, + max_values=max_binary_row, + null_counts=null_counts ) - # Create key_stats with actual data - # For this test, we'll use empty rows for key stats as well to keep it simple + # Create key_stats (empty for simplicity) + empty_binary_row = GenericRow([], []) key_stats = SimpleStats( min_values=empty_binary_row, max_values=empty_binary_row, - null_counts=[0, 0, 0] + null_counts=[] ) - # Create a DataFileMeta + # Create DataFileMeta with value_stats_cols file_meta = DataFileMeta( - file_name="test-file.parquet", + file_name=f"test-file-{test_name}.parquet", file_size=1024, row_count=100, min_key=empty_binary_row, @@ -285,12 +315,14 @@ def test_manifest_entry_kind_1(self): creation_time=1234567890, delete_row_count=0, embedded_index=None, - file_source=None + file_source=None, + value_stats_cols=value_stats_cols, # This is the key field we're testing + external_path=None ) - # Create a ManifestEntry with _KIND=1 + # Create ManifestEntry entry = ManifestEntry( - kind=1, # _KIND=1 + kind=0, # Normal entry partition=empty_binary_row, bucket=0, total_buckets=1, @@ -298,7 +330,7 @@ def test_manifest_entry_kind_1(self): ) # Write the manifest entry - manifest_file_name = "manifest-test-kind-1" + manifest_file_name = f"manifest-test-{test_name}" manifest_manager.write(manifest_file_name, [entry]) # Read the manifest entry back @@ -310,14 +342,29 @@ def test_manifest_entry_kind_1(self): # Get the entry read_entry = entries[0] - # Verify _KIND is 1 - self.assertEqual(read_entry.kind, 1) - - # Verify value_stats has empty BinaryRows for min_values and max_values when _KIND=1 - self.assertIsInstance(read_entry.file.value_stats.min_values, BinaryRow) - self.assertIsInstance(read_entry.file.value_stats.max_values, BinaryRow) - self.assertEqual(len(read_entry.file.value_stats.min_values.values), 0) - self.assertEqual(len(read_entry.file.value_stats.max_values.values), 0) - - # Verify null_counts are preserved - self.assertEqual(read_entry.file.value_stats.null_counts, [0, 0, 0]) + # Verify value_stats_cols is preserved correctly + self.assertEqual(read_entry.file.value_stats_cols, value_stats_cols) + + # Verify value_stats structure based on the logic + if value_stats_cols is None: + # Should use all table fields - verify we have data for all fields + self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count) + self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count) + self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count) + elif not value_stats_cols: # Empty list + # Should use empty fields - verify we have no field data + self.assertEqual(len(read_entry.file.value_stats.min_values.values), 0) + self.assertEqual(len(read_entry.file.value_stats.max_values.values), 0) + self.assertEqual(len(read_entry.file.value_stats.null_counts), 0) + else: + # Should use specified fields - verify we have data for specified fields only + self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count) + self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count) + self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count) + + # Verify the actual values match what we expect + if expected_fields_count > 0: + self.assertEqual(read_entry.file.value_stats.min_values.values, min_values) + self.assertEqual(read_entry.file.value_stats.max_values.values, max_values) + + self.assertEqual(read_entry.file.value_stats.null_counts, null_counts) From f2ece776b595d08aae7306055a3902ea3e349d0d Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 10 Sep 2025 21:58:53 +0800 Subject: [PATCH 7/7] specific_fields --- .../manifest/manifest_file_manager.py | 2 +- .../pypaimon/table/file_store_table.py | 1 + .../pypaimon/tests/py36/ao_read_write_test.py | 19 ++++++++----------- .../pypaimon/tests/reader_basic_test.py | 19 ++++++++----------- 4 files changed, 18 insertions(+), 23 deletions(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index 2a0c0cc95381..aec8bc7ed0bd 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -65,7 +65,7 @@ def read(self, manifest_file_name: str, shard_filter=None) -> List[ManifestEntry elif not file_dict.get('_VALUE_STATS_COLS'): fields = [] else: - raise RuntimeError("_VALUE_STATS_COLS should be None or empty. We don't support other values now.") + fields = [self.table.field_dict[col] for col in file_dict['_VALUE_STATS_COLS']] value_stats = SimpleStats( min_values=GenericRowDeserializer.from_bytes(value_dict['_MIN_VALUES'], fields), max_values=GenericRowDeserializer.from_bytes(value_dict['_MAX_VALUES'], fields), diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py index bcde56a5b28a..e0d7c7e6033a 100644 --- a/paimon-python/pypaimon/table/file_store_table.py +++ b/paimon-python/pypaimon/table/file_store_table.py @@ -46,6 +46,7 @@ def __init__(self, file_io: FileIO, identifier: Identifier, table_path: Path, self.table_schema = table_schema self.fields = table_schema.fields + self.field_dict = {field.name: field for field in self.fields} self.primary_keys = table_schema.primary_keys self.partition_keys = table_schema.partition_keys self.options = table_schema.options diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py index 8870384f159b..8f3201d056f4 100644 --- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py +++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py @@ -432,17 +432,14 @@ def test_value_stats_cols_logic(self): test_name="empty_case" ) - # Test case 3: _VALUE_STATS_COLS has specific columns (should raise RuntimeError) - with self.assertRaises(RuntimeError) as context: - self._test_value_stats_cols_case( - manifest_manager, - table, - value_stats_cols=['id', 'name'], - expected_fields_count=2, # Only 2 specified fields - test_name="specific_case" - ) - self.assertEqual("_VALUE_STATS_COLS should be None or empty. We don't support other values now.", - str(context.exception)) + # Test case 3: _VALUE_STATS_COLS has specific columns + self._test_value_stats_cols_case( + manifest_manager, + table, + value_stats_cols=['id', 'name'], + expected_fields_count=2, # Only 2 specified fields + test_name="specific_case" + ) def _test_value_stats_cols_case(self, manifest_manager, table, value_stats_cols, expected_fields_count, test_name): """Helper method to test a specific _VALUE_STATS_COLS case.""" diff --git a/paimon-python/pypaimon/tests/reader_basic_test.py b/paimon-python/pypaimon/tests/reader_basic_test.py index b3e3015a07c4..8354ed206ac4 100644 --- a/paimon-python/pypaimon/tests/reader_basic_test.py +++ b/paimon-python/pypaimon/tests/reader_basic_test.py @@ -241,17 +241,14 @@ def test_value_stats_cols_logic(self): test_name="empty_case" ) - # Test case 3: _VALUE_STATS_COLS has specific columns (should raise RuntimeError) - with self.assertRaises(RuntimeError) as context: - self._test_value_stats_cols_case( - manifest_manager, - table, - value_stats_cols=['id', 'name'], - expected_fields_count=2, # Only 2 specified fields - test_name="specific_case" - ) - self.assertEqual("_VALUE_STATS_COLS should be None or empty. We don't support other values now.", - str(context.exception)) + # Test case 3: _VALUE_STATS_COLS has specific columns + self._test_value_stats_cols_case( + manifest_manager, + table, + value_stats_cols=['id', 'name'], + expected_fields_count=2, # Only 2 specified fields + test_name="specific_case" + ) def _test_value_stats_cols_case(self, manifest_manager, table, value_stats_cols, expected_fields_count, test_name): """Helper method to test a specific _VALUE_STATS_COLS case."""