From d72d15c949756a27e5d777ae037dbbe1964a429e Mon Sep 17 00:00:00 2001 From: Mika Ristimaki Date: Mon, 19 Apr 2021 15:08:33 +0300 Subject: [PATCH 1/2] Parquet-2030: Expose page size row check configurations to ParquetWriter.Builder --- .../apache/parquet/hadoop/ParquetWriter.java | 22 +++ .../parquet/hadoop/TestParquetWriter.java | 155 ++++++++++++++++-- 2 files changed, 167 insertions(+), 10 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index e1afaca994..696fec3140 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -615,6 +615,28 @@ public SELF withBloomFilterEnabled(String columnPath, boolean enabled) { return self(); } + /** + * Sets the minimum number of rows to write before a page size check is done. + * + * @param min writes at least `min` rows before invoking a page size check + * @return this builder for method chaining + */ + public SELF withMinRowCountForPageSizeCheck(int min) { + encodingPropsBuilder.withMinRowCountForPageSizeCheck(min); + return self(); + } + + /** + * Sets the maximum number of rows to write before a page size check is done. + * + * @param max makes a page size check after `max` rows have been written + * @return this builder for method chaining + */ + public SELF withMaxRowCountForPageSizeCheck(int max) { + encodingPropsBuilder.withMaxRowCountForPageSizeCheck(max); + return self(); + } + /** * Set a property that will be available to the read path. For writers that use a Hadoop * configuration, this is the recommended way to add configuration values. diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java index de53e96264..8f2c09f910 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java @@ -37,6 +37,7 @@ import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -44,12 +45,14 @@ import net.openhft.hashing.LongHashFunction; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.example.data.GroupFactory; import org.apache.parquet.hadoop.example.ExampleParquetWriter; import org.apache.parquet.hadoop.util.HadoopInputFile; -import org.apache.parquet.hadoop.util.HadoopOutputFile; +import org.apache.parquet.hadoop.util.HadoopStreams; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.InvalidSchemaException; import org.apache.parquet.schema.Types; @@ -77,32 +80,35 @@ public class TestParquetWriter { /** * A test OutputFile implementation to validate the scenario of an OutputFile is implemented by an API client. 
*/ - private static class TestOutputFile implements OutputFile { + private class TestOutputFile implements OutputFile { - private final OutputFile outputFile; + private final File file; + private final FileSystem.Statistics stats; - TestOutputFile(Path path, Configuration conf) throws IOException { - outputFile = HadoopOutputFile.fromPath(path, conf); + TestOutputFile() throws IOException { + this.file = temp.newFile(); + this.stats = new FileSystem.Statistics("TestParquetWriter"); } @Override public PositionOutputStream create(long blockSizeHint) throws IOException { - return outputFile.create(blockSizeHint); + return createOrOverwrite(blockSizeHint); } @Override public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException { - return outputFile.createOrOverwrite(blockSizeHint); + FileOutputStream fos = new FileOutputStream(file); + return HadoopStreams.wrap(new FSDataOutputStream(fos, this.stats)); } @Override public boolean supportsBlockSize() { - return outputFile.supportsBlockSize(); + return false; } @Override public long defaultBlockSize() { - return outputFile.defaultBlockSize(); + return 0; } } @@ -132,7 +138,7 @@ public void test() throws Exception { for (int modulo : asList(10, 1000)) { for (WriterVersion version : WriterVersion.values()) { Path file = new Path(root, version.name() + "_" + modulo); - ParquetWriter writer = ExampleParquetWriter.builder(new TestOutputFile(file, conf)) + ParquetWriter writer = ExampleParquetWriter.builder(file) .withCompressionCodec(UNCOMPRESSED) .withRowGroupSize(1024) .withPageSize(1024) @@ -279,4 +285,133 @@ public void testParquetFileWithBloomFilter() throws IOException { LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer()))); } } + + @Test + public void testParquetFileNotFlushedWhenRowCountLimitsAreNotExceeded() throws IOException { + MessageType schema = Types + .buildMessage() + .required(BINARY) + .as(stringType()) + .named("str") + .named("msg"); + + TestOutputFile file = new TestOutputFile(); + ParquetWriter writer = getParquetWriterBuilder(schema, file) + .withMinRowCountForPageSizeCheck(4) + .withMaxRowCountForPageSizeCheck(4) + .build(); + + writeRecords(writer, schema); + assertHasNotFlushed(file); + } + + @Test + public void testParquetFileIsFlushedWhenMinRowCountIsExceeded() throws IOException { + MessageType schema = Types + .buildMessage() + .required(BINARY) + .as(stringType()) + .named("str") + .named("msg"); + + TestOutputFile file = new TestOutputFile(); + ParquetWriter writer = getParquetWriterBuilder(schema, file) + .withMinRowCountForPageSizeCheck(3) + .withMaxRowCountForPageSizeCheck(4) + .build(); + + writeRecords(writer, schema); + + assertHasFlushed(file, 3, 1); + } + + @Test + public void testParquetFileIsNotFlushedIfMinRowCountIsNotExceeded() throws IOException { + MessageType schema = Types + .buildMessage() + .required(BINARY) + .as(stringType()) + .named("str") + .named("msg"); + + TestOutputFile file = new TestOutputFile(); + ParquetWriter writer = getParquetWriterBuilder(schema, file) + .withMinRowCountForPageSizeCheck(4) + .withMaxRowCountForPageSizeCheck(2) + .build(); + + writeRecords(writer, schema); + + assertHasNotFlushed(file); + } + + @Test + public void testParquetFileIsFlushedAfterEachRecord() throws IOException { + MessageType schema = Types + .buildMessage() + .required(BINARY) + .as(stringType()) + .named("str") + .named("msg"); + + TestOutputFile file = new TestOutputFile(); + ParquetWriter writer = getParquetWriterBuilder(schema, file) + 
.withMinRowCountForPageSizeCheck(1) + .withMaxRowCountForPageSizeCheck(4) + .build(); + + writeRecords(writer, schema); + + assertHasFlushed(file, 3, 3); + } + + @Test + public void testParquetFileNotFlushingAllRows() throws IOException { + MessageType schema = Types + .buildMessage() + .required(BINARY) + .as(stringType()) + .named("str") + .named("msg"); + + TestOutputFile file = new TestOutputFile(); + ParquetWriter writer = getParquetWriterBuilder(schema, file) + .withMinRowCountForPageSizeCheck(2) + .withMaxRowCountForPageSizeCheck(3) + .build(); + + writeRecords(writer, schema); + + assertHasFlushed(file, 2, 1); + } + + private ExampleParquetWriter.Builder getParquetWriterBuilder(MessageType schema, + TestOutputFile file) throws IOException { + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(schema, conf); + return ExampleParquetWriter.builder(file) + .withConf(conf) + // Set row group size to 1, to make sure we flush every time + // minRowCountForPageSizeCheck or maxRowCountForPageSizeCheck is exceeded + .withRowGroupSize(1); + } + + private void writeRecords(ParquetWriter writer, MessageType schema) throws IOException { + SimpleGroupFactory factory = new SimpleGroupFactory(schema); + writer.write(factory.newGroup().append("str", "foo")); + writer.write(factory.newGroup().append("str", "bar")); + writer.write(factory.newGroup().append("str", "baz")); + } + + private void assertHasNotFlushed(TestOutputFile file) { + int emptyFileLength = ParquetFileWriter.MAGIC.length; + assertEquals(emptyFileLength, file.stats.getBytesWritten()); + } + + private void assertHasFlushed(TestOutputFile file, int numWrites, int numFlushes) { + int emptyFileLength = ParquetFileWriter.MAGIC.length; + // Each write in "writerRecords" writes 7B, and every flush writers 23B + assertEquals(emptyFileLength + 7L * numWrites + 23L * numFlushes, + file.stats.getBytesWritten()); + } } From 1aa15e64d5cfc6ff0c8fa809fbcbcf1d9ddbf5d6 Mon Sep 17 00:00:00 2001 From: Mika Ristimaki Date: Wed, 21 Apr 2021 11:24:08 +0300 Subject: [PATCH 2/2] Parquet-2030: Improve unit tests for page size row checks --- .../parquet/hadoop/TestParquetWriter.java | 162 ++++-------------- 1 file changed, 37 insertions(+), 125 deletions(-) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java index 8f2c09f910..9e9b735f32 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java @@ -37,7 +37,6 @@ import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -45,14 +44,13 @@ import net.openhft.hashing.LongHashFunction; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.example.data.GroupFactory; import org.apache.parquet.hadoop.example.ExampleParquetWriter; import org.apache.parquet.hadoop.util.HadoopInputFile; -import org.apache.parquet.hadoop.util.HadoopStreams; +import org.apache.parquet.hadoop.util.HadoopOutputFile; import org.apache.parquet.schema.GroupType; import 
org.apache.parquet.schema.InvalidSchemaException; import org.apache.parquet.schema.Types; @@ -80,35 +78,32 @@ public class TestParquetWriter { /** * A test OutputFile implementation to validate the scenario of an OutputFile is implemented by an API client. */ - private class TestOutputFile implements OutputFile { + private static class TestOutputFile implements OutputFile { - private final File file; - private final FileSystem.Statistics stats; + private final OutputFile outputFile; - TestOutputFile() throws IOException { - this.file = temp.newFile(); - this.stats = new FileSystem.Statistics("TestParquetWriter"); + TestOutputFile(Path path, Configuration conf) throws IOException { + outputFile = HadoopOutputFile.fromPath(path, conf); } @Override public PositionOutputStream create(long blockSizeHint) throws IOException { - return createOrOverwrite(blockSizeHint); + return outputFile.create(blockSizeHint); } @Override public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException { - FileOutputStream fos = new FileOutputStream(file); - return HadoopStreams.wrap(new FSDataOutputStream(fos, this.stats)); + return outputFile.createOrOverwrite(blockSizeHint); } @Override public boolean supportsBlockSize() { - return false; + return outputFile.supportsBlockSize(); } @Override public long defaultBlockSize() { - return 0; + return outputFile.defaultBlockSize(); } } @@ -138,7 +133,7 @@ public void test() throws Exception { for (int modulo : asList(10, 1000)) { for (WriterVersion version : WriterVersion.values()) { Path file = new Path(root, version.name() + "_" + modulo); - ParquetWriter writer = ExampleParquetWriter.builder(file) + ParquetWriter writer = ExampleParquetWriter.builder(new TestOutputFile(file, conf)) .withCompressionCodec(UNCOMPRESSED) .withRowGroupSize(1024) .withPageSize(1024) @@ -287,46 +282,17 @@ public void testParquetFileWithBloomFilter() throws IOException { } @Test - public void testParquetFileNotFlushedWhenRowCountLimitsAreNotExceeded() throws IOException { - MessageType schema = Types - .buildMessage() - .required(BINARY) - .as(stringType()) - .named("str") - .named("msg"); - - TestOutputFile file = new TestOutputFile(); - ParquetWriter writer = getParquetWriterBuilder(schema, file) - .withMinRowCountForPageSizeCheck(4) - .withMaxRowCountForPageSizeCheck(4) - .build(); - - writeRecords(writer, schema); - assertHasNotFlushed(file); - } - - @Test - public void testParquetFileIsFlushedWhenMinRowCountIsExceeded() throws IOException { - MessageType schema = Types - .buildMessage() - .required(BINARY) - .as(stringType()) - .named("str") - .named("msg"); - - TestOutputFile file = new TestOutputFile(); - ParquetWriter writer = getParquetWriterBuilder(schema, file) - .withMinRowCountForPageSizeCheck(3) - .withMaxRowCountForPageSizeCheck(4) - .build(); - - writeRecords(writer, schema); + public void testParquetFileWritesExpectedNumberOfBlocks() throws IOException { + testParquetFileNumberOfBlocks(ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK, + ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK, + 1); + testParquetFileNumberOfBlocks(1, 1, 3); - assertHasFlushed(file, 3, 1); } - @Test - public void testParquetFileIsNotFlushedIfMinRowCountIsNotExceeded() throws IOException { + private void testParquetFileNumberOfBlocks(int minRowCountForPageSizeCheck, + int maxRowCountForPageSizeCheck, + int expectedNumberOfBlocks) throws IOException { MessageType schema = Types .buildMessage() .required(BINARY) @@ -334,84 +300,30 @@ public void 
testParquetFileIsNotFlushedIfMinRowCountIsNotExceeded() throws IOExc .named("str") .named("msg"); - TestOutputFile file = new TestOutputFile(); - ParquetWriter writer = getParquetWriterBuilder(schema, file) - .withMinRowCountForPageSizeCheck(4) - .withMaxRowCountForPageSizeCheck(2) - .build(); - - writeRecords(writer, schema); - - assertHasNotFlushed(file); - } - - @Test - public void testParquetFileIsFlushedAfterEachRecord() throws IOException { - MessageType schema = Types - .buildMessage() - .required(BINARY) - .as(stringType()) - .named("str") - .named("msg"); - - TestOutputFile file = new TestOutputFile(); - ParquetWriter writer = getParquetWriterBuilder(schema, file) - .withMinRowCountForPageSizeCheck(1) - .withMaxRowCountForPageSizeCheck(4) - .build(); - - writeRecords(writer, schema); - - assertHasFlushed(file, 3, 3); - } - - @Test - public void testParquetFileNotFlushingAllRows() throws IOException { - MessageType schema = Types - .buildMessage() - .required(BINARY) - .as(stringType()) - .named("str") - .named("msg"); - - TestOutputFile file = new TestOutputFile(); - ParquetWriter writer = getParquetWriterBuilder(schema, file) - .withMinRowCountForPageSizeCheck(2) - .withMaxRowCountForPageSizeCheck(3) - .build(); - - writeRecords(writer, schema); - - assertHasFlushed(file, 2, 1); - } - - private ExampleParquetWriter.Builder getParquetWriterBuilder(MessageType schema, - TestOutputFile file) throws IOException { Configuration conf = new Configuration(); GroupWriteSupport.setSchema(schema, conf); - return ExampleParquetWriter.builder(file) + + File file = temp.newFile(); + temp.delete(); + Path path = new Path(file.getAbsolutePath()); + try (ParquetWriter writer = ExampleParquetWriter.builder(path) .withConf(conf) // Set row group size to 1, to make sure we flush every time // minRowCountForPageSizeCheck or maxRowCountForPageSizeCheck is exceeded - .withRowGroupSize(1); - } - - private void writeRecords(ParquetWriter writer, MessageType schema) throws IOException { - SimpleGroupFactory factory = new SimpleGroupFactory(schema); - writer.write(factory.newGroup().append("str", "foo")); - writer.write(factory.newGroup().append("str", "bar")); - writer.write(factory.newGroup().append("str", "baz")); - } + .withRowGroupSize(1) + .withMinRowCountForPageSizeCheck(minRowCountForPageSizeCheck) + .withMaxRowCountForPageSizeCheck(maxRowCountForPageSizeCheck) + .build()) { - private void assertHasNotFlushed(TestOutputFile file) { - int emptyFileLength = ParquetFileWriter.MAGIC.length; - assertEquals(emptyFileLength, file.stats.getBytesWritten()); - } + SimpleGroupFactory factory = new SimpleGroupFactory(schema); + writer.write(factory.newGroup().append("str", "foo")); + writer.write(factory.newGroup().append("str", "bar")); + writer.write(factory.newGroup().append("str", "baz")); + } - private void assertHasFlushed(TestOutputFile file, int numWrites, int numFlushes) { - int emptyFileLength = ParquetFileWriter.MAGIC.length; - // Each write in "writerRecords" writes 7B, and every flush writers 23B - assertEquals(emptyFileLength + 7L * numWrites + 23L * numFlushes, - file.stats.getBytesWritten()); + try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) { + ParquetMetadata footer = reader.getFooter(); + assertEquals(expectedNumberOfBlocks, footer.getBlocks().size()); + } } }
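
---

Usage note (not part of the patch itself): below is a minimal sketch of how an API client could use the two builder options introduced by this change, `withMinRowCountForPageSizeCheck` and `withMaxRowCountForPageSizeCheck`. The output path, the single-column schema, and the row-count values (100 and 5000) are illustrative assumptions chosen for the example, not values taken from the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroupFactory;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;
    import org.apache.parquet.hadoop.example.GroupWriteSupport;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.Types;

    import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
    import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

    public class PageSizeCheckExample {
      public static void main(String[] args) throws Exception {
        // Same single-string-column schema as the tests in the patch.
        MessageType schema = Types.buildMessage()
            .required(BINARY).as(stringType()).named("str")
            .named("msg");

        Configuration conf = new Configuration();
        GroupWriteSupport.setSchema(schema, conf);

        // Hypothetical output path, for illustration only.
        Path path = new Path("/tmp/page-size-check-example.parquet");

        // The two builder methods added by this patch: write at least 100 rows
        // before a page size check, and check after at most 5000 rows.
        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
            .withConf(conf)
            .withMinRowCountForPageSizeCheck(100)
            .withMaxRowCountForPageSizeCheck(5000)
            .build()) {
          SimpleGroupFactory factory = new SimpleGroupFactory(schema);
          for (int i = 0; i < 10_000; i++) {
            writer.write(factory.newGroup().append("str", "record-" + i));
          }
        }
      }
    }

Per the javadocs added in the first patch, with these example settings the writer invokes a page size check only after at least 100 rows have been written, and performs one after at most 5000 rows; previously these limits could only be set through the Hadoop configuration rather than directly on ParquetWriter.Builder.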