From 7acde65bbca5ffe4568df1e7249206e797d3b305 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Tue, 8 Apr 2025 12:58:44 +0000
Subject: [PATCH] Merge pull request #78764 from ucasfl/fix-iceberg

Fix failure reading Iceberg when a min-max value is null

---
 .../DataLakes/Iceberg/ManifestFile.cpp        |  11 +-
 .../integration/test_storage_iceberg/test.py  | 139 ++++++++++++++++++
 2 files changed, 148 insertions(+), 2 deletions(-)

diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp
index 4d9a4c817597..3f934cd2bf5b 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp
@@ -273,8 +273,15 @@ ManifestFileContent::ManifestFileContent(
     for (const auto & [column_id, bounds] : value_for_bounds)
     {
         DB::NameAndTypePair name_and_type = schema_processor.getFieldCharacteristics(schema_id, column_id);
-        auto left = deserializeFieldFromBinaryRepr(bounds.first.safeGet<String>(), name_and_type.type, true);
-        auto right = deserializeFieldFromBinaryRepr(bounds.second.safeGet<String>(), name_and_type.type, false);
+
+        String left_str;
+        String right_str;
+        /// lower_bound and upper_bound may be NULL.
+        if (!bounds.first.tryGet(left_str) || !bounds.second.tryGet(right_str))
+            continue;
+
+        auto left = deserializeFieldFromBinaryRepr(left_str, name_and_type.type, true);
+        auto right = deserializeFieldFromBinaryRepr(right_str, name_and_type.type, false);
         if (!left || !right)
             continue;

diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 335addfd7149..8f960d96ed91 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -3016,3 +3016,142 @@ def test_explicit_metadata_file(started_cluster, storage_type):
     create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="metadata/v11.metadata.json")

     assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
+
+
+@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
+def test_minmax_pruning_with_null(started_cluster, storage_type):
+    instance = started_cluster.instances["node1"]
+    spark = started_cluster.spark_session
+    TABLE_NAME = "test_minmax_pruning_with_null" + storage_type + "_" + get_uuid_str()
+
+    def execute_spark_query(query: str):
+        spark.sql(query)
+        default_upload_directory(
+            started_cluster,
+            storage_type,
+            f"/iceberg_data/default/{TABLE_NAME}/",
+            f"/iceberg_data/default/{TABLE_NAME}/",
+        )
+        return
+
+    execute_spark_query(
+        f"""
+            CREATE TABLE {TABLE_NAME} (
+                tag INT,
+                date DATE,
+                ts TIMESTAMP,
+                time_struct struct<a : DATE, b : TIMESTAMP>,
+                name VARCHAR(50),
+                number BIGINT
+            )
+            USING iceberg
+            OPTIONS('format-version'='2')
+        """
+    )
+
+    # The min-max value of time_struct in the manifest file is null.
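+    # (The first four INSERTs below write NULL for time_struct, so those data
+    # files carry null lower_bound/upper_bound for that column in the manifest;
+    # before this fix, reading such a manifest threw from safeGet<String>()
+    # on the NULL Field.)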
+    execute_spark_query(
+        f"""
+            INSERT INTO {TABLE_NAME} VALUES
+            (1, DATE '2024-01-20',
+            TIMESTAMP '2024-02-20 10:00:00', null, 'vasya', 5)
+        """
+    )
+
+    execute_spark_query(
+        f"""
+            INSERT INTO {TABLE_NAME} VALUES
+            (2, DATE '2024-02-20',
+            TIMESTAMP '2024-03-20 15:00:00', null, 'vasilisa', 6)
+        """
+    )
+
+    execute_spark_query(
+        f"""
+            INSERT INTO {TABLE_NAME} VALUES
+            (3, DATE '2025-03-20',
+            TIMESTAMP '2024-04-30 14:00:00', null, 'icebreaker', 7)
+        """
+    )
+
+    execute_spark_query(
+        f"""
+            INSERT INTO {TABLE_NAME} VALUES
+            (4, DATE '2025-04-20',
+            TIMESTAMP '2024-05-30 14:00:00', null, 'iceberg', 8)
+        """
+    )
+
+    execute_spark_query(
+        f"""
+            INSERT INTO {TABLE_NAME} VALUES
+            (1, DATE '2024-01-20',
+            TIMESTAMP '2024-02-20 10:00:00', named_struct('a', DATE '2024-02-20', 'b', TIMESTAMP '2024-02-20 10:00:00'), 'vasya', 5)
+        """
+    )
+
+    creation_expression = get_creation_expression(
+        storage_type, TABLE_NAME, started_cluster, table_function=True
+    )
+
+    def check_validity_and_get_prunned_files(select_expression):
+        query_id1 = f"{TABLE_NAME}-{uuid.uuid4()}"
+        query_id2 = f"{TABLE_NAME}-{uuid.uuid4()}"
+
+        data1 = instance.query(
+            select_expression,
+            query_id=query_id1,
+            settings={"use_iceberg_partition_pruning": 0, "input_format_parquet_bloom_filter_push_down": 0, "input_format_parquet_filter_push_down": 0},
+        )
+        data1 = list(
+            map(
+                lambda x: x.split("\t"),
+                filter(lambda x: len(x) > 0, data1.strip().split("\n")),
+            )
+        )
+
+        data2 = instance.query(
+            select_expression,
+            query_id=query_id2,
+            settings={"use_iceberg_partition_pruning": 1, "input_format_parquet_bloom_filter_push_down": 0, "input_format_parquet_filter_push_down": 0},
+        )
+        data2 = list(
+            map(
+                lambda x: x.split("\t"),
+                filter(lambda x: len(x) > 0, data2.strip().split("\n")),
+            )
+        )
+
+        assert data1 == data2
+
+        instance.query("SYSTEM FLUSH LOGS")
+
+        print(
+            "Unprunned: ",
+            instance.query(
+                f"SELECT ProfileEvents['IcebergMinMaxIndexPrunnedFiles'] FROM system.query_log WHERE query_id = '{query_id1}' AND type = 'QueryFinish'"
+            ),
+        )
+        print(
+            "Prunned: ",
+            instance.query(
+                f"SELECT ProfileEvents['IcebergMinMaxIndexPrunnedFiles'] FROM system.query_log WHERE query_id = '{query_id2}' AND type = 'QueryFinish'"
+            ),
+        )
+
+        assert 0 == int(
+            instance.query(
+                f"SELECT ProfileEvents['IcebergMinMaxIndexPrunnedFiles'] FROM system.query_log WHERE query_id = '{query_id1}' AND type = 'QueryFinish'"
+            )
+        )
+        return int(
+            instance.query(
+                f"SELECT ProfileEvents['IcebergMinMaxIndexPrunnedFiles'] FROM system.query_log WHERE query_id = '{query_id2}' AND type = 'QueryFinish'"
+            )
+        )
+
+    assert (
+        check_validity_and_get_prunned_files(
+            f"SELECT * FROM {creation_expression} WHERE time_struct.a <= '2024-02-01' ORDER BY ALL"
+        )
+        == 1
+    )
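
For context: the crux of the fix is replacing the throwing accessor bounds.first.safeGet<String>() with the non-throwing Field::tryGet, so a column whose lower_bound/upper_bound are NULL is simply skipped by min-max pruning instead of failing the whole read. Below is a minimal, self-contained C++17 sketch of that guard pattern; std::variant and the tryGet helper stand in for ClickHouse's DB::Field, and all names are illustrative rather than the actual ClickHouse API.

#include <iostream>
#include <string>
#include <utility>
#include <variant>
#include <vector>

// Stand-in for DB::Field: either NULL (monostate) or a binary-encoded bound.
using Field = std::variant<std::monostate, std::string>;

// Mirrors the shape of Field::tryGet: returns false instead of throwing
// when the field does not hold a string (i.e. the bound is NULL).
bool tryGet(const Field & f, std::string & out)
{
    if (const auto * s = std::get_if<std::string>(&f))
    {
        out = *s;
        return true;
    }
    return false;
}

int main()
{
    // Per-column (lower_bound, upper_bound) pairs. The second column has NULL
    // bounds, as an Iceberg manifest may when every row's value is NULL.
    std::vector<std::pair<Field, Field>> bounds_per_column = {
        {std::string("aaa"), std::string("zzz")},
        {std::monostate{}, std::monostate{}},
    };

    for (const auto & [lower, upper] : bounds_per_column)
    {
        std::string left;
        std::string right;
        // The guard from the fix: skip this column instead of throwing.
        if (!tryGet(lower, left) || !tryGet(upper, right))
        {
            std::cout << "NULL bounds: skipping min-max pruning for this column\n";
            continue;
        }
        std::cout << "pruning with bounds [" << left << ", " << right << "]\n";
    }
}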