From f6efbee50dcec1846200a30f7ea2f79894fc61cf Mon Sep 17 00:00:00 2001 From: Vitalii Diravka Date: Tue, 13 Apr 2021 18:04:35 +0300 Subject: [PATCH 1/3] DRILL-7825: Error: SYSTEM ERROR: RuntimeException: Unknown logical type --- exec/java-exec/pom.xml | 4 - .../store/parquet/ParquetRecordWriter.java | 16 +- .../columnreaders/ColumnReaderFactory.java | 4 +- .../parquet2/DrillParquetGroupConverter.java | 27 +- .../ParquetColumnChunkPageWriteStore.java | 316 ---- .../parquet/hadoop/ParquetFileWriter.java | 1633 +++++++++++++++++ .../ParquetSimpleTestFileGenerator.java | 61 +- .../store/parquet/TestParquetComplex.java | 14 + .../parquet/TestParquetLogicalTypes.java | 31 + .../parquet/parquet_test_file_simple.parquet | Bin 0 -> 5048 bytes .../uuid-simple-fixed-length-array.parquet | Bin 0 -> 1328 bytes .../store/parquet/complex/uuid.parquet | Bin 0 -> 38974 bytes exec/jdbc-all/pom.xml | 35 +- pom.xml | 2 +- 14 files changed, 1744 insertions(+), 399 deletions(-) delete mode 100644 exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java create mode 100644 exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java create mode 100644 exec/java-exec/src/test/resources/parquet/parquet_test_file_simple.parquet create mode 100644 exec/java-exec/src/test/resources/parquet/uuid-simple-fixed-length-array.parquet create mode 100644 exec/java-exec/src/test/resources/store/parquet/complex/uuid.parquet diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index f30700c9f3d..a08e17207d1 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -257,10 +257,6 @@ - - org.apache.parquet - parquet-format - org.apache.parquet parquet-common diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java index 4fd5064468d..6900c9bc7d2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java @@ -57,14 +57,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.parquet.bytes.CapacityByteArrayOutputStream; import org.apache.parquet.column.ColumnWriteStore; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.ParquetProperties.WriterVersion; import org.apache.parquet.column.impl.ColumnWriteStoreV1; import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory; import org.apache.parquet.hadoop.CodecFactory; -import org.apache.parquet.hadoop.ParquetColumnChunkPageWriteStore; +import org.apache.parquet.hadoop.ColumnChunkPageWriteStore; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.io.ColumnIOFactory; @@ -115,7 +114,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK; private ColumnWriteStore store; - private ParquetColumnChunkPageWriteStore pageStore; + private ColumnChunkPageWriteStore pageStore; private RecordConsumer consumer; private BatchSchema batchSchema; @@ -251,10 +250,6 @@ private void newSchema() throws IOException { // We don't want this number to be too small either. 
Ideally, slightly bigger than the page size, // but not bigger than the block buffer int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize)); - // TODO: Use initialSlabSize from ParquetProperties once drill will be updated to the latest version of Parquet library - int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(64, pageSize, 10); - // TODO: Replace ParquetColumnChunkPageWriteStore with ColumnChunkPageWriteStore from parquet library - // once PARQUET-1006 will be resolved ParquetProperties parquetProperties = ParquetProperties.builder() .withPageSize(pageSize) .withDictionaryEncoding(enableDictionary) @@ -263,10 +258,8 @@ private void newSchema() throws IOException { .withAllocator(new ParquetDirectByteBufferAllocator(oContext)) .withValuesWriterFactory(new DefaultV1ValuesWriterFactory()) .build(); - pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, initialSlabSize, - pageSize, parquetProperties.getAllocator(), parquetProperties.getPageWriteChecksumEnabled(), - parquetProperties.getColumnIndexTruncateLength() - ); + pageStore = new ColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, parquetProperties.getAllocator(), + parquetProperties.getColumnIndexTruncateLength(), parquetProperties.getPageWriteChecksumEnabled()); store = new ColumnWriteStoreV1(pageStore, parquetProperties); MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema); consumer = columnIO.getRecordWriter(store); @@ -444,7 +437,6 @@ private void flush(boolean cleanUp) throws IOException { } } finally { store.close(); - pageStore.close(); store = null; pageStore = null; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java index fcb61f69f5e..25c99044c56 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java @@ -201,13 +201,15 @@ static ColumnReader createFixedColumnReader(ParquetRecordReader recordReader, } else if (convertedType == ConvertedType.INTERVAL) { return new NullableFixedByteAlignedReaders.NullableIntervalReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (NullableIntervalVector) v, schemaElement); + } else { + return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(recordReader, descriptor, + columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement); } } else { return getNullableColumnReader(recordReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); } } - throw new Exception("Unexpected parquet metadata configuration."); } static VarLengthValuesColumn getReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java index fbe3ae32eda..fdb6e79d0e8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.Iterator; import 
java.util.List; +import java.util.Optional; import java.util.function.BiFunction; import java.util.function.Function; @@ -71,6 +72,7 @@ import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.PrimitiveConverter; import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Type.Repetition; @@ -328,23 +330,30 @@ protected PrimitiveConverter getConverterForType(String name, PrimitiveType type } } case FIXED_LEN_BYTE_ARRAY: - switch (type.getOriginalType()) { - case DECIMAL: { + LogicalTypeAnnotation.LogicalTypeAnnotationVisitor typeAnnotationVisitor = new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor() { + @Override + public Optional visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) { ParquetReaderUtility.checkDecimalTypeEnabled(options); - return getVarDecimalConverter(name, type); + return Optional.of(getVarDecimalConverter(name, type)); } - case INTERVAL: { + + @Override + public Optional visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) { IntervalWriter writer = type.isRepetition(Repetition.REPEATED) ? getWriter(name, (m, f) -> m.list(f).interval(), l -> l.list().interval()) - : getWriter(name, (m, f) -> m.interval(f), l -> l.interval()); - return new DrillFixedLengthByteArrayToInterval(writer); + : getWriter(name, MapWriter::interval, ListWriter::interval); + return Optional.of(new DrillFixedLengthByteArrayToInterval(writer)); } - default: { + }; + + LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation(); + if (logicalTypeAnnotation != null) { + return logicalTypeAnnotation.accept(typeAnnotationVisitor).orElseGet(() -> { VarBinaryWriter writer = type.isRepetition(Repetition.REPEATED) ? getWriter(name, (m, f) -> m.list(f).varBinary(), l -> l.list().varBinary()) - : getWriter(name, (m, f) -> m.varBinary(f), l -> l.varBinary()); + : getWriter(name, MapWriter::varBinary, ListWriter::varBinary); return new DrillFixedBinaryToVarbinaryConverter(writer, type.getTypeLength(), mutator.getManagedBuffer()); - } + }); } default: throw new UnsupportedOperationException("Unsupported type: " + type.getPrimitiveTypeName()); diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java deleted file mode 100644 index b8f707d21c3..00000000000 --- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java +++ /dev/null @@ -1,316 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.parquet.hadoop; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.zip.CRC32; - -import org.apache.parquet.bytes.BytesInput; -import org.apache.parquet.bytes.CapacityByteArrayOutputStream; -import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.Encoding; -import org.apache.parquet.column.page.DictionaryPage; -import org.apache.parquet.column.page.PageWriteStore; -import org.apache.parquet.column.page.PageWriter; -import org.apache.parquet.column.statistics.Statistics; -import org.apache.parquet.format.converter.ParquetMetadataConverter; -import org.apache.parquet.hadoop.CodecFactory.BytesCompressor; -import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder; -import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder; -import org.apache.parquet.io.ParquetEncodingException; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.bytes.ByteBufferAllocator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This is a copy of ColumnChunkPageWriteStore from parquet library except of OutputStream that is used here. - * Using of CapacityByteArrayOutputStream allows to use different ByteBuffer allocators. - * It will be no need in this class once PARQUET-1006 is resolved. - */ -public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeable { - - private static final Logger logger = LoggerFactory.getLogger(ParquetColumnChunkPageWriteStore.class); - - private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); - - private final Map writers = new HashMap<>(); - private final MessageType schema; - - public ParquetColumnChunkPageWriteStore(BytesCompressor compressor, - MessageType schema, - int initialSlabSize, - int maxCapacityHint, - ByteBufferAllocator allocator, - boolean pageWriteChecksumEnabled, - int columnIndexTruncateLength) { - this.schema = schema; - for (ColumnDescriptor path : schema.getColumns()) { - writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSlabSize, - maxCapacityHint, allocator, pageWriteChecksumEnabled, columnIndexTruncateLength)); - } - } - - @Override - public PageWriter getPageWriter(ColumnDescriptor path) { - return writers.get(path); - } - - /** - * Writes the column chunks in the corresponding row group - * @param writer the parquet file writer - * @throws IOException if the file can not be created - */ - public void flushToFileWriter(ParquetFileWriter writer) throws IOException { - for (ColumnDescriptor path : schema.getColumns()) { - ColumnChunkPageWriter pageWriter = writers.get(path); - pageWriter.writeToFileWriter(writer); - } - } - - @Override - public void close() { - for (ColumnChunkPageWriter pageWriter : writers.values()) { - pageWriter.close(); - } - } - - private static final class ColumnChunkPageWriter implements PageWriter, Closeable { - - private final ColumnDescriptor path; - private final BytesCompressor compressor; - - private final CapacityByteArrayOutputStream buf; - private DictionaryPage dictionaryPage; - - private long uncompressedLength; - private long compressedLength; - private long totalValueCount; - private int pageCount; - - // repetition and definition level encodings are used only for v1 pages and don't change - private Set rlEncodings = new HashSet<>(); - private Set 
dlEncodings = new HashSet<>(); - private List dataEncodings = new ArrayList<>(); - - private ColumnIndexBuilder columnIndexBuilder; - private OffsetIndexBuilder offsetIndexBuilder; - private Statistics totalStatistics; - - private final CRC32 crc; - boolean pageWriteChecksumEnabled; - - private ColumnChunkPageWriter(ColumnDescriptor path, - BytesCompressor compressor, - int initialSlabSize, - int maxCapacityHint, - ByteBufferAllocator allocator, - boolean pageWriteChecksumEnabled, - int columnIndexTruncateLength) { - this.path = path; - this.compressor = compressor; - this.buf = new CapacityByteArrayOutputStream(initialSlabSize, maxCapacityHint, allocator); - this.totalStatistics = Statistics.createStats(this.path.getPrimitiveType()); - this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength); - this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); - this.pageWriteChecksumEnabled = pageWriteChecksumEnabled; - this.crc = pageWriteChecksumEnabled ? new CRC32() : null; - } - - @Override - public void writePage(BytesInput bytesInput, int valueCount, Statistics statistics, Encoding rlEncoding, - Encoding dlEncoding, Encoding valuesEncoding) throws IOException { - // Setting the builders to the no-op ones so no column/offset indexes will be written for this column chunk - columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder(); - offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder(); - - writePage(bytesInput, valueCount, -1, statistics, rlEncoding, dlEncoding, valuesEncoding); - } - - @Override - public void writePage(BytesInput bytes, int valueCount, int rowCount, Statistics statistics, - Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException { - long uncompressedSize = bytes.size(); - if (uncompressedSize > Integer.MAX_VALUE) { - throw new ParquetEncodingException( - "Cannot write page larger than Integer.MAX_VALUE bytes: " + - uncompressedSize); - } - - BytesInput compressedBytes = compressor.compress(bytes); - long compressedSize = compressedBytes.size(); - if (compressedSize > Integer.MAX_VALUE) { - throw new ParquetEncodingException( - "Cannot write compressed page larger than Integer.MAX_VALUE bytes: " - + compressedSize); - } - - if (pageWriteChecksumEnabled) { - crc.reset(); - crc.update(compressedBytes.toByteArray()); - parquetMetadataConverter.writeDataPageV1Header((int) uncompressedSize, (int) compressedSize, - valueCount, rlEncoding, dlEncoding, valuesEncoding, (int) crc.getValue(), buf); - } else { - parquetMetadataConverter.writeDataPageV1Header((int) uncompressedSize, (int) compressedSize, - valueCount, rlEncoding, dlEncoding, valuesEncoding, buf); - } - - this.uncompressedLength += uncompressedSize; - this.compressedLength += compressedSize; - this.totalValueCount += valueCount; - this.pageCount += 1; - - addStatistics(statistics); - - offsetIndexBuilder.add(toIntWithCheck(buf.size() + compressedSize), rowCount); - - compressedBytes.writeAllTo(buf); - rlEncodings.add(rlEncoding); - dlEncodings.add(dlEncoding); - dataEncodings.add(valuesEncoding); - } - - private void addStatistics(Statistics statistics) { - // Copying the statistics if it is not initialized yet so we have the correct typed one - if (totalStatistics == null) { - totalStatistics = statistics.copy(); - } else { - totalStatistics.mergeStatistics(statistics); - } - - columnIndexBuilder.add(statistics); - } - - @Override - public void writePageV2(int rowCount, - int nullCount, - int valueCount, - BytesInput repetitionLevels, - 
BytesInput definitionLevels, - Encoding dataEncoding, - BytesInput data, - Statistics statistics) throws IOException { - int rlByteLength = toIntWithCheck(repetitionLevels.size()); - int dlByteLength = toIntWithCheck(definitionLevels.size()); - int uncompressedSize = toIntWithCheck( - data.size() + repetitionLevels.size() + definitionLevels.size() - ); - BytesInput compressedData = compressor.compress(data); - int compressedSize = toIntWithCheck( - compressedData.size() + repetitionLevels.size() + definitionLevels.size() - ); - parquetMetadataConverter.writeDataPageV2Header( - uncompressedSize, compressedSize, - valueCount, nullCount, rowCount, - statistics, - dataEncoding, - rlByteLength, - dlByteLength, - buf); - this.uncompressedLength += uncompressedSize; - this.compressedLength += compressedSize; - this.totalValueCount += valueCount; - this.pageCount += 1; - - addStatistics(statistics); - - offsetIndexBuilder.add(toIntWithCheck(buf.size() + compressedSize), rowCount); - - repetitionLevels.writeAllTo(buf); - definitionLevels.writeAllTo(buf); - compressedData.writeAllTo(buf); - - dataEncodings.add(dataEncoding); - } - - private int toIntWithCheck(long size) { - if (size > Integer.MAX_VALUE) { - throw new ParquetEncodingException( - "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " + - size); - } - return (int)size; - } - - @Override - public long getMemSize() { - return buf.size(); - } - - /** - * Writes a number of pages within corresponding column chunk - * @param writer the parquet file writer - * @throws IOException if the file can not be created - */ - public void writeToFileWriter(ParquetFileWriter writer) throws IOException { - writer.writeColumnChunk(path, totalValueCount, compressor.getCodecName(), - dictionaryPage, BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, - columnIndexBuilder, offsetIndexBuilder, rlEncodings, dlEncodings, dataEncodings); - if (logger.isDebugEnabled()) { - logger.debug( - String.format( - "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s", - buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, new HashSet<>(dataEncodings)) - + (dictionaryPage != null ? 
String.format( - ", dic { %,d entries, %,dB raw, %,dB comp}", - dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize()) - : "") - ); - } - rlEncodings.clear(); - dlEncodings.clear(); - dataEncodings.clear(); - pageCount = 0; - } - - @Override - public long allocatedSize() { - return buf.getCapacity(); - } - - @Override - public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException { - if (this.dictionaryPage != null) { - throw new ParquetEncodingException("Only one dictionary page is allowed"); - } - BytesInput dictionaryBytes = dictionaryPage.getBytes(); - int uncompressedSize = (int)dictionaryBytes.size(); - BytesInput compressedBytes = compressor.compress(dictionaryBytes); - this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, - dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding()); - } - - @Override - public String memUsageString(String prefix) { - return buf.memUsageString(prefix + " ColumnChunkPageWriter"); - } - - @Override - public void close() { - buf.close(); - } - } - -} diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java new file mode 100644 index 00000000000..ae23a6ebbaf --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -0,0 +1,1633 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.parquet.hadoop; + +import static org.apache.parquet.format.Util.writeFileCryptoMetaData; +import static org.apache.parquet.format.Util.writeFileMetaData; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.MAX_STATS_SIZE; +import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE; +import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.zip.CRC32; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.parquet.Preconditions; +import org.apache.parquet.Version; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.EncodingStats; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.crypto.AesCipher; +import org.apache.parquet.crypto.ColumnEncryptionProperties; +import org.apache.parquet.crypto.FileEncryptionProperties; +import org.apache.parquet.crypto.InternalColumnEncryptionSetup; +import org.apache.parquet.crypto.InternalFileEncryptor; +import org.apache.parquet.crypto.ModuleCipherFactory; +import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType; +import org.apache.parquet.crypto.ParquetCryptoRuntimeException; +import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.format.BlockCipher; +import org.apache.parquet.format.Util; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.StrictKeyValueMetadataMergeStrategy; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.hadoop.metadata.GlobalMetaData; +import org.apache.parquet.hadoop.metadata.KeyValueMetadataMergeStrategy; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopOutputFile; +import org.apache.parquet.hadoop.util.HadoopStreams; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder; +import org.apache.parquet.internal.hadoop.metadata.IndexReference; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.SeekableInputStream; +import org.apache.parquet.io.ParquetEncodingException; +import org.apache.parquet.io.PositionOutputStream; +import 
org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.TypeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Internal implementation of the Parquet file writer as a block container
+ * Note: this is temporary Drill-Parquet class needed to write empty parquet files. Details in + * PARQUET-2026 + */ +public class ParquetFileWriter { + private static final Logger LOG = LoggerFactory.getLogger(ParquetFileWriter.class); + + private final ParquetMetadataConverter metadataConverter; + + public static final String PARQUET_METADATA_FILE = "_metadata"; + public static final String MAGIC_STR = "PAR1"; + public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII); + public static final String EF_MAGIC_STR = "PARE"; + public static final byte[] EFMAGIC = EF_MAGIC_STR.getBytes(StandardCharsets.US_ASCII); + public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata"; + public static final int CURRENT_VERSION = 1; + + // File creation modes + public static enum Mode { + CREATE, + OVERWRITE + } + + protected final PositionOutputStream out; + + private final MessageType schema; + private final AlignmentStrategy alignment; + private final int columnIndexTruncateLength; + + // file data + private List blocks = new ArrayList(); + + // The column/offset indexes per blocks per column chunks + private final List> columnIndexes = new ArrayList<>(); + private final List> offsetIndexes = new ArrayList<>(); + + // The Bloom filters + private final List> bloomFilters = new ArrayList<>(); + + // The file encryptor + private final InternalFileEncryptor fileEncryptor; + + // row group data + private BlockMetaData currentBlock; // appended to by endColumn + + // The column/offset indexes for the actual block + private List currentColumnIndexes; + private List currentOffsetIndexes; + + // The Bloom filter for the actual block + private Map currentBloomFilters; + + // row group data set at the start of a row group + private long currentRecordCount; // set in startBlock + + // column chunk data accumulated as pages are written + private EncodingStats.Builder encodingStatsBuilder; + private Set currentEncodings; + private long uncompressedLength; + private long compressedLength; + private Statistics currentStatistics; // accumulated in writePage(s) + private ColumnIndexBuilder columnIndexBuilder; + private OffsetIndexBuilder offsetIndexBuilder; + + // column chunk data set at the start of a column + private CompressionCodecName currentChunkCodec; // set in startColumn + private ColumnPath currentChunkPath; // set in startColumn + private PrimitiveType currentChunkType; // set in startColumn + private long currentChunkValueCount; // set in startColumn + private long currentChunkFirstDataPage; // set in startColumn & page writes + private long currentChunkDictionaryPageOffset; // set in writeDictionaryPage + + // set when end is called + private ParquetMetadata footer = null; + + private final CRC32 crc; + private boolean pageWriteChecksumEnabled; + + /** + * Captures the order in which methods should be called + */ + private enum STATE { + NOT_STARTED { + STATE start() { + return STARTED; + } + }, + STARTED { + STATE startBlock() { + return BLOCK; + } + STATE end() { + return ENDED; + } + }, + BLOCK { + STATE startColumn() { + return COLUMN; + } + STATE endBlock() { + return STARTED; + } + }, + COLUMN { + STATE endColumn() { + return BLOCK; + }; + STATE write() { + return this; + } + }, + ENDED; + + STATE start() throws IOException { return error(); } + STATE startBlock() throws IOException { return error(); } + STATE startColumn() throws IOException { return error(); } + STATE write() throws IOException { return error(); } + STATE endColumn() throws IOException { 
return error(); } + STATE endBlock() throws IOException { return error(); } + STATE end() throws IOException { return error(); } + + private final STATE error() throws IOException { + throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name()); + } + } + + private STATE state = STATE.NOT_STARTED; + + /** + * @param configuration Hadoop configuration + * @param schema the schema of the data + * @param file the file to write to + * @throws IOException if the file can not be created + * @deprecated will be removed in 2.0.0 + */ + @Deprecated + public ParquetFileWriter(Configuration configuration, MessageType schema, + Path file) throws IOException { + this(HadoopOutputFile.fromPath(file, configuration), + schema, Mode.CREATE, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT); + } + + /** + * @param configuration Hadoop configuration + * @param schema the schema of the data + * @param file the file to write to + * @param mode file creation mode + * @throws IOException if the file can not be created + * @deprecated will be removed in 2.0.0 + */ + @Deprecated + public ParquetFileWriter(Configuration configuration, MessageType schema, + Path file, Mode mode) throws IOException { + this(HadoopOutputFile.fromPath(file, configuration), + schema, mode, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT); + } + + /** + * @param configuration Hadoop configuration + * @param schema the schema of the data + * @param file the file to write to + * @param mode file creation mode + * @param rowGroupSize the row group size + * @param maxPaddingSize the maximum padding + * @throws IOException if the file can not be created + * @deprecated will be removed in 2.0.0 + */ + @Deprecated + public ParquetFileWriter(Configuration configuration, MessageType schema, + Path file, Mode mode, long rowGroupSize, + int maxPaddingSize) + throws IOException { + this(HadoopOutputFile.fromPath(file, configuration), + schema, mode, rowGroupSize, maxPaddingSize); + } + + /** + * @param file OutputFile to create or overwrite + * @param schema the schema of the data + * @param mode file creation mode + * @param rowGroupSize the row group size + * @param maxPaddingSize the maximum padding + * @throws IOException if the file can not be created + * @deprecated will be removed in 2.0.0 + */ + @Deprecated + public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, + long rowGroupSize, int maxPaddingSize) + throws IOException { + this(file, schema, mode, rowGroupSize, maxPaddingSize, + ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, + ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH, + ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED); + } + + /** + * @param file OutputFile to create or overwrite + * @param schema the schema of the data + * @param mode file creation mode + * @param rowGroupSize the row group size + * @param maxPaddingSize the maximum padding + * @param columnIndexTruncateLength the length which the min/max values in column indexes tried to be truncated to + * @param statisticsTruncateLength the length which the min/max values in row groups tried to be truncated to + * @param pageWriteChecksumEnabled whether to write out page level checksums + * @throws IOException if the file can not be created + */ + public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, + long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength, + int statisticsTruncateLength, boolean pageWriteChecksumEnabled) + 
throws IOException{ + this(file, schema, mode, rowGroupSize, maxPaddingSize, columnIndexTruncateLength, + statisticsTruncateLength, pageWriteChecksumEnabled, null); + } + + public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, + long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength, + int statisticsTruncateLength, boolean pageWriteChecksumEnabled, + FileEncryptionProperties encryptionProperties) + throws IOException { + TypeUtil.checkValidWriteSchema(schema); + + this.schema = schema; + + long blockSize = rowGroupSize; + if (file.supportsBlockSize()) { + blockSize = Math.max(file.defaultBlockSize(), rowGroupSize); + this.alignment = PaddingAlignment.get(blockSize, rowGroupSize, maxPaddingSize); + } else { + this.alignment = NoAlignment.get(rowGroupSize); + } + + if (mode == Mode.OVERWRITE) { + this.out = file.createOrOverwrite(blockSize); + } else { + this.out = file.create(blockSize); + } + + this.encodingStatsBuilder = new EncodingStats.Builder(); + this.columnIndexTruncateLength = columnIndexTruncateLength; + this.pageWriteChecksumEnabled = pageWriteChecksumEnabled; + this.crc = pageWriteChecksumEnabled ? new CRC32() : null; + + this.metadataConverter = new ParquetMetadataConverter(statisticsTruncateLength); + + if (null == encryptionProperties) { + this.fileEncryptor = null; + } else { + // Verify that every encrypted column is in file schema + Map columnEncryptionProperties = encryptionProperties.getEncryptedColumns(); + if (null != columnEncryptionProperties) { // if null, every column in file schema will be encrypted with footer key + for (Map.Entry entry : columnEncryptionProperties.entrySet()) { + String[] path = entry.getKey().toArray(); + if(!schema.containsPath(path)) { + throw new ParquetCryptoRuntimeException("Encrypted column " + Arrays.toString(path) + " not in file schema"); + } + } + } + this.fileEncryptor = new InternalFileEncryptor(encryptionProperties); + } + } + + /** + * FOR TESTING ONLY. This supports testing block padding behavior on the local FS. + * + * @param configuration Hadoop configuration + * @param schema the schema of the data + * @param file the file to write to + * @param rowAndBlockSize the row group size + * @param maxPaddingSize the maximum padding + * @throws IOException if the file can not be created + */ + ParquetFileWriter(Configuration configuration, MessageType schema, + Path file, long rowAndBlockSize, int maxPaddingSize) + throws IOException { + FileSystem fs = file.getFileSystem(configuration); + this.schema = schema; + this.alignment = PaddingAlignment.get( + rowAndBlockSize, rowAndBlockSize, maxPaddingSize); + this.out = HadoopStreams.wrap( + fs.create(file, true, 8192, fs.getDefaultReplication(file), rowAndBlockSize)); + this.encodingStatsBuilder = new EncodingStats.Builder(); + // no truncation is needed for testing + this.columnIndexTruncateLength = Integer.MAX_VALUE; + this.pageWriteChecksumEnabled = ParquetOutputFormat.getPageWriteChecksumEnabled(configuration); + this.crc = pageWriteChecksumEnabled ? 
new CRC32() : null; + this.metadataConverter = new ParquetMetadataConverter(ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH); + this.fileEncryptor = null; + } + /** + * start the file + * @throws IOException if there is an error while writing + */ + public void start() throws IOException { + state = state.start(); + LOG.debug("{}: start", out.getPos()); + byte[] magic = MAGIC; + if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) { + magic = EFMAGIC; + } + out.write(magic); + } + + InternalFileEncryptor getEncryptor() { + return fileEncryptor; + } + + /** + * start a block + * @param recordCount the record count in this block + * @throws IOException if there is an error while writing + */ + public void startBlock(long recordCount) throws IOException { + state = state.startBlock(); + LOG.debug("{}: start block", out.getPos()); +// out.write(MAGIC); // TODO: add a magic delimiter + + alignment.alignForRowGroup(out); + + currentBlock = new BlockMetaData(); + currentRecordCount = recordCount; + + currentColumnIndexes = new ArrayList<>(); + currentOffsetIndexes = new ArrayList<>(); + + currentBloomFilters = new HashMap<>(); + } + + /** + * start a column inside a block + * @param descriptor the column descriptor + * @param valueCount the value count in this column + * @param compressionCodecName a compression codec name + * @throws IOException if there is an error while writing + */ + public void startColumn(ColumnDescriptor descriptor, + long valueCount, + CompressionCodecName compressionCodecName) throws IOException { + state = state.startColumn(); + encodingStatsBuilder.clear(); + currentEncodings = new HashSet(); + currentChunkPath = ColumnPath.get(descriptor.getPath()); + currentChunkType = descriptor.getPrimitiveType(); + currentChunkCodec = compressionCodecName; + currentChunkValueCount = valueCount; + currentChunkFirstDataPage = -1; + compressedLength = 0; + uncompressedLength = 0; + // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one + currentStatistics = null; + + columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength); + offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); + } + + /** + * writes a dictionary page page + * @param dictionaryPage the dictionary page + * @throws IOException if there is an error while writing + */ + public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException { + writeDictionaryPage(dictionaryPage, null, null); + } + + public void writeDictionaryPage(DictionaryPage dictionaryPage, + BlockCipher.Encryptor headerBlockEncryptor, byte[] AAD) throws IOException { + state = state.write(); + LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize()); + currentChunkDictionaryPageOffset = out.getPos(); + int uncompressedSize = dictionaryPage.getUncompressedSize(); + int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts + if (pageWriteChecksumEnabled) { + crc.reset(); + crc.update(dictionaryPage.getBytes().toByteArray()); + metadataConverter.writeDictionaryPageHeader( + uncompressedSize, + compressedPageSize, + dictionaryPage.getDictionarySize(), + dictionaryPage.getEncoding(), + (int) crc.getValue(), + out, + headerBlockEncryptor, + AAD); + } else { + metadataConverter.writeDictionaryPageHeader( + uncompressedSize, + compressedPageSize, + dictionaryPage.getDictionarySize(), + dictionaryPage.getEncoding(), + out, + headerBlockEncryptor, + AAD); + } + long 
headerSize = out.getPos() - currentChunkDictionaryPageOffset; + this.uncompressedLength += uncompressedSize + headerSize; + this.compressedLength += compressedPageSize + headerSize; + LOG.debug("{}: write dictionary page content {}", out.getPos(), compressedPageSize); + dictionaryPage.getBytes().writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted + encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding()); + currentEncodings.add(dictionaryPage.getEncoding()); + } + + + /** + * writes a single page + * @param valueCount count of values + * @param uncompressedPageSize the size of the data once uncompressed + * @param bytes the compressed data for the page without header + * @param rlEncoding encoding of the repetition level + * @param dlEncoding encoding of the definition level + * @param valuesEncoding encoding of values + * @throws IOException if there is an error while writing + */ + @Deprecated + public void writeDataPage( + int valueCount, int uncompressedPageSize, + BytesInput bytes, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding) throws IOException { + state = state.write(); + // We are unable to build indexes without rowCount so skip them for this column + offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder(); + columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder(); + long beforeHeader = out.getPos(); + LOG.debug("{}: write data page: {} values", beforeHeader, valueCount); + int compressedPageSize = (int)bytes.size(); + metadataConverter.writeDataPageV1Header( + uncompressedPageSize, compressedPageSize, + valueCount, + rlEncoding, + dlEncoding, + valuesEncoding, + out); + long headerSize = out.getPos() - beforeHeader; + this.uncompressedLength += uncompressedPageSize + headerSize; + this.compressedLength += compressedPageSize + headerSize; + LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize); + bytes.writeAllTo(out); + encodingStatsBuilder.addDataEncoding(valuesEncoding); + currentEncodings.add(rlEncoding); + currentEncodings.add(dlEncoding); + currentEncodings.add(valuesEncoding); + if (currentChunkFirstDataPage < 0) { + currentChunkFirstDataPage = beforeHeader; + } + } + + /** + * writes a single page + * @param valueCount count of values + * @param uncompressedPageSize the size of the data once uncompressed + * @param bytes the compressed data for the page without header + * @param statistics statistics for the page + * @param rlEncoding encoding of the repetition level + * @param dlEncoding encoding of the definition level + * @param valuesEncoding encoding of values + * @throws IOException if there is an error while writing + * @deprecated this method does not support writing column indexes; Use + * {@link #writeDataPage(int, int, BytesInput, Statistics, long, Encoding, Encoding, Encoding)} instead + */ + @Deprecated + public void writeDataPage( + int valueCount, int uncompressedPageSize, + BytesInput bytes, + Statistics statistics, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding) throws IOException { + // We are unable to build indexes without rowCount so skip them for this column + offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder(); + columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder(); + innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding); + } + + /** + * Writes a single page + * @param valueCount count of values + * @param uncompressedPageSize the size of the data once uncompressed 
+ * @param bytes the compressed data for the page without header + * @param statistics the statistics of the page + * @param rowCount the number of rows in the page + * @param rlEncoding encoding of the repetition level + * @param dlEncoding encoding of the definition level + * @param valuesEncoding encoding of values + * @throws IOException if any I/O error occurs during writing the file + */ + public void writeDataPage( + int valueCount, int uncompressedPageSize, + BytesInput bytes, + Statistics statistics, + long rowCount, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding) throws IOException { + long beforeHeader = out.getPos(); + innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding); + + offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount); + } + + private void innerWriteDataPage( + int valueCount, int uncompressedPageSize, + BytesInput bytes, + Statistics statistics, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding) throws IOException { + state = state.write(); + long beforeHeader = out.getPos(); + if (currentChunkFirstDataPage < 0) { + currentChunkFirstDataPage = beforeHeader; + } + LOG.debug("{}: write data page: {} values", beforeHeader, valueCount); + int compressedPageSize = (int) bytes.size(); + if (pageWriteChecksumEnabled) { + crc.reset(); + crc.update(bytes.toByteArray()); + metadataConverter.writeDataPageV1Header( + uncompressedPageSize, compressedPageSize, + valueCount, + rlEncoding, + dlEncoding, + valuesEncoding, + (int) crc.getValue(), + out); + } else { + metadataConverter.writeDataPageV1Header( + uncompressedPageSize, compressedPageSize, + valueCount, + rlEncoding, + dlEncoding, + valuesEncoding, + out); + } + long headerSize = out.getPos() - beforeHeader; + this.uncompressedLength += uncompressedPageSize + headerSize; + this.compressedLength += compressedPageSize + headerSize; + LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize); + bytes.writeAllTo(out); + + // Copying the statistics if it is not initialized yet so we have the correct typed one + if (currentStatistics == null) { + currentStatistics = statistics.copy(); + } else { + currentStatistics.mergeStatistics(statistics); + } + + columnIndexBuilder.add(statistics); + + encodingStatsBuilder.addDataEncoding(valuesEncoding); + currentEncodings.add(rlEncoding); + currentEncodings.add(dlEncoding); + currentEncodings.add(valuesEncoding); + } + + /** + * Add a Bloom filter that will be written out. This is only used in unit test. 
+ * + * @param column the column name + * @param bloomFilter the bloom filter of column values + */ + void addBloomFilter(String column, BloomFilter bloomFilter) { + currentBloomFilters.put(column, bloomFilter); + } + + /** + * Writes a single v2 data page + * @param rowCount count of rows + * @param nullCount count of nulls + * @param valueCount count of values + * @param repetitionLevels repetition level bytes + * @param definitionLevels definition level bytes + * @param dataEncoding encoding for data + * @param compressedData compressed data bytes + * @param uncompressedDataSize the size of uncompressed data + * @param statistics the statistics of the page + * @throws IOException if any I/O error occurs during writing the file + */ + public void writeDataPageV2(int rowCount, int nullCount, int valueCount, + BytesInput repetitionLevels, + BytesInput definitionLevels, + Encoding dataEncoding, + BytesInput compressedData, + int uncompressedDataSize, + Statistics statistics) throws IOException { + state = state.write(); + int rlByteLength = toIntWithCheck(repetitionLevels.size()); + int dlByteLength = toIntWithCheck(definitionLevels.size()); + + int compressedSize = toIntWithCheck( + compressedData.size() + repetitionLevels.size() + definitionLevels.size() + ); + + int uncompressedSize = toIntWithCheck( + uncompressedDataSize + repetitionLevels.size() + definitionLevels.size() + ); + + long beforeHeader = out.getPos(); + if (currentChunkFirstDataPage < 0) { + currentChunkFirstDataPage = beforeHeader; + } + + metadataConverter.writeDataPageV2Header( + uncompressedSize, compressedSize, + valueCount, nullCount, rowCount, + dataEncoding, + rlByteLength, + dlByteLength, + out); + + long headersSize = out.getPos() - beforeHeader; + this.uncompressedLength += uncompressedSize + headersSize; + this.compressedLength += compressedSize + headersSize; + + if (currentStatistics == null) { + currentStatistics = statistics.copy(); + } else { + currentStatistics.mergeStatistics(statistics); + } + + columnIndexBuilder.add(statistics); + currentEncodings.add(dataEncoding); + encodingStatsBuilder.addDataEncoding(dataEncoding); + + BytesInput.concat(repetitionLevels, definitionLevels, compressedData) + .writeAllTo(out); + + offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount); + } + + /** + * Writes a column chunk at once + * @param descriptor the descriptor of the column + * @param valueCount the value count in this column + * @param compressionCodecName the name of the compression codec used for compressing the pages + * @param dictionaryPage the dictionary page for this column chunk (might be null) + * @param bytes the encoded pages including page headers to be written as is + * @param uncompressedTotalPageSize total uncompressed size (without page headers) + * @param compressedTotalPageSize total compressed size (without page headers) + * @param totalStats accumulated statistics for the column chunk + * @param columnIndexBuilder the builder object for the column index + * @param offsetIndexBuilder the builder object for the offset index + * @param bloomFilter the bloom filter for this column + * @param rlEncodings the RL encodings used in this column chunk + * @param dlEncodings the DL encodings used in this column chunk + * @param dataEncodings the data encodings used in this column chunk + * @throws IOException if there is an error while writing + */ + void writeColumnChunk(ColumnDescriptor descriptor, + long valueCount, + CompressionCodecName compressionCodecName, + DictionaryPage 
dictionaryPage, + BytesInput bytes, + long uncompressedTotalPageSize, + long compressedTotalPageSize, + Statistics totalStats, + ColumnIndexBuilder columnIndexBuilder, + OffsetIndexBuilder offsetIndexBuilder, + BloomFilter bloomFilter, + Set rlEncodings, + Set dlEncodings, + List dataEncodings) throws IOException { + writeColumnChunk(descriptor, valueCount, compressionCodecName, dictionaryPage, bytes, + uncompressedTotalPageSize, compressedTotalPageSize, totalStats, columnIndexBuilder, offsetIndexBuilder, + bloomFilter, rlEncodings, dlEncodings, dataEncodings, null, 0, 0, null); + } + + void writeColumnChunk(ColumnDescriptor descriptor, + long valueCount, + CompressionCodecName compressionCodecName, + DictionaryPage dictionaryPage, + BytesInput bytes, + long uncompressedTotalPageSize, + long compressedTotalPageSize, + Statistics totalStats, + ColumnIndexBuilder columnIndexBuilder, + OffsetIndexBuilder offsetIndexBuilder, + BloomFilter bloomFilter, + Set rlEncodings, + Set dlEncodings, + List dataEncodings, + BlockCipher.Encryptor headerBlockEncryptor, + int rowGroupOrdinal, + int columnOrdinal, + byte[] fileAAD) throws IOException { + startColumn(descriptor, valueCount, compressionCodecName); + + state = state.write(); + if (dictionaryPage != null) { + byte[] dictonaryPageHeaderAAD = null; + if (null != headerBlockEncryptor) { + dictonaryPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader, + rowGroupOrdinal, columnOrdinal, -1); + } + writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictonaryPageHeaderAAD); + } + + if (bloomFilter != null) { + // write bloom filter if one of data pages is not dictionary encoded + boolean isWriteBloomFilter = false; + for (Encoding encoding : dataEncodings) { + if (encoding != Encoding.RLE_DICTIONARY) { + isWriteBloomFilter = true; + break; + } + } + if (isWriteBloomFilter) { + currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter); + } + } + LOG.debug("{}: write data pages", out.getPos()); + long headersSize = bytes.size() - compressedTotalPageSize; + this.uncompressedLength += uncompressedTotalPageSize + headersSize; + this.compressedLength += compressedTotalPageSize + headersSize; + LOG.debug("{}: write data pages content", out.getPos()); + currentChunkFirstDataPage = out.getPos(); + bytes.writeAllTo(out); + encodingStatsBuilder.addDataEncodings(dataEncodings); + if (rlEncodings.isEmpty()) { + encodingStatsBuilder.withV2Pages(); + } + currentEncodings.addAll(rlEncodings); + currentEncodings.addAll(dlEncodings); + currentEncodings.addAll(dataEncodings); + currentStatistics = totalStats; + + this.columnIndexBuilder = columnIndexBuilder; + this.offsetIndexBuilder = offsetIndexBuilder; + + endColumn(); + } + + /** + * end a column (once all rep, def and data have been written) + * @throws IOException if there is an error while writing + */ + public void endColumn() throws IOException { + state = state.endColumn(); + LOG.debug("{}: end column", out.getPos()); + if (columnIndexBuilder.getMinMaxSize() > columnIndexBuilder.getPageCount() * MAX_STATS_SIZE) { + currentColumnIndexes.add(null); + } else { + currentColumnIndexes.add(columnIndexBuilder.build()); + } + currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage)); + currentBlock.addColumn(ColumnChunkMetaData.get( + currentChunkPath, + currentChunkType, + currentChunkCodec, + encodingStatsBuilder.build(), + currentEncodings, + currentStatistics, + currentChunkFirstDataPage, + currentChunkDictionaryPageOffset, + 
currentChunkValueCount, + compressedLength, + uncompressedLength)); + this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength); + this.uncompressedLength = 0; + this.compressedLength = 0; + columnIndexBuilder = null; + offsetIndexBuilder = null; + } + + /** + * ends a block once all column chunks have been written + * @throws IOException if there is an error while writing + */ + public void endBlock() throws IOException { +// if (currentRecordCount == 0) { +// throw new ParquetEncodingException("End block with zero record"); +// } + + state = state.endBlock(); + LOG.debug("{}: end block", out.getPos()); + currentBlock.setRowCount(currentRecordCount); + currentBlock.setOrdinal(blocks.size()); + blocks.add(currentBlock); + columnIndexes.add(currentColumnIndexes); + offsetIndexes.add(currentOffsetIndexes); + bloomFilters.add(currentBloomFilters); + currentColumnIndexes = null; + currentOffsetIndexes = null; + currentBloomFilters = null; + currentBlock = null; + } + + /** + * @param conf a configuration + * @param file a file path to append the contents of to this file + * @throws IOException if there is an error while reading or writing + * @deprecated will be removed in 2.0.0; use {@link #appendFile(InputFile)} instead + */ + @Deprecated + public void appendFile(Configuration conf, Path file) throws IOException { + ParquetFileReader.open(conf, file).appendTo(this); + } + + public void appendFile(InputFile file) throws IOException { + try (ParquetFileReader reader = ParquetFileReader.open(file)) { + reader.appendTo(this); + } + } + + /** + * @param file a file stream to read from + * @param rowGroups row groups to copy + * @param dropColumns whether to drop columns from the file that are not in this file's schema + * @throws IOException if there is an error while reading or writing + * @deprecated will be removed in 2.0.0; + * use {@link #appendRowGroups(SeekableInputStream,List,boolean)} instead + */ + @Deprecated + public void appendRowGroups(FSDataInputStream file, + List rowGroups, + boolean dropColumns) throws IOException { + appendRowGroups(HadoopStreams.wrap(file), rowGroups, dropColumns); + } + + public void appendRowGroups(SeekableInputStream file, + List rowGroups, + boolean dropColumns) throws IOException { + for (BlockMetaData block : rowGroups) { + appendRowGroup(file, block, dropColumns); + } + } + + /** + * @param from a file stream to read from + * @param rowGroup row group to copy + * @param dropColumns whether to drop columns from the file that are not in this file's schema + * @throws IOException if there is an error while reading or writing + * @deprecated will be removed in 2.0.0; + * use {@link #appendRowGroup(SeekableInputStream,BlockMetaData,boolean)} instead + */ + @Deprecated + public void appendRowGroup(FSDataInputStream from, BlockMetaData rowGroup, + boolean dropColumns) throws IOException { + appendRowGroup(HadoopStreams.wrap(from), rowGroup, dropColumns); + } + + public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup, + boolean dropColumns) throws IOException { + startBlock(rowGroup.getRowCount()); + + Map columnsToCopy = + new HashMap(); + for (ColumnChunkMetaData chunk : rowGroup.getColumns()) { + columnsToCopy.put(chunk.getPath().toDotString(), chunk); + } + + List columnsInOrder = + new ArrayList(); + + for (ColumnDescriptor descriptor : schema.getColumns()) { + String path = ColumnPath.get(descriptor.getPath()).toDotString(); + ColumnChunkMetaData chunk = columnsToCopy.remove(path); + if (chunk != null) { 
+ columnsInOrder.add(chunk); + } else { + throw new IllegalArgumentException(String.format( + "Missing column '%s', cannot copy row group: %s", path, rowGroup)); + } + } + + // complain if some columns would be dropped and that's not okay + if (!dropColumns && !columnsToCopy.isEmpty()) { + throw new IllegalArgumentException(String.format( + "Columns cannot be copied (missing from target schema): %s", + String.join(", ", columnsToCopy.keySet()))); + } + + // copy the data for all chunks + long start = -1; + long length = 0; + long blockUncompressedSize = 0L; + for (int i = 0; i < columnsInOrder.size(); i += 1) { + ColumnChunkMetaData chunk = columnsInOrder.get(i); + + // get this chunk's start position in the new file + long newChunkStart = out.getPos() + length; + + // add this chunk to be copied with any previous chunks + if (start < 0) { + // no previous chunk included, start at this chunk's starting pos + start = chunk.getStartingPos(); + } + length += chunk.getTotalSize(); + + if ((i + 1) == columnsInOrder.size() || + columnsInOrder.get(i + 1).getStartingPos() != (start + length)) { + // not contiguous. do the copy now. + copy(from, out, start, length); + // reset to start at the next column chunk + start = -1; + length = 0; + } + + // TODO: column/offset indexes are not copied + // (it would require seeking to the end of the file for each row groups) + currentColumnIndexes.add(null); + currentOffsetIndexes.add(null); + + Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart); + currentBlock.addColumn(ColumnChunkMetaData.get( + chunk.getPath(), + chunk.getPrimitiveType(), + chunk.getCodec(), + chunk.getEncodingStats(), + chunk.getEncodings(), + chunk.getStatistics(), + offsets.firstDataPageOffset, + offsets.dictionaryPageOffset, + chunk.getValueCount(), + chunk.getTotalSize(), + chunk.getTotalUncompressedSize())); + + blockUncompressedSize += chunk.getTotalUncompressedSize(); + } + + currentBlock.setTotalByteSize(blockUncompressedSize); + + endBlock(); + } + + /** + * @param descriptor the descriptor for the target column + * @param from a file stream to read from + * @param chunk the column chunk to be copied + * @param bloomFilter the bloomFilter for this chunk + * @param columnIndex the column index for this chunk + * @param offsetIndex the offset index for this chunk + * @throws IOException + */ + public void appendColumnChunk(ColumnDescriptor descriptor, SeekableInputStream from, ColumnChunkMetaData chunk, + BloomFilter bloomFilter, ColumnIndex columnIndex, OffsetIndex offsetIndex) throws IOException { + long start = chunk.getStartingPos(); + long length = chunk.getTotalSize(); + long newChunkStart = out.getPos(); + + copy(from, out, start, length); + + currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter); + currentColumnIndexes.add(columnIndex); + currentOffsetIndexes.add(offsetIndex); + + Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart); + currentBlock.addColumn(ColumnChunkMetaData.get( + chunk.getPath(), + chunk.getPrimitiveType(), + chunk.getCodec(), + chunk.getEncodingStats(), + chunk.getEncodings(), + chunk.getStatistics(), + offsets.firstDataPageOffset, + offsets.dictionaryPageOffset, + chunk.getValueCount(), + chunk.getTotalSize(), + chunk.getTotalUncompressedSize())); + + currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize()); + } + + // Buffers for the copy function. 
+  private static final ThreadLocal<byte[]> COPY_BUFFER = ThreadLocal.withInitial(() -> new byte[8192]);
+
+  /**
+   * Copy from a FS input stream to an output stream. Thread-safe
+   *
+   * @param from a {@link SeekableInputStream}
+   * @param to any {@link PositionOutputStream}
+   * @param start where in the from stream to start copying
+   * @param length the number of bytes to copy
+   * @throws IOException if there is an error while reading or writing
+   */
+  private static void copy(SeekableInputStream from, PositionOutputStream to,
+      long start, long length) throws IOException{
+    LOG.debug("Copying {} bytes at {} to {}", length, start, to.getPos());
+    from.seek(start);
+    long bytesCopied = 0;
+    byte[] buffer = COPY_BUFFER.get();
+    while (bytesCopied < length) {
+      long bytesLeft = length - bytesCopied;
+      int bytesRead = from.read(buffer, 0,
+          (buffer.length < bytesLeft ? buffer.length : (int) bytesLeft));
+      if (bytesRead < 0) {
+        throw new IllegalArgumentException(
+            "Unexpected end of input file at " + start + bytesCopied);
+      }
+      to.write(buffer, 0, bytesRead);
+      bytesCopied += bytesRead;
+    }
+  }
+
+  /**
+   * ends a file once all blocks have been written.
+   * closes the file.
+   * @param extraMetaData the extra meta data to write in the footer
+   * @throws IOException if there is an error while writing
+   */
+  public void end(Map<String, String> extraMetaData) throws IOException {
+    state = state.end();
+    serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor);
+    serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor);
+    serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor);
+    LOG.debug("{}: end", out.getPos());
+    this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
+    serializeFooter(footer, out, fileEncryptor);
+    out.close();
+  }
+
+  private static void serializeColumnIndexes(
+      List<List<ColumnIndex>> columnIndexes,
+      List<BlockMetaData> blocks,
+      PositionOutputStream out,
+      InternalFileEncryptor fileEncryptor) throws IOException {
+    LOG.debug("{}: column indexes", out.getPos());
+    for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
+      BlockMetaData block = blocks.get(bIndex);
+      List<ColumnChunkMetaData> columns = block.getColumns();
+      List<ColumnIndex> blockColumnIndexes = columnIndexes.get(bIndex);
+      for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
+        ColumnChunkMetaData column = columns.get(cIndex);
+        org.apache.parquet.format.ColumnIndex columnIndex = ParquetMetadataConverter
+            .toParquetColumnIndex(column.getPrimitiveType(), blockColumnIndexes.get(cIndex));
+        if (columnIndex == null) {
+          continue;
+        }
+        BlockCipher.Encryptor columnIndexEncryptor = null;
+        byte[] columnIndexAAD = null;
+        if (null != fileEncryptor) {
+          InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex);
+          if (columnEncryptionSetup.isEncrypted()) {
+            columnIndexEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
+            columnIndexAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.ColumnIndex,
+                block.getOrdinal(), columnEncryptionSetup.getOrdinal(), -1);
+          }
+        }
+        long offset = out.getPos();
+        Util.writeColumnIndex(columnIndex, out, columnIndexEncryptor, columnIndexAAD);
+        column.setColumnIndexReference(new IndexReference(offset, (int) (out.getPos() - offset)));
+      }
+    }
+  }
+
+  private int toIntWithCheck(long size) {
+    if ((int)size != size) {
+      throw new ParquetEncodingException("Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " + size);
+    }
+    return (int)size;
+  }
+
+  private static void serializeOffsetIndexes(
+      List<List<OffsetIndex>> offsetIndexes,
+      List<BlockMetaData> blocks,
+      PositionOutputStream out,
+      InternalFileEncryptor fileEncryptor) throws IOException {
+    LOG.debug("{}: offset indexes", out.getPos());
+    for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
+      BlockMetaData block = blocks.get(bIndex);
+      List<ColumnChunkMetaData> columns = block.getColumns();
+      List<OffsetIndex> blockOffsetIndexes = offsetIndexes.get(bIndex);
+      for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
+        OffsetIndex offsetIndex = blockOffsetIndexes.get(cIndex);
+        if (offsetIndex == null) {
+          continue;
+        }
+        ColumnChunkMetaData column = columns.get(cIndex);
+        BlockCipher.Encryptor offsetIndexEncryptor = null;
+        byte[] offsetIndexAAD = null;
+        if (null != fileEncryptor) {
+          InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex);
+          if (columnEncryptionSetup.isEncrypted()) {
+            offsetIndexEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
+            offsetIndexAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.OffsetIndex,
+                block.getOrdinal(), columnEncryptionSetup.getOrdinal(), -1);
+          }
+        }
+        long offset = out.getPos();
+        Util.writeOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(offsetIndex), out, offsetIndexEncryptor, offsetIndexAAD);
+        column.setOffsetIndexReference(new IndexReference(offset, (int) (out.getPos() - offset)));
+      }
+    }
+  }
+
+  private static void serializeBloomFilters(
+      List<Map<String, BloomFilter>> bloomFilters,
+      List<BlockMetaData> blocks,
+      PositionOutputStream out,
+      InternalFileEncryptor fileEncryptor) throws IOException {
+    LOG.debug("{}: bloom filters", out.getPos());
+    for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
+      BlockMetaData block = blocks.get(bIndex);
+      List<ColumnChunkMetaData> columns = block.getColumns();
+      Map<String, BloomFilter> blockBloomFilters = bloomFilters.get(bIndex);
+      if (blockBloomFilters.isEmpty()) {
+        continue;
+      }
+      for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
+        ColumnChunkMetaData column = columns.get(cIndex);
+        BloomFilter bloomFilter = blockBloomFilters.get(column.getPath().toDotString());
+        if (bloomFilter == null) {
+          continue;
+        }
+
+        long offset = out.getPos();
+        column.setBloomFilterOffset(offset);
+
+        BlockCipher.Encryptor bloomFilterEncryptor = null;
+        byte[] bloomFilterHeaderAAD = null;
+        byte[] bloomFilterBitsetAAD = null;
+        if (null != fileEncryptor) {
+          InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex);
+          if (columnEncryptionSetup.isEncrypted()) {
+            bloomFilterEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
+            int columnOrdinal = columnEncryptionSetup.getOrdinal();
+            bloomFilterHeaderAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterHeader,
+                block.getOrdinal(), columnOrdinal, -1);
+            bloomFilterBitsetAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterBitset,
+                block.getOrdinal(), columnOrdinal, -1);
+          }
+        }
+
+        Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(bloomFilter), out,
+            bloomFilterEncryptor, bloomFilterHeaderAAD);
+
+        ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream();
+        bloomFilter.writeTo(tempOutStream);
+        byte[] serializedBitset = tempOutStream.toByteArray();
+        if (null != bloomFilterEncryptor) {
+          serializedBitset = bloomFilterEncryptor.encrypt(serializedBitset, bloomFilterBitsetAAD);
+        }
+        out.write(serializedBitset);
+      }
+    }
+  }
+
+  private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out,
+      InternalFileEncryptor fileEncryptor) throws IOException {
+
+    ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
+
+    // Unencrypted file
+    if (null == fileEncryptor) {
+      long footerIndex = out.getPos();
+      org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
+      writeFileMetaData(parquetMetadata, out);
+      LOG.debug("{}: footer length = {}", out.getPos(), (out.getPos() - footerIndex));
+      BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
+      out.write(MAGIC);
+      return;
+    }
+
+    org.apache.parquet.format.FileMetaData parquetMetadata =
+        metadataConverter.toParquetMetadata(CURRENT_VERSION, footer, fileEncryptor);
+
+    // Encrypted file with plaintext footer
+    if (!fileEncryptor.isFooterEncrypted()) {
+      long footerIndex = out.getPos();
+      parquetMetadata.setEncryption_algorithm(fileEncryptor.getEncryptionAlgorithm());
+      // create footer signature (nonce + tag of encrypted footer)
+      byte[] footerSigningKeyMetaData = fileEncryptor.getFooterSigningKeyMetaData();
+      if (null != footerSigningKeyMetaData) {
+        parquetMetadata.setFooter_signing_key_metadata(footerSigningKeyMetaData);
+      }
+      ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream();
+      writeFileMetaData(parquetMetadata, tempOutStream);
+      byte[] serializedFooter = tempOutStream.toByteArray();
+      byte[] footerAAD = AesCipher.createFooterAAD(fileEncryptor.getFileAAD());
+      byte[] encryptedFooter = fileEncryptor.getSignedFooterEncryptor().encrypt(serializedFooter, footerAAD);
+      byte[] signature = new byte[AesCipher.NONCE_LENGTH + AesCipher.GCM_TAG_LENGTH];
+      System.arraycopy(encryptedFooter, ModuleCipherFactory.SIZE_LENGTH, signature, 0, AesCipher.NONCE_LENGTH); // copy Nonce
+      System.arraycopy(encryptedFooter, encryptedFooter.length - AesCipher.GCM_TAG_LENGTH,
+          signature, AesCipher.NONCE_LENGTH, AesCipher.GCM_TAG_LENGTH); // copy GCM Tag
+      out.write(serializedFooter);
+      out.write(signature);
+      LOG.debug("{}: footer and signature length = {}", out.getPos(), (out.getPos() - footerIndex));
+      BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
+      out.write(MAGIC);
+      return;
+    }
+
+    // Encrypted file with encrypted footer
+    long cryptoFooterIndex = out.getPos();
+    writeFileCryptoMetaData(fileEncryptor.getFileCryptoMetaData(), out);
+    byte[] footerAAD = AesCipher.createFooterAAD(fileEncryptor.getFileAAD());
+    writeFileMetaData(parquetMetadata, out, fileEncryptor.getFooterEncryptor(), footerAAD);
+    int combinedMetaDataLength = (int)(out.getPos() - cryptoFooterIndex);
+    LOG.debug("{}: crypto metadata and footer length = {}", out.getPos(), combinedMetaDataLength);
+    BytesUtils.writeIntLittleEndian(out, combinedMetaDataLength);
+    out.write(EFMAGIC);
+  }
+
+  public ParquetMetadata getFooter() {
+    Preconditions.checkState(state == STATE.ENDED, "Cannot return unfinished footer.");
+    return footer;
+  }
+
+  /**
+   * Given a list of metadata files, merge them into a single ParquetMetadata
+   * Requires that the schemas be compatible, and the extraMetadata be exactly equal.
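+   * <p>
+   * The key/value metadata of the merged footers is combined with a
+   * {@link StrictKeyValueMetadataMergeStrategy}; the overload below accepts a different
+   * {@link KeyValueMetadataMergeStrategy}. An illustrative usage sketch (the paths are
+   * hypothetical):
+   * <pre>{@code
+   * List<Path> metadataFiles = Arrays.asList(
+   *     new Path("/data/part-1/_metadata"),
+   *     new Path("/data/part-2/_metadata"));
+   * ParquetMetadata merged = ParquetFileWriter.mergeMetadataFiles(metadataFiles, new Configuration());
+   * }</pre>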
+   * @param files a list of files to merge metadata from
+   * @param conf a configuration
+   * @return merged parquet metadata for the files
+   * @throws IOException if there is an error while writing
+   * @deprecated metadata files are not recommended and will be removed in 2.0.0
+   */
+  @Deprecated
+  public static ParquetMetadata mergeMetadataFiles(List<Path> files, Configuration conf) throws IOException {
+    return mergeMetadataFiles(files, conf, new StrictKeyValueMetadataMergeStrategy());
+  }
+
+  /**
+   * Given a list of metadata files, merge them into a single ParquetMetadata
+   * Requires that the schemas be compatible, and the extraMetadata be exactly equal.
+   * @param files a list of files to merge metadata from
+   * @param conf a configuration
+   * @param keyValueMetadataMergeStrategy strategy to merge values for same key, if there are multiple
+   * @return merged parquet metadata for the files
+   * @throws IOException if there is an error while writing
+   * @deprecated metadata files are not recommended and will be removed in 2.0.0
+   */
+  @Deprecated
+  public static ParquetMetadata mergeMetadataFiles(List<Path> files, Configuration conf,
+      KeyValueMetadataMergeStrategy keyValueMetadataMergeStrategy) throws IOException {
+    Preconditions.checkArgument(!files.isEmpty(), "Cannot merge an empty list of metadata");
+
+    GlobalMetaData globalMetaData = null;
+    List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+
+    for (Path p : files) {
+      ParquetMetadata pmd = ParquetFileReader.readFooter(conf, p, ParquetMetadataConverter.NO_FILTER);
+      FileMetaData fmd = pmd.getFileMetaData();
+      globalMetaData = mergeInto(fmd, globalMetaData, true);
+      blocks.addAll(pmd.getBlocks());
+    }
+
+    // collapse GlobalMetaData into a single FileMetaData, which will throw if they are not compatible
+    return new ParquetMetadata(globalMetaData.merge(keyValueMetadataMergeStrategy), blocks);
+  }
+
+  /**
+   * Given a list of metadata files, merge them into a single metadata file.
+   * Requires that the schemas be compatible, and the extraMetaData be exactly equal.
+   * This is useful when merging 2 directories of parquet files into a single directory, as long
+   * as both directories were written with compatible schemas and equal extraMetaData.
+   * @param files a list of files to merge metadata from
+   * @param outputPath path to write merged metadata to
+   * @param conf a configuration
+   * @throws IOException if there is an error while reading or writing
+   * @deprecated metadata files are not recommended and will be removed in 2.0.0
+   */
+  @Deprecated
+  public static void writeMergedMetadataFile(List<Path> files, Path outputPath, Configuration conf) throws IOException {
+    ParquetMetadata merged = mergeMetadataFiles(files, conf);
+    writeMetadataFile(outputPath, merged, outputPath.getFileSystem(conf));
+  }
+
+  /**
+   * writes a _metadata and _common_metadata file
+   * @param configuration the configuration to use to get the FileSystem
+   * @param outputPath the directory to write the _metadata file to
+   * @param footers the list of footers to merge
+   * @throws IOException if there is an error while writing
+   * @deprecated metadata files are not recommended and will be removed in 2.0.0
+   */
+  @Deprecated
+  public static void writeMetadataFile(Configuration configuration, Path outputPath, List