From 436ca18eeeabf620de21d33cbbc51b93e1a18e3f Mon Sep 17 00:00:00 2001
From: "chenliang.lu"
Date: Fri, 24 Feb 2023 11:34:10 +0800
Subject: [PATCH 1/2] PARQUET-2251: Avoid generating a bloom filter when all pages of a column are dictionary encoded in parquet v1

---
 .../parquet/hadoop/ParquetFileWriter.java    |   3 +-
 .../parquet/hadoop/TestBloomFiltering.java   |  18 ++-
 .../parquet/hadoop/TestStoreBloomFilter.java | 132 ++++++++++++++++++
 3 files changed, 151 insertions(+), 2 deletions(-)
 create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestStoreBloomFilter.java

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 007eef5d1b..ac376ca7da 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -884,7 +884,8 @@ void writeColumnChunk(ColumnDescriptor descriptor,
     // write bloom filter if one of data pages is not dictionary encoded
     boolean isWriteBloomFilter = false;
     for (Encoding encoding : dataEncodings) {
-      if (encoding != Encoding.RLE_DICTIONARY) {
+      // dictionary encoding: `PLAIN_DICTIONARY` is used in pageV1, `RLE_DICTIONARY` is used in pageV2
+      if (encoding != Encoding.PLAIN_DICTIONARY && encoding != Encoding.RLE_DICTIONARY) {
         isWriteBloomFilter = true;
         break;
       }

diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
index 68a4e34e3d..a9680aa657 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
@@ -150,6 +150,22 @@ private static List<String> generateNames(int rowCount) {
     return list;
   }
 
+  protected static List<PhoneBookWriter.User> generateDictionaryData(int rowCount) {
+    List<PhoneBookWriter.User> users = new ArrayList<>();
+    List<String> names = new ArrayList<>();
+    for (int i = 0; i < rowCount / 5; i++) {
+      names.add("miller");
+      names.add("anderson");
+      names.add("thomas");
+      names.add("williams");
+      names.add("len");
+    }
+    for (int i = 0; i < rowCount; ++i) {
+      users.add(new PhoneBookWriter.User(i, names.get(i), generatePhoneNumbers(), generateLocation(i, rowCount)));
+    }
+    return users;
+  }
+
   private static List<PhoneBookWriter.PhoneNumber> generatePhoneNumbers() {
     int length = RANDOM.nextInt(5) - 1;
     if (length < 0) {
@@ -239,7 +255,7 @@ private void assertCorrectFiltering(Predicate<PhoneBookWriter.User> expectedFilt
     assertEquals(DATA.stream().filter(expectedFilter).collect(Collectors.toList()), result);
   }
 
-  private static FileEncryptionProperties getFileEncryptionProperties() {
+  protected static FileEncryptionProperties getFileEncryptionProperties() {
     ColumnEncryptionProperties columnProperties1 = ColumnEncryptionProperties
       .builder("id")
       .withKey(COLUMN_ENCRYPTION_KEY1)

diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestStoreBloomFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestStoreBloomFilter.java
new file mode 100644
index 0000000000..994803e0ef
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestStoreBloomFilter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.apache.parquet.hadoop.TestBloomFiltering.generateDictionaryData;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.EncodingStats;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+
+@RunWith(Parameterized.class)
+public class TestStoreBloomFilter {
+  private static final Path FILE_V1 = createTempFile("v1");
+  private static final Path FILE_V2 = createTempFile("v2");
+  private static final List<PhoneBookWriter.User> DATA = Collections.unmodifiableList(generateDictionaryData(10000));
+  private final Path file;
+  private final String pageVersion;
+
+  public TestStoreBloomFilter(Path file, String pageVersion) {
+    this.file = file;
+    this.pageVersion = pageVersion;
+  }
+
+  @Parameterized.Parameters(name = "Run {index}: parquet {1}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(
+      new Object[]{FILE_V1, "pageV1"},
+      new Object[]{FILE_V2, "pageV2"});
+  }
+
+  private static Path createTempFile(String version) {
+    try {
+      return new Path(Files.createTempFile("test-store-bloom-filter-" + version, ".parquet").toAbsolutePath().toString());
+    } catch (IOException e) {
+      throw new AssertionError("Unable to create temporary file", e);
+    }
+  }
+
+  private static void deleteFile(Path file) throws IOException {
+    file.getFileSystem(new Configuration()).delete(file, false);
+  }
+
+  @BeforeClass
+  public static void createFiles() throws IOException {
+    writePhoneBookToFile(FILE_V1, ParquetProperties.WriterVersion.PARQUET_1_0);
+    writePhoneBookToFile(FILE_V2, ParquetProperties.WriterVersion.PARQUET_2_0);
+  }
+
+  private static void writePhoneBookToFile(Path file,
+                                           ParquetProperties.WriterVersion parquetVersion) throws IOException {
+    int pageSize = DATA.size() / 100;  // Ensure that several pages will be created
+    int rowGroupSize = pageSize * 4;   // Ensure that multiple row groups are created
+    PhoneBookWriter.write(ExampleParquetWriter.builder(file)
+        .withWriteMode(OVERWRITE)
+        .withRowGroupSize(rowGroupSize)
+        .withPageSize(pageSize)
+        .withBloomFilterNDV("location.lat", 10000L)
+        .withBloomFilterNDV("name", 10000L)
.withBloomFilterNDV("id", 10000L) + .withWriterVersion(parquetVersion), + DATA); + } + + @AfterClass + public static void deleteFiles() throws IOException { + deleteFile(FILE_V1); + deleteFile(FILE_V2); + } + + @Test + public void testStoreBloomFilter() throws IOException { + ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(file, new Configuration()), + ParquetReadOptions.builder().build()); + List blocks = reader.getRowGroups(); + blocks.forEach(block -> { + try { + // column `id` isn't fully encoded in dictionary, it will generate `BloomFilter` + ColumnChunkMetaData idMeta = block.getColumns().get(0); + EncodingStats idEncoding = idMeta.getEncodingStats(); + Assert.assertTrue(idEncoding.hasNonDictionaryEncodedPages()); + Assert.assertNotNull(reader.readBloomFilter(idMeta)); + + // column `name` is fully encoded in dictionary, it won't generate `BloomFilter` + ColumnChunkMetaData nameMeta = block.getColumns().get(1); + EncodingStats stats = nameMeta.getEncodingStats(); + Assert.assertFalse(stats.hasNonDictionaryEncodedPages()); + Assert.assertNull(reader.readBloomFilter(nameMeta)); + } catch (IOException e) { + e.printStackTrace(); + } + }); + } +} From 4f63f61c57697b3b62d287949001eb1240301440 Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Fri, 24 Feb 2023 14:21:50 +0800 Subject: [PATCH 2/2] update comments --- .../parquet/hadoop/ParquetFileWriter.java | 2 +- .../parquet/hadoop/TestBloomFiltering.java | 2 +- .../parquet/hadoop/TestStoreBloomFilter.java | 68 +++++++++---------- 3 files changed, 36 insertions(+), 36 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index ac376ca7da..5ad106763b 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -884,7 +884,7 @@ void writeColumnChunk(ColumnDescriptor descriptor, // write bloom filter if one of data pages is not dictionary encoded boolean isWriteBloomFilter = false; for (Encoding encoding : dataEncodings) { - // dictionary encoding: `PLAIN_DICTIONARY` is used in pageV1, `RLE_DICTIONARY` is used in pageV2 + // dictionary encoding: `PLAIN_DICTIONARY` is used in parquet v1, `RLE_DICTIONARY` is used in parquet v2 if (encoding != Encoding.PLAIN_DICTIONARY && encoding != Encoding.RLE_DICTIONARY) { isWriteBloomFilter = true; break; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java index a9680aa657..42a284a373 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java @@ -157,7 +157,7 @@ protected static List generateDictionaryData(int rowCount) names.add("miller"); names.add("anderson"); names.add("thomas"); - names.add("williams"); + names.add("chenLiang"); names.add("len"); } for (int i = 0; i < rowCount; ++i) { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestStoreBloomFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestStoreBloomFilter.java index 994803e0ef..25aad81620 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestStoreBloomFilter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestStoreBloomFilter.java @@ -53,30 +53,18 @@ public class 
From 4f63f61c57697b3b62d287949001eb1240301440 Mon Sep 17 00:00:00 2001
From: "chenliang.lu"
Date: Fri, 24 Feb 2023 14:21:50 +0800
Subject: [PATCH 2/2] Update comments

---
 .../parquet/hadoop/ParquetFileWriter.java    |  2 +-
 .../parquet/hadoop/TestBloomFiltering.java   |  2 +-
 .../parquet/hadoop/TestStoreBloomFilter.java | 68 +++++++++----------
 3 files changed, 36 insertions(+), 36 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index ac376ca7da..5ad106763b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -884,7 +884,7 @@ void writeColumnChunk(ColumnDescriptor descriptor,
     // write bloom filter if one of data pages is not dictionary encoded
     boolean isWriteBloomFilter = false;
     for (Encoding encoding : dataEncodings) {
-      // dictionary encoding: `PLAIN_DICTIONARY` is used in pageV1, `RLE_DICTIONARY` is used in pageV2
+      // dictionary encoding: `PLAIN_DICTIONARY` is used in parquet v1, `RLE_DICTIONARY` is used in parquet v2
       if (encoding != Encoding.PLAIN_DICTIONARY && encoding != Encoding.RLE_DICTIONARY) {
         isWriteBloomFilter = true;
         break;

diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
index a9680aa657..42a284a373 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
@@ -157,7 +157,7 @@ protected static List<PhoneBookWriter.User> generateDictionaryData(int rowCount)
       names.add("miller");
       names.add("anderson");
       names.add("thomas");
-      names.add("williams");
+      names.add("chenLiang");
       names.add("len");
     }
     for (int i = 0; i < rowCount; ++i) {

diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestStoreBloomFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestStoreBloomFilter.java
index 994803e0ef..25aad81620 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestStoreBloomFilter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestStoreBloomFilter.java
@@ -53,30 +53,18 @@ public class TestStoreBloomFilter {
   private static final Path FILE_V2 = createTempFile("v2");
   private static final List<PhoneBookWriter.User> DATA = Collections.unmodifiableList(generateDictionaryData(10000));
   private final Path file;
-  private final String pageVersion;
+  private final String version;
 
-  public TestStoreBloomFilter(Path file, String pageVersion) {
+  public TestStoreBloomFilter(Path file, String version) {
     this.file = file;
-    this.pageVersion = pageVersion;
+    this.version = version;
   }
 
   @Parameterized.Parameters(name = "Run {index}: parquet {1}")
   public static Collection<Object[]> params() {
     return Arrays.asList(
-      new Object[]{FILE_V1, "pageV1"},
-      new Object[]{FILE_V2, "pageV2"});
-  }
-
-  private static Path createTempFile(String version) {
-    try {
-      return new Path(Files.createTempFile("test-store-bloom-filter-" + version, ".parquet").toAbsolutePath().toString());
-    } catch (IOException e) {
-      throw new AssertionError("Unable to create temporary file", e);
-    }
-  }
-
-  private static void deleteFile(Path file) throws IOException {
-    file.getFileSystem(new Configuration()).delete(file, false);
+      new Object[]{FILE_V1, "v1"},
+      new Object[]{FILE_V2, "v2"});
   }
 
   @BeforeClass
@@ -85,21 +73,6 @@ public static void createFiles() throws IOException {
     writePhoneBookToFile(FILE_V2, ParquetProperties.WriterVersion.PARQUET_2_0);
   }
 
-  private static void writePhoneBookToFile(Path file,
-                                           ParquetProperties.WriterVersion parquetVersion) throws IOException {
-    int pageSize = DATA.size() / 100;  // Ensure that several pages will be created
-    int rowGroupSize = pageSize * 4;   // Ensure that multiple row groups are created
-    PhoneBookWriter.write(ExampleParquetWriter.builder(file)
-        .withWriteMode(OVERWRITE)
-        .withRowGroupSize(rowGroupSize)
-        .withPageSize(pageSize)
-        .withBloomFilterNDV("location.lat", 10000L)
-        .withBloomFilterNDV("name", 10000L)
-        .withBloomFilterNDV("id", 10000L)
-        .withWriterVersion(parquetVersion),
-      DATA);
-  }
-
   @AfterClass
   public static void deleteFiles() throws IOException {
     deleteFile(FILE_V1);
@@ -121,12 +94,39 @@ public void testStoreBloomFilter() throws IOException {
 
         // column `name` is fully dictionary encoded, so no `BloomFilter` is written
         ColumnChunkMetaData nameMeta = block.getColumns().get(1);
-        EncodingStats stats = nameMeta.getEncodingStats();
-        Assert.assertFalse(stats.hasNonDictionaryEncodedPages());
+        EncodingStats nameEncoding = nameMeta.getEncodingStats();
+        Assert.assertFalse(nameEncoding.hasNonDictionaryEncodedPages());
         Assert.assertNull(reader.readBloomFilter(nameMeta));
       } catch (IOException e) {
         e.printStackTrace();
       }
     });
   }
+
+  private static Path createTempFile(String version) {
+    try {
+      return new Path(Files.createTempFile("test-store-bloom-filter-" + version, ".parquet")
+        .toAbsolutePath().toString());
+    } catch (IOException e) {
+      throw new AssertionError("Unable to create temporary file", e);
+    }
+  }
+
+  private static void deleteFile(Path file) throws IOException {
+    file.getFileSystem(new Configuration()).delete(file, false);
+  }
+
+  private static void writePhoneBookToFile(Path file,
+                                           ParquetProperties.WriterVersion parquetVersion) throws IOException {
+    int pageSize = DATA.size() / 100;  // Ensure that several pages will be created
+    int rowGroupSize = pageSize * 4;   // Ensure that multiple row groups are created
+    PhoneBookWriter.write(ExampleParquetWriter.builder(file)
+        .withWriteMode(OVERWRITE)
+        .withRowGroupSize(rowGroupSize)
+        .withPageSize(pageSize)
+        .withBloomFilterNDV("id", 10000L)
+        .withBloomFilterNDV("name", 10000L)
+        .withWriterVersion(parquetVersion),
+      DATA);
+  }
 }
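For completeness, the effect of the series can also be observed outside the test harness. A minimal reader-side sketch, assuming an existing parquet file written with bloom filters enabled (the class name and the command-line path argument are placeholders; the reader calls are the same ones TestStoreBloomFilter uses above):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.column.EncodingStats;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class PrintBloomFilterPresence {
      public static void main(String[] args) throws IOException {
        Path file = new Path(args[0]);  // placeholder: path to a parquet file
        try (ParquetFileReader reader = new ParquetFileReader(
            HadoopInputFile.fromPath(file, new Configuration()),
            ParquetReadOptions.builder().build())) {
          for (BlockMetaData block : reader.getRowGroups()) {
            for (ColumnChunkMetaData column : block.getColumns()) {
              EncodingStats stats = column.getEncodingStats();
              // After this patch, a fully dictionary-encoded column chunk
              // should report no bloom filter, in v1 and v2 files alike.
              System.out.println(column.getPath()
                  + " nonDictionaryPages=" + stats.hasNonDictionaryEncodedPages()
                  + " bloomFilter=" + (reader.readBloomFilter(column) != null));
            }
          }
        }
      }
    }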