diff --git a/contrib/storage-hive/hive-exec-shade/pom.xml b/contrib/storage-hive/hive-exec-shade/pom.xml index 1cd5980b0b8..cc98078a9d5 100644 --- a/contrib/storage-hive/hive-exec-shade/pom.xml +++ b/contrib/storage-hive/hive-exec-shade/pom.xml @@ -32,7 +32,7 @@ Drill : Contrib : Storage : Hive : Exec Shaded - 1.8.3 + 1.15.1 @@ -219,6 +219,12 @@ META-INF/versions/22/** + + org.apache.parquet:parquet-hadoop-bundle + + META-INF/versions/22/** + + diff --git a/contrib/storage-kafka/pom.xml b/contrib/storage-kafka/pom.xml index 35cb6022e62..8ff48673f35 100644 --- a/contrib/storage-kafka/pom.xml +++ b/contrib/storage-kafka/pom.xml @@ -97,7 +97,7 @@ io.confluent kafka-avro-serializer - 6.1.1 + 7.9.0 diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java deleted file mode 100644 index 1d9ccdad509..00000000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.parquet; - -import io.netty.buffer.DrillBuf; - -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.hadoop.fs.FSDataInputStream; - -import org.apache.parquet.bytes.BytesInput; -import org.apache.parquet.format.PageHeader; -import org.apache.parquet.format.Util; -import org.apache.parquet.hadoop.util.HadoopStreams; - -/** - * @deprecated it is never used. 
So can be removed in Drill 1.21.0 - */ -public class ColumnDataReader { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnDataReader.class); - - private final long endPosition; - public final FSDataInputStream input; - - public ColumnDataReader(FSDataInputStream input, long start, long length) throws IOException{ - this.input = input; - this.input.seek(start); - this.endPosition = start + length; - } - - public PageHeader readPageHeader() throws IOException{ - return Util.readPageHeader(input); - } - - public FSDataInputStream getInputStream() { - return input; - } - - public BytesInput getPageAsBytesInput(int pageLength) throws IOException{ - byte[] b = new byte[pageLength]; - input.read(b); - return new HadoopBytesInput(b); - } - - public void loadPage(DrillBuf target, int pageLength) throws IOException { - target.clear(); - HadoopStreams.wrap(input).read(target.nioBuffer(0, pageLength)); - target.writerIndex(pageLength); - } - - public void clear(){ - try{ - input.close(); - }catch(IOException ex){ - logger.warn("Error while closing input stream.", ex); - } - } - - public boolean hasRemainder() throws IOException{ - return input.getPos() < endPosition; - } - - public class HadoopBytesInput extends BytesInput{ - - private final byte[] pageBytes; - - public HadoopBytesInput(byte[] pageBytes) { - super(); - this.pageBytes = pageBytes; - } - - @Override - public byte[] toByteArray() throws IOException { - return pageBytes; - } - - @Override - public long size() { - return pageBytes.length; - } - - @Override - public void writeAllTo(OutputStream out) throws IOException { - out.write(pageBytes); - } - } -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java index c57a7c2e9d7..14e9a2b9d6a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java @@ -69,7 +69,7 @@ import org.apache.parquet.column.values.factory.DefaultV2ValuesWriterFactory; import org.apache.parquet.column.values.factory.ValuesWriterFactory; import org.apache.parquet.compression.CompressionCodecFactory; -import org.apache.parquet.hadoop.ParquetColumnChunkPageWriteStore; +import org.apache.parquet.hadoop.ColumnChunkPageWriteStore; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.io.ColumnIOFactory; @@ -120,7 +120,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK; private ColumnWriteStore store; - private ParquetColumnChunkPageWriteStore pageStore; + private ColumnChunkPageWriteStore pageStore; private RecordConsumer consumer; private BatchSchema batchSchema; @@ -285,10 +285,7 @@ private void newSchema() { .withWriterVersion(writerVersion) .build(); - // TODO: Replace ParquetColumnChunkPageWriteStore with ColumnChunkPageWriteStore from parquet library - // once DRILL-7906 (PARQUET-1006) will be resolved - pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, - parquetProperties.getInitialSlabSize(), pageSize, parquetProperties.getAllocator(), + pageStore = new ColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, parquetProperties.getAllocator(), 
parquetProperties.getColumnIndexTruncateLength(), parquetProperties.getPageWriteChecksumEnabled()); store = writerVersion == WriterVersion.PARQUET_1_0 diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java index ae76971a373..f35c2323ce6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java @@ -55,7 +55,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -417,13 +416,9 @@ public void close() { recordReader = null; recordMaterializer = null; nullFilledVectors = null; - try { - if (pageReadStore != null) { - pageReadStore.close(); - pageReadStore = null; - } - } catch (IOException e) { - logger.warn("Failure while closing PageReadStore", e); + if (pageReadStore != null) { + pageReadStore.close(); + pageReadStore = null; } } diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java index 7834eaa8166..5dc6658a24f 100644 --- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java +++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java @@ -295,9 +295,13 @@ public void addColumn(ColumnDescriptor descriptor, ColumnChunkMetaData metaData) columns.put(descriptor, reader); } - public void close() throws IOException { + public void close() { for (FSDataInputStream stream : streams) { - stream.close(); + try { + stream.close(); + } catch (IOException e) { + logger.warn("Error closing stream: {}", e.getMessage(), e); + } } for (ColumnChunkIncPageReader reader : columns.values()) { reader.close(); diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java deleted file mode 100644 index 28dfc278596..00000000000 --- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java +++ /dev/null @@ -1,483 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.parquet.hadoop; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.zip.CRC32; - -import org.apache.parquet.bytes.BytesInput; -import org.apache.parquet.bytes.CapacityByteArrayOutputStream; -import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.Encoding; -import org.apache.parquet.column.ParquetProperties; -import org.apache.parquet.column.page.DictionaryPage; -import org.apache.parquet.column.page.PageWriteStore; -import org.apache.parquet.column.page.PageWriter; -import org.apache.parquet.column.statistics.Statistics; -import org.apache.parquet.column.values.bloomfilter.BloomFilter; -import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; -import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter; -import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; -import org.apache.parquet.crypto.AesCipher; -import org.apache.parquet.crypto.InternalColumnEncryptionSetup; -import org.apache.parquet.crypto.InternalFileEncryptor; -import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType; -import org.apache.parquet.format.BlockCipher; -import org.apache.parquet.format.converter.ParquetMetadataConverter; -import org.apache.parquet.hadoop.metadata.ColumnPath; -import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder; -import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder; -import org.apache.parquet.io.ParquetEncodingException; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.bytes.ByteBufferAllocator; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@InterfaceAudience.Private -public class ParquetColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore, -AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(ParquetColumnChunkPageWriteStore.class); - - private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); - - private static final class ColumnChunkPageWriter implements PageWriter, BloomFilterWriter, Closeable { - - private final ColumnDescriptor path; - private final BytesInputCompressor compressor; - private final CapacityByteArrayOutputStream tempOutputStream; - private final CapacityByteArrayOutputStream buf; - private DictionaryPage dictionaryPage; - - private long uncompressedLength; - private long compressedLength; - private long totalValueCount; - private int pageCount; - - // repetition and definition level encodings are used only for v1 pages and don't change - private Set rlEncodings = new HashSet(); - private Set dlEncodings = new HashSet(); - private List dataEncodings = new ArrayList(); - - private BloomFilter bloomFilter; - private ColumnIndexBuilder columnIndexBuilder; - private OffsetIndexBuilder offsetIndexBuilder; - private Statistics totalStatistics; - private final ByteBufferAllocator allocator; - - private final CRC32 crc; - boolean pageWriteChecksumEnabled; - - private final BlockCipher.Encryptor headerBlockEncryptor; - private final BlockCipher.Encryptor pageBlockEncryptor; - private final int rowGroupOrdinal; - private final int columnOrdinal; - private int pageOrdinal; - private final byte[] dataPageAAD; - private final byte[] dataPageHeaderAAD; - private final byte[] fileAAD; - - 
private ColumnChunkPageWriter(ColumnDescriptor path, - BytesInputCompressor compressor, - int initialSlabSize, - int maxCapacityHint, - ByteBufferAllocator allocator, - int columnIndexTruncateLength, - boolean pageWriteChecksumEnabled, - BlockCipher.Encryptor headerBlockEncryptor, - BlockCipher.Encryptor pageBlockEncryptor, - byte[] fileAAD, - int rowGroupOrdinal, - int columnOrdinal) { - this.path = path; - this.compressor = compressor; - this.allocator = allocator; - this.tempOutputStream = new CapacityByteArrayOutputStream(initialSlabSize, maxCapacityHint, allocator); - this.buf = new CapacityByteArrayOutputStream(initialSlabSize, maxCapacityHint, allocator); - this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength); - this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); - this.pageWriteChecksumEnabled = pageWriteChecksumEnabled; - this.crc = pageWriteChecksumEnabled ? new CRC32() : null; - - this.headerBlockEncryptor = headerBlockEncryptor; - this.pageBlockEncryptor = pageBlockEncryptor; - this.fileAAD = fileAAD; - this.rowGroupOrdinal = rowGroupOrdinal; - this.columnOrdinal = columnOrdinal; - this.pageOrdinal = -1; - if (null != headerBlockEncryptor) { - dataPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPageHeader, - rowGroupOrdinal, columnOrdinal, 0); - } else { - dataPageHeaderAAD = null; - } - if (null != pageBlockEncryptor) { - dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage, - rowGroupOrdinal, columnOrdinal, 0); - } else { - dataPageAAD = null; - } - } - - @Override - @Deprecated - public void writePage(BytesInput bytesInput, int valueCount, Statistics statistics, Encoding rlEncoding, - Encoding dlEncoding, Encoding valuesEncoding) throws IOException { - // Setting the builders to the no-op ones so no column/offset indexes will be written for this column chunk - columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder(); - offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder(); - - writePage(bytesInput, valueCount, -1, statistics, rlEncoding, dlEncoding, valuesEncoding); - } - - @Override - public void writePage(BytesInput bytes, - int valueCount, - int rowCount, - Statistics statistics, - Encoding rlEncoding, - Encoding dlEncoding, - Encoding valuesEncoding) throws IOException { - pageOrdinal++; - long uncompressedSize = bytes.size(); - if (uncompressedSize > Integer.MAX_VALUE) { - throw new ParquetEncodingException( - "Cannot write page larger than Integer.MAX_VALUE bytes: " + - uncompressedSize); - } - BytesInput compressedBytes = compressor.compress(bytes); - if (null != pageBlockEncryptor) { - AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal); - compressedBytes = BytesInput.from(pageBlockEncryptor.encrypt(compressedBytes.toByteArray(), dataPageAAD)); - } - long compressedSize = compressedBytes.size(); - if (compressedSize > Integer.MAX_VALUE) { - throw new ParquetEncodingException( - "Cannot write compressed page larger than Integer.MAX_VALUE bytes: " - + compressedSize); - } - tempOutputStream.reset(); - if (null != headerBlockEncryptor) { - AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal); - } - if (pageWriteChecksumEnabled) { - crc.reset(); - crc.update(compressedBytes.toByteArray()); - parquetMetadataConverter.writeDataPageV1Header( - (int)uncompressedSize, - (int)compressedSize, - valueCount, - rlEncoding, - dlEncoding, - valuesEncoding, - (int) crc.getValue(), - tempOutputStream, - headerBlockEncryptor, - dataPageHeaderAAD); - } else { - 
parquetMetadataConverter.writeDataPageV1Header( - (int)uncompressedSize, - (int)compressedSize, - valueCount, - rlEncoding, - dlEncoding, - valuesEncoding, - tempOutputStream, - headerBlockEncryptor, - dataPageHeaderAAD); - } - this.uncompressedLength += uncompressedSize; - this.compressedLength += compressedSize; - this.totalValueCount += valueCount; - this.pageCount += 1; - - // Copying the statistics if it is not initialized yet so we have the correct typed one - if (totalStatistics == null) { - totalStatistics = statistics.copy(); - } else { - totalStatistics.mergeStatistics(statistics); - } - - columnIndexBuilder.add(statistics); - offsetIndexBuilder.add(toIntWithCheck(tempOutputStream.size() + compressedSize), rowCount); - - // by concatenating before writing instead of writing twice, - // we only allocate one buffer to copy into instead of multiple. - BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes).writeAllTo(buf); // is used instead of above - rlEncodings.add(rlEncoding); - dlEncodings.add(dlEncoding); - dataEncodings.add(valuesEncoding); - } - - @Override - public void writePageV2( - int rowCount, int nullCount, int valueCount, - BytesInput repetitionLevels, BytesInput definitionLevels, - Encoding dataEncoding, BytesInput data, - Statistics statistics) throws IOException { - pageOrdinal++; - - int rlByteLength = toIntWithCheck(repetitionLevels.size()); - int dlByteLength = toIntWithCheck(definitionLevels.size()); - int uncompressedSize = toIntWithCheck( - data.size() + repetitionLevels.size() + definitionLevels.size() - ); - // TODO: decide if we compress - BytesInput compressedData = compressor.compress(data); - if (null != pageBlockEncryptor) { - AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal); - compressedData = BytesInput.from(pageBlockEncryptor.encrypt(compressedData.toByteArray(), dataPageAAD)); - } - int compressedSize = toIntWithCheck( - compressedData.size() + repetitionLevels.size() + definitionLevels.size() - ); - tempOutputStream.reset(); - if (null != headerBlockEncryptor) { - AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal); - } - parquetMetadataConverter.writeDataPageV2Header( - uncompressedSize, compressedSize, - valueCount, nullCount, rowCount, - dataEncoding, - rlByteLength, - dlByteLength, - tempOutputStream, - headerBlockEncryptor, - dataPageHeaderAAD); - this.uncompressedLength += uncompressedSize; - this.compressedLength += compressedSize; - this.totalValueCount += valueCount; - this.pageCount += 1; - - // Copying the statistics if it is not initialized yet so we have the correct typed one - if (totalStatistics == null) { - totalStatistics = statistics.copy(); - } else { - totalStatistics.mergeStatistics(statistics); - } - - columnIndexBuilder.add(statistics); - offsetIndexBuilder.add(toIntWithCheck((long) tempOutputStream.size() + compressedSize), rowCount); - - // by concatenating before writing instead of writing twice, - // we only allocate one buffer to copy into instead of multiple. 
- BytesInput.concat( - BytesInput.from(tempOutputStream), - repetitionLevels, - definitionLevels, - compressedData).writeAllTo(buf); - dataEncodings.add(dataEncoding); - } - - private int toIntWithCheck(long size) { - if (size > Integer.MAX_VALUE) { - throw new ParquetEncodingException( - "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " + - size); - } - return (int)size; - } - - @Override - public long getMemSize() { - return buf.size(); - } - - public void writeToFileWriter(ParquetFileWriter writer) throws IOException { - if (null == headerBlockEncryptor) { - writer.writeColumnChunk( - path, - totalValueCount, - compressor.getCodecName(), - dictionaryPage, - BytesInput.from(buf), - uncompressedLength, - compressedLength, - totalStatistics, - columnIndexBuilder, - offsetIndexBuilder, - bloomFilter, - rlEncodings, - dlEncodings, - dataEncodings); - } else { - writer.writeColumnChunk( - path, - totalValueCount, - compressor.getCodecName(), - dictionaryPage, - BytesInput.from(buf), - uncompressedLength, - compressedLength, - totalStatistics, - columnIndexBuilder, - offsetIndexBuilder, - bloomFilter, - rlEncodings, - dlEncodings, - dataEncodings, - headerBlockEncryptor, - rowGroupOrdinal, - columnOrdinal, - fileAAD); - } - if (LOG.isDebugEnabled()) { - LOG.debug( - String.format( - "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s", - buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, new HashSet(dataEncodings)) - + (dictionaryPage != null ? String.format( - ", dic { %,d entries, %,dB raw, %,dB comp}", - dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize()) - : "")); - } - rlEncodings.clear(); - dlEncodings.clear(); - dataEncodings.clear(); - pageCount = 0; - pageOrdinal = -1; - } - - @Override - public long allocatedSize() { - return buf.size(); - } - - @Override - public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException { - if (this.dictionaryPage != null) { - throw new ParquetEncodingException("Only one dictionary page is allowed"); - } - BytesInput dictionaryBytes = dictionaryPage.getBytes(); - int uncompressedSize = (int)dictionaryBytes.size(); - BytesInput compressedBytes = compressor.compress(dictionaryBytes); - if (null != pageBlockEncryptor) { - byte[] dictonaryPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage, - rowGroupOrdinal, columnOrdinal, -1); - compressedBytes = BytesInput.from(pageBlockEncryptor.encrypt(compressedBytes.toByteArray(), dictonaryPageAAD)); - } - this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, - dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding()); - } - - @Override - public String memUsageString(String prefix) { - return buf.memUsageString(prefix + " ColumnChunkPageWriter"); - } - - @Override - public void writeBloomFilter(BloomFilter bloomFilter) { - this.bloomFilter = bloomFilter; - } - - @Override - public void close() { - tempOutputStream.close(); - buf.close(); - } - } - - private final Map writers = new HashMap(); - private final MessageType schema; - - public ParquetColumnChunkPageWriteStore(BytesInputCompressor compressor, MessageType schema, int initialSlabSize, - int maxCapacityHint, ByteBufferAllocator allocator, - int columnIndexTruncateLength) { - this(compressor, schema, initialSlabSize, maxCapacityHint, allocator, columnIndexTruncateLength, - ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED); - } - - public 
ParquetColumnChunkPageWriteStore(BytesInputCompressor compressor, MessageType schema, int initialSlabSize, - int maxCapacityHint, ByteBufferAllocator allocator, - int columnIndexTruncateLength, boolean pageWriteChecksumEnabled) { - this.schema = schema; - for (ColumnDescriptor path : schema.getColumns()) { - writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSlabSize, maxCapacityHint, allocator, columnIndexTruncateLength, - pageWriteChecksumEnabled, null, null, null, -1, -1)); - } - } - - public ParquetColumnChunkPageWriteStore(BytesInputCompressor compressor, MessageType schema, int initialSlabSize, - int maxCapacityHint, ByteBufferAllocator allocator, - int columnIndexTruncateLength, boolean pageWriteChecksumEnabled, - InternalFileEncryptor fileEncryptor, int rowGroupOrdinal) { - this.schema = schema; - if (null == fileEncryptor) { - for (ColumnDescriptor path : schema.getColumns()) { - writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSlabSize, maxCapacityHint, allocator, - columnIndexTruncateLength, pageWriteChecksumEnabled, null, null, - null, -1, -1)); - } - return; - } - - // Encrypted file - int columnOrdinal = -1; - byte[] fileAAD = fileEncryptor.getFileAAD(); - for (ColumnDescriptor path : schema.getColumns()) { - columnOrdinal++; - BlockCipher.Encryptor headerBlockEncryptor = null; - BlockCipher.Encryptor pageBlockEncryptor = null; - ColumnPath columnPath = ColumnPath.get(path.getPath()); - - InternalColumnEncryptionSetup columnSetup = fileEncryptor.getColumnSetup(columnPath, true, columnOrdinal); - if (columnSetup.isEncrypted()) { - headerBlockEncryptor = columnSetup.getMetaDataEncryptor(); - pageBlockEncryptor = columnSetup.getDataEncryptor(); - } - - writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSlabSize, maxCapacityHint, allocator, - columnIndexTruncateLength, pageWriteChecksumEnabled, headerBlockEncryptor, pageBlockEncryptor, fileAAD, - rowGroupOrdinal, columnOrdinal)); - } - } - - @Override - public PageWriter getPageWriter(ColumnDescriptor path) { - return writers.get(path); - } - - @Override - public BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path) { - return writers.get(path); - } - - - - public void flushToFileWriter(ParquetFileWriter writer) throws IOException { - for (ColumnDescriptor path : schema.getColumns()) { - ColumnChunkPageWriter pageWriter = writers.get(path); - pageWriter.writeToFileWriter(writer); - } - } - - @Override - public void close() { - for (ColumnChunkPageWriter pageWriter : writers.values()) { - pageWriter.close(); - } - } - -} diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index f90f4c84131..275cb6654a8 100644 --- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -7,13 +7,14 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.parquet.hadoop; @@ -27,30 +28,34 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.zip.CRC32; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; - +import org.apache.parquet.ParquetSizeOverflowException; import org.apache.parquet.Preconditions; import org.apache.parquet.Version; +import org.apache.parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.bytes.ByteBufferReleaser; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.bytes.ReusingByteBufferAllocator; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.crypto.AesCipher; @@ -61,19 +66,19 @@ import org.apache.parquet.crypto.ModuleCipherFactory; import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType; import org.apache.parquet.crypto.ParquetCryptoRuntimeException; -import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; -import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.format.BlockCipher; import org.apache.parquet.format.Util; import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.hadoop.metadata.CompressionCodecName; -import org.apache.parquet.hadoop.metadata.StrictKeyValueMetadataMergeStrategy; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.GlobalMetaData; import org.apache.parquet.hadoop.metadata.KeyValueMetadataMergeStrategy; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.metadata.StrictKeyValueMetadataMergeStrategy; import org.apache.parquet.hadoop.util.HadoopOutputFile; import org.apache.parquet.hadoop.util.HadoopStreams; import org.apache.parquet.internal.column.columnindex.ColumnIndex; @@ -83,9 +88,9 @@ import org.apache.parquet.internal.hadoop.metadata.IndexReference; import org.apache.parquet.io.InputFile; import org.apache.parquet.io.OutputFile; -import org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.io.ParquetEncodingException; import org.apache.parquet.io.PositionOutputStream; +import 
org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.TypeUtil;
@@ -94,1541 +99,2196 @@
 /**
  * Internal implementation of the Parquet file writer as a block container
- * Note: this is temporary Drill-Parquet class needed to write empty parquet files. Details in + * Note: this is temporary Drill-Parquet class needed to write empty parquet files. + * This is a full copy of the Parquet library implementation with the lines that throw an error + * on writing empty Parquet files commented out. See details in: * PARQUET-2026 and * DRILL-7907 */ -public class ParquetFileWriter { - private static final Logger LOG = LoggerFactory.getLogger(ParquetFileWriter.class); - - private final ParquetMetadataConverter metadataConverter; - - public static final String PARQUET_METADATA_FILE = "_metadata"; - public static final String MAGIC_STR = "PAR1"; - public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII); - public static final String EF_MAGIC_STR = "PARE"; - public static final byte[] EFMAGIC = EF_MAGIC_STR.getBytes(StandardCharsets.US_ASCII); - public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata"; - public static final int CURRENT_VERSION = 1; - - // File creation modes - public static enum Mode { - CREATE, - OVERWRITE - } - - protected final PositionOutputStream out; - - private final MessageType schema; - private final AlignmentStrategy alignment; - private final int columnIndexTruncateLength; - - // file data - private List blocks = new ArrayList(); - - // The column/offset indexes per blocks per column chunks - private final List> columnIndexes = new ArrayList<>(); - private final List> offsetIndexes = new ArrayList<>(); - - // The Bloom filters - private final List> bloomFilters = new ArrayList<>(); - - // The file encryptor - private final InternalFileEncryptor fileEncryptor; - - // row group data - private BlockMetaData currentBlock; // appended to by endColumn - - // The column/offset indexes for the actual block - private List currentColumnIndexes; - private List currentOffsetIndexes; - - // The Bloom filter for the actual block - private Map currentBloomFilters; - - // row group data set at the start of a row group - private long currentRecordCount; // set in startBlock - - // column chunk data accumulated as pages are written - private EncodingStats.Builder encodingStatsBuilder; - private Set currentEncodings; - private long uncompressedLength; - private long compressedLength; - private Statistics currentStatistics; // accumulated in writePage(s) - private ColumnIndexBuilder columnIndexBuilder; - private OffsetIndexBuilder offsetIndexBuilder; - - // column chunk data set at the start of a column - private CompressionCodecName currentChunkCodec; // set in startColumn - private ColumnPath currentChunkPath; // set in startColumn - private PrimitiveType currentChunkType; // set in startColumn - private long currentChunkValueCount; // set in startColumn - private long currentChunkFirstDataPage; // set in startColumn & page writes - private long currentChunkDictionaryPageOffset; // set in writeDictionaryPage - - // set when end is called - private ParquetMetadata footer = null; - - private final CRC32 crc; - private boolean pageWriteChecksumEnabled; - - /** - * Captures the order in which methods should be called - */ - private enum STATE { - NOT_STARTED { - STATE start() { - return STARTED; - } - }, - STARTED { - STATE startBlock() { - return BLOCK; - } - STATE end() { - return ENDED; - } - }, - BLOCK { - STATE startColumn() { - return COLUMN; - } - STATE endBlock() { - return STARTED; - } - }, - COLUMN { - STATE endColumn() { - return BLOCK; - }; - STATE write() { - return this; - } - }, - ENDED; - - STATE 
start() throws IOException { return error(); } - STATE startBlock() throws IOException { return error(); } - STATE startColumn() throws IOException { return error(); } - STATE write() throws IOException { return error(); } - STATE endColumn() throws IOException { return error(); } - STATE endBlock() throws IOException { return error(); } - STATE end() throws IOException { return error(); } - - private final STATE error() throws IOException { - throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name()); - } +public class ParquetFileWriter implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(ParquetFileWriter.class); + + private final ParquetMetadataConverter metadataConverter; + + public static final String PARQUET_METADATA_FILE = "_metadata"; + public static final String MAGIC_STR = "PAR1"; + public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII); + public static final String EF_MAGIC_STR = "PARE"; + public static final byte[] EFMAGIC = EF_MAGIC_STR.getBytes(StandardCharsets.US_ASCII); + public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata"; + public static final int CURRENT_VERSION = 1; + + // File creation modes + public static enum Mode { + CREATE, + OVERWRITE + } + + protected final PositionOutputStream out; + + private final MessageType schema; + private final AlignmentStrategy alignment; + private final int columnIndexTruncateLength; + + // file data + private final List blocks = new ArrayList(); + + // The column/offset indexes per blocks per column chunks + private final List> columnIndexes = new ArrayList<>(); + private final List> offsetIndexes = new ArrayList<>(); + + // The Bloom filters + private final List> bloomFilters = new ArrayList<>(); + + // The file encryptor + private final InternalFileEncryptor fileEncryptor; + + // row group data + private BlockMetaData currentBlock; // appended to by endColumn + + // The column/offset indexes for the actual block + private List currentColumnIndexes; + private List currentOffsetIndexes; + + // The Bloom filter for the actual block + private Map currentBloomFilters; + + // row group data set at the start of a row group + private long currentRecordCount; // set in startBlock + + // column chunk data accumulated as pages are written + private final EncodingStats.Builder encodingStatsBuilder; + private Set currentEncodings; + private long uncompressedLength; + private long compressedLength; + private Statistics currentStatistics; // accumulated in writePage(s) + private SizeStatistics currentSizeStatistics; // accumulated in writePage(s) + private ColumnIndexBuilder columnIndexBuilder; + private OffsetIndexBuilder offsetIndexBuilder; + + // column chunk data set at the start of a column + private CompressionCodecName currentChunkCodec; // set in startColumn + private ColumnPath currentChunkPath; // set in startColumn + private PrimitiveType currentChunkType; // set in startColumn + private long currentChunkValueCount; // set in startColumn + private long currentChunkFirstDataPage; // set in startColumn & page writes + private long currentChunkDictionaryPageOffset; // set in writeDictionaryPage + + // set when end is called + private ParquetMetadata footer = null; + private boolean closed; + + private final CRC32 crc; + private final ReusingByteBufferAllocator crcAllocator; + private final boolean pageWriteChecksumEnabled; + + /** + * Captures the order in which 
methods should be called + */ + private enum STATE { + NOT_STARTED { + STATE start() { + return STARTED; + } + }, + STARTED { + STATE startBlock() { + return BLOCK; + } + + STATE end() { + return ENDED; + } + }, + BLOCK { + STATE startColumn() { + return COLUMN; + } + + STATE endBlock() { + return STARTED; + } + }, + COLUMN { + STATE endColumn() { + return BLOCK; + } + + STATE write() { + return this; + } + }, + ENDED; + + STATE start() throws IOException { + return error(); } - private STATE state = STATE.NOT_STARTED; - - /** - * @param configuration Hadoop configuration - * @param schema the schema of the data - * @param file the file to write to - * @throws IOException if the file can not be created - * @deprecated will be removed in 2.0.0 - */ - @Deprecated - public ParquetFileWriter(Configuration configuration, MessageType schema, - Path file) throws IOException { - this(HadoopOutputFile.fromPath(file, configuration), - schema, Mode.CREATE, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT); - } - - /** - * @param configuration Hadoop configuration - * @param schema the schema of the data - * @param file the file to write to - * @param mode file creation mode - * @throws IOException if the file can not be created - * @deprecated will be removed in 2.0.0 - */ - @Deprecated - public ParquetFileWriter(Configuration configuration, MessageType schema, - Path file, Mode mode) throws IOException { - this(HadoopOutputFile.fromPath(file, configuration), - schema, mode, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT); - } - - /** - * @param configuration Hadoop configuration - * @param schema the schema of the data - * @param file the file to write to - * @param mode file creation mode - * @param rowGroupSize the row group size - * @param maxPaddingSize the maximum padding - * @throws IOException if the file can not be created - * @deprecated will be removed in 2.0.0 - */ - @Deprecated - public ParquetFileWriter(Configuration configuration, MessageType schema, - Path file, Mode mode, long rowGroupSize, - int maxPaddingSize) - throws IOException { - this(HadoopOutputFile.fromPath(file, configuration), - schema, mode, rowGroupSize, maxPaddingSize); - } - - /** - * @param file OutputFile to create or overwrite - * @param schema the schema of the data - * @param mode file creation mode - * @param rowGroupSize the row group size - * @param maxPaddingSize the maximum padding - * @throws IOException if the file can not be created - * @deprecated will be removed in 2.0.0 - */ - @Deprecated - public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, - long rowGroupSize, int maxPaddingSize) - throws IOException { - this(file, schema, mode, rowGroupSize, maxPaddingSize, - ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, - ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH, - ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED); - } - - /** - * @param file OutputFile to create or overwrite - * @param schema the schema of the data - * @param mode file creation mode - * @param rowGroupSize the row group size - * @param maxPaddingSize the maximum padding - * @param columnIndexTruncateLength the length which the min/max values in column indexes tried to be truncated to - * @param statisticsTruncateLength the length which the min/max values in row groups tried to be truncated to - * @param pageWriteChecksumEnabled whether to write out page level checksums - * @throws IOException if the file can not be created - */ - public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, - 
long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength, - int statisticsTruncateLength, boolean pageWriteChecksumEnabled) - throws IOException{ - this(file, schema, mode, rowGroupSize, maxPaddingSize, columnIndexTruncateLength, - statisticsTruncateLength, pageWriteChecksumEnabled, null); - } - - public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, - long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength, - int statisticsTruncateLength, boolean pageWriteChecksumEnabled, - FileEncryptionProperties encryptionProperties) - throws IOException { - TypeUtil.checkValidWriteSchema(schema); - - this.schema = schema; - - long blockSize = rowGroupSize; - if (file.supportsBlockSize()) { - blockSize = Math.max(file.defaultBlockSize(), rowGroupSize); - this.alignment = PaddingAlignment.get(blockSize, rowGroupSize, maxPaddingSize); - } else { - this.alignment = NoAlignment.get(rowGroupSize); - } + STATE startBlock() throws IOException { + return error(); + } - if (mode == Mode.OVERWRITE) { - this.out = file.createOrOverwrite(blockSize); - } else { - this.out = file.create(blockSize); - } + STATE startColumn() throws IOException { + return error(); + } - this.encodingStatsBuilder = new EncodingStats.Builder(); - this.columnIndexTruncateLength = columnIndexTruncateLength; - this.pageWriteChecksumEnabled = pageWriteChecksumEnabled; - this.crc = pageWriteChecksumEnabled ? new CRC32() : null; - - this.metadataConverter = new ParquetMetadataConverter(statisticsTruncateLength); - - if (null == encryptionProperties) { - this.fileEncryptor = null; - } else { - // Verify that every encrypted column is in file schema - Map columnEncryptionProperties = encryptionProperties.getEncryptedColumns(); - if (null != columnEncryptionProperties) { // if null, every column in file schema will be encrypted with footer key - for (Map.Entry entry : columnEncryptionProperties.entrySet()) { - String[] path = entry.getKey().toArray(); - if(!schema.containsPath(path)) { - throw new ParquetCryptoRuntimeException("Encrypted column " + Arrays.toString(path) + " not in file schema"); - } - } - } - this.fileEncryptor = new InternalFileEncryptor(encryptionProperties); - } + STATE write() throws IOException { + return error(); } - /** - * FOR TESTING ONLY. This supports testing block padding behavior on the local FS. - * - * @param configuration Hadoop configuration - * @param schema the schema of the data - * @param file the file to write to - * @param rowAndBlockSize the row group size - * @param maxPaddingSize the maximum padding - * @throws IOException if the file can not be created - */ - ParquetFileWriter(Configuration configuration, MessageType schema, - Path file, long rowAndBlockSize, int maxPaddingSize) - throws IOException { - FileSystem fs = file.getFileSystem(configuration); - this.schema = schema; - this.alignment = PaddingAlignment.get( - rowAndBlockSize, rowAndBlockSize, maxPaddingSize); - this.out = HadoopStreams.wrap( - fs.create(file, true, 8192, fs.getDefaultReplication(file), rowAndBlockSize)); - this.encodingStatsBuilder = new EncodingStats.Builder(); - // no truncation is needed for testing - this.columnIndexTruncateLength = Integer.MAX_VALUE; - this.pageWriteChecksumEnabled = ParquetOutputFormat.getPageWriteChecksumEnabled(configuration); - this.crc = pageWriteChecksumEnabled ? 
new CRC32() : null; - this.metadataConverter = new ParquetMetadataConverter(ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH); - this.fileEncryptor = null; - } - /** - * start the file - * @throws IOException if there is an error while writing - */ - public void start() throws IOException { - state = state.start(); - LOG.debug("{}: start", out.getPos()); - byte[] magic = MAGIC; - if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) { - magic = EFMAGIC; - } - out.write(magic); - } - - InternalFileEncryptor getEncryptor() { - return fileEncryptor; - } - - /** - * start a block - * @param recordCount the record count in this block - * @throws IOException if there is an error while writing - */ - public void startBlock(long recordCount) throws IOException { - state = state.startBlock(); - LOG.debug("{}: start block", out.getPos()); -// out.write(MAGIC); // TODO: add a magic delimiter - - alignment.alignForRowGroup(out); - - currentBlock = new BlockMetaData(); - currentRecordCount = recordCount; - - currentColumnIndexes = new ArrayList<>(); - currentOffsetIndexes = new ArrayList<>(); - - currentBloomFilters = new HashMap<>(); - } - - /** - * start a column inside a block - * @param descriptor the column descriptor - * @param valueCount the value count in this column - * @param compressionCodecName a compression codec name - * @throws IOException if there is an error while writing - */ - public void startColumn(ColumnDescriptor descriptor, - long valueCount, - CompressionCodecName compressionCodecName) throws IOException { - state = state.startColumn(); - encodingStatsBuilder.clear(); - currentEncodings = new HashSet(); - currentChunkPath = ColumnPath.get(descriptor.getPath()); - currentChunkType = descriptor.getPrimitiveType(); - currentChunkCodec = compressionCodecName; - currentChunkValueCount = valueCount; - currentChunkFirstDataPage = -1; - compressedLength = 0; - uncompressedLength = 0; - // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one - currentStatistics = null; - - columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength); - offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); - } - - /** - * writes a dictionary page page - * @param dictionaryPage the dictionary page - * @throws IOException if there is an error while writing - */ - public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException { - writeDictionaryPage(dictionaryPage, null, null); - } - - public void writeDictionaryPage(DictionaryPage dictionaryPage, - BlockCipher.Encryptor headerBlockEncryptor, byte[] AAD) throws IOException { - state = state.write(); - LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize()); - currentChunkDictionaryPageOffset = out.getPos(); - int uncompressedSize = dictionaryPage.getUncompressedSize(); - int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts - if (pageWriteChecksumEnabled) { - crc.reset(); - crc.update(dictionaryPage.getBytes().toByteArray()); - metadataConverter.writeDictionaryPageHeader( - uncompressedSize, - compressedPageSize, - dictionaryPage.getDictionarySize(), - dictionaryPage.getEncoding(), - (int) crc.getValue(), - out, - headerBlockEncryptor, - AAD); - } else { - metadataConverter.writeDictionaryPageHeader( - uncompressedSize, - compressedPageSize, - dictionaryPage.getDictionarySize(), - dictionaryPage.getEncoding(), - out, - headerBlockEncryptor, - AAD); - } - long 
headerSize = out.getPos() - currentChunkDictionaryPageOffset; - this.uncompressedLength += uncompressedSize + headerSize; - this.compressedLength += compressedPageSize + headerSize; - LOG.debug("{}: write dictionary page content {}", out.getPos(), compressedPageSize); - dictionaryPage.getBytes().writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted - encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding()); - currentEncodings.add(dictionaryPage.getEncoding()); - } - - - /** - * writes a single page - * @param valueCount count of values - * @param uncompressedPageSize the size of the data once uncompressed - * @param bytes the compressed data for the page without header - * @param rlEncoding encoding of the repetition level - * @param dlEncoding encoding of the definition level - * @param valuesEncoding encoding of values - * @throws IOException if there is an error while writing - */ - @Deprecated - public void writeDataPage( - int valueCount, int uncompressedPageSize, - BytesInput bytes, - Encoding rlEncoding, - Encoding dlEncoding, - Encoding valuesEncoding) throws IOException { - state = state.write(); - // We are unable to build indexes without rowCount so skip them for this column - offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder(); - columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder(); - long beforeHeader = out.getPos(); - LOG.debug("{}: write data page: {} values", beforeHeader, valueCount); - int compressedPageSize = (int)bytes.size(); - metadataConverter.writeDataPageV1Header( - uncompressedPageSize, compressedPageSize, - valueCount, - rlEncoding, - dlEncoding, - valuesEncoding, - out); - long headerSize = out.getPos() - beforeHeader; - this.uncompressedLength += uncompressedPageSize + headerSize; - this.compressedLength += compressedPageSize + headerSize; - LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize); - bytes.writeAllTo(out); - encodingStatsBuilder.addDataEncoding(valuesEncoding); - currentEncodings.add(rlEncoding); - currentEncodings.add(dlEncoding); - currentEncodings.add(valuesEncoding); - if (currentChunkFirstDataPage < 0) { - currentChunkFirstDataPage = beforeHeader; - } + STATE endColumn() throws IOException { + return error(); } - /** - * writes a single page - * @param valueCount count of values - * @param uncompressedPageSize the size of the data once uncompressed - * @param bytes the compressed data for the page without header - * @param statistics statistics for the page - * @param rlEncoding encoding of the repetition level - * @param dlEncoding encoding of the definition level - * @param valuesEncoding encoding of values - * @throws IOException if there is an error while writing - * @deprecated this method does not support writing column indexes; Use - * {@link #writeDataPage(int, int, BytesInput, Statistics, long, Encoding, Encoding, Encoding)} instead - */ - @Deprecated - public void writeDataPage( - int valueCount, int uncompressedPageSize, - BytesInput bytes, - Statistics statistics, - Encoding rlEncoding, - Encoding dlEncoding, - Encoding valuesEncoding) throws IOException { - // We are unable to build indexes without rowCount so skip them for this column - offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder(); - columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder(); - innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding); - } - - /** - * Writes a single page - * @param valueCount count of values - * @param 
uncompressedPageSize the size of the data once uncompressed - * @param bytes the compressed data for the page without header - * @param statistics the statistics of the page - * @param rowCount the number of rows in the page - * @param rlEncoding encoding of the repetition level - * @param dlEncoding encoding of the definition level - * @param valuesEncoding encoding of values - * @throws IOException if any I/O error occurs during writing the file - */ - public void writeDataPage( - int valueCount, int uncompressedPageSize, - BytesInput bytes, - Statistics statistics, - long rowCount, - Encoding rlEncoding, - Encoding dlEncoding, - Encoding valuesEncoding) throws IOException { - long beforeHeader = out.getPos(); - innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding); - - offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount); - } - - private void innerWriteDataPage( - int valueCount, int uncompressedPageSize, - BytesInput bytes, - Statistics statistics, - Encoding rlEncoding, - Encoding dlEncoding, - Encoding valuesEncoding) throws IOException { - state = state.write(); - long beforeHeader = out.getPos(); - if (currentChunkFirstDataPage < 0) { - currentChunkFirstDataPage = beforeHeader; - } - LOG.debug("{}: write data page: {} values", beforeHeader, valueCount); - int compressedPageSize = (int) bytes.size(); - if (pageWriteChecksumEnabled) { - crc.reset(); - crc.update(bytes.toByteArray()); - metadataConverter.writeDataPageV1Header( - uncompressedPageSize, compressedPageSize, - valueCount, - rlEncoding, - dlEncoding, - valuesEncoding, - (int) crc.getValue(), - out); - } else { - metadataConverter.writeDataPageV1Header( - uncompressedPageSize, compressedPageSize, - valueCount, - rlEncoding, - dlEncoding, - valuesEncoding, - out); - } - long headerSize = out.getPos() - beforeHeader; - this.uncompressedLength += uncompressedPageSize + headerSize; - this.compressedLength += compressedPageSize + headerSize; - LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize); - bytes.writeAllTo(out); - - // Copying the statistics if it is not initialized yet so we have the correct typed one - if (currentStatistics == null) { - currentStatistics = statistics.copy(); - } else { - currentStatistics.mergeStatistics(statistics); - } + STATE endBlock() throws IOException { + return error(); + } - columnIndexBuilder.add(statistics); - - encodingStatsBuilder.addDataEncoding(valuesEncoding); - currentEncodings.add(rlEncoding); - currentEncodings.add(dlEncoding); - currentEncodings.add(valuesEncoding); - } - - /** - * Add a Bloom filter that will be written out. This is only used in unit test. 
- * - * @param column the column name - * @param bloomFilter the bloom filter of column values - */ - void addBloomFilter(String column, BloomFilter bloomFilter) { - currentBloomFilters.put(column, bloomFilter); - } - - /** - * Writes a single v2 data page - * @param rowCount count of rows - * @param nullCount count of nulls - * @param valueCount count of values - * @param repetitionLevels repetition level bytes - * @param definitionLevels definition level bytes - * @param dataEncoding encoding for data - * @param compressedData compressed data bytes - * @param uncompressedDataSize the size of uncompressed data - * @param statistics the statistics of the page - * @throws IOException if any I/O error occurs during writing the file - */ - public void writeDataPageV2(int rowCount, int nullCount, int valueCount, - BytesInput repetitionLevels, - BytesInput definitionLevels, - Encoding dataEncoding, - BytesInput compressedData, - int uncompressedDataSize, - Statistics statistics) throws IOException { - state = state.write(); - int rlByteLength = toIntWithCheck(repetitionLevels.size()); - int dlByteLength = toIntWithCheck(definitionLevels.size()); - - int compressedSize = toIntWithCheck( - compressedData.size() + repetitionLevels.size() + definitionLevels.size() - ); - - int uncompressedSize = toIntWithCheck( - uncompressedDataSize + repetitionLevels.size() + definitionLevels.size() - ); - - long beforeHeader = out.getPos(); - if (currentChunkFirstDataPage < 0) { - currentChunkFirstDataPage = beforeHeader; - } + STATE end() throws IOException { + return error(); + } - metadataConverter.writeDataPageV2Header( - uncompressedSize, compressedSize, - valueCount, nullCount, rowCount, - dataEncoding, - rlByteLength, - dlByteLength, - out); - - long headersSize = out.getPos() - beforeHeader; - this.uncompressedLength += uncompressedSize + headersSize; - this.compressedLength += compressedSize + headersSize; - - if (currentStatistics == null) { - currentStatistics = statistics.copy(); - } else { - currentStatistics.mergeStatistics(statistics); - } + private final STATE error() throws IOException { + throw new IOException( + "The file being written is in an invalid state. Probably caused by an error thrown previously. 
Current state: " + + this.name()); + } + } + + private STATE state = STATE.NOT_STARTED; + + /** + * @param configuration Hadoop configuration + * @param schema the schema of the data + * @param file the file to write to + * @throws IOException if the file can not be created + * @deprecated will be removed in 2.0.0 + */ + @Deprecated + public ParquetFileWriter(Configuration configuration, MessageType schema, Path file) throws IOException { + this( + HadoopOutputFile.fromPath(file, configuration), + schema, + Mode.CREATE, + DEFAULT_BLOCK_SIZE, + MAX_PADDING_SIZE_DEFAULT); + } + + /** + * @param configuration Hadoop configuration + * @param schema the schema of the data + * @param file the file to write to + * @param mode file creation mode + * @throws IOException if the file can not be created + * @deprecated will be removed in 2.0.0 + */ + @Deprecated + public ParquetFileWriter(Configuration configuration, MessageType schema, Path file, Mode mode) throws IOException { + this( + HadoopOutputFile.fromPath(file, configuration), + schema, + mode, + DEFAULT_BLOCK_SIZE, + MAX_PADDING_SIZE_DEFAULT); + } + + /** + * @param configuration Hadoop configuration + * @param schema the schema of the data + * @param file the file to write to + * @param mode file creation mode + * @param rowGroupSize the row group size + * @param maxPaddingSize the maximum padding + * @throws IOException if the file can not be created + * @deprecated will be removed in 2.0.0 + */ + @Deprecated + public ParquetFileWriter( + Configuration configuration, + MessageType schema, + Path file, + Mode mode, + long rowGroupSize, + int maxPaddingSize) + throws IOException { + this(HadoopOutputFile.fromPath(file, configuration), schema, mode, rowGroupSize, maxPaddingSize); + } + + /** + * @param file OutputFile to create or overwrite + * @param schema the schema of the data + * @param mode file creation mode + * @param rowGroupSize the row group size + * @param maxPaddingSize the maximum padding + * @throws IOException if the file can not be created + * @deprecated will be removed in 2.0.0 + */ + @Deprecated + public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, long rowGroupSize, int maxPaddingSize) + throws IOException { + this( + file, + schema, + mode, + rowGroupSize, + maxPaddingSize, + ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, + ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH, + ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED); + } + + /** + * @param file OutputFile to create or overwrite + * @param schema the schema of the data + * @param mode file creation mode + * @param rowGroupSize the row group size + * @param maxPaddingSize the maximum padding + * @param columnIndexTruncateLength the length which the min/max values in column indexes tried to be truncated to + * @param statisticsTruncateLength the length which the min/max values in row groups tried to be truncated to + * @param pageWriteChecksumEnabled whether to write out page level checksums + * @throws IOException if the file can not be created + */ + public ParquetFileWriter( + OutputFile file, + MessageType schema, + Mode mode, + long rowGroupSize, + int maxPaddingSize, + int columnIndexTruncateLength, + int statisticsTruncateLength, + boolean pageWriteChecksumEnabled) + throws IOException { + this( + file, + schema, + mode, + rowGroupSize, + maxPaddingSize, + columnIndexTruncateLength, + statisticsTruncateLength, + pageWriteChecksumEnabled, + null, + null, + null); + } + + public ParquetFileWriter( + OutputFile file, + 
MessageType schema, + Mode mode, + long rowGroupSize, + int maxPaddingSize, + int columnIndexTruncateLength, + int statisticsTruncateLength, + boolean pageWriteChecksumEnabled, + FileEncryptionProperties encryptionProperties) + throws IOException { + this( + file, + schema, + mode, + rowGroupSize, + maxPaddingSize, + columnIndexTruncateLength, + statisticsTruncateLength, + pageWriteChecksumEnabled, + encryptionProperties, + null, + null); + } + + public ParquetFileWriter( + OutputFile file, + MessageType schema, + Mode mode, + long rowGroupSize, + int maxPaddingSize, + FileEncryptionProperties encryptionProperties, + ParquetProperties props) + throws IOException { + this( + file, + schema, + mode, + rowGroupSize, + maxPaddingSize, + props.getColumnIndexTruncateLength(), + props.getStatisticsTruncateLength(), + props.getPageWriteChecksumEnabled(), + encryptionProperties, + null, + props.getAllocator()); + } + + @Deprecated + public ParquetFileWriter( + OutputFile file, + MessageType schema, + Mode mode, + long rowGroupSize, + int maxPaddingSize, + int columnIndexTruncateLength, + int statisticsTruncateLength, + boolean pageWriteChecksumEnabled, + InternalFileEncryptor encryptor) + throws IOException { + this( + file, + schema, + mode, + rowGroupSize, + maxPaddingSize, + columnIndexTruncateLength, + statisticsTruncateLength, + pageWriteChecksumEnabled, + null, + encryptor, + null); + } + + private ParquetFileWriter( + OutputFile file, + MessageType schema, + Mode mode, + long rowGroupSize, + int maxPaddingSize, + int columnIndexTruncateLength, + int statisticsTruncateLength, + boolean pageWriteChecksumEnabled, + FileEncryptionProperties encryptionProperties, + InternalFileEncryptor encryptor, + ByteBufferAllocator allocator) + throws IOException { + TypeUtil.checkValidWriteSchema(schema); + + this.schema = schema; + + long blockSize = rowGroupSize; + if (file.supportsBlockSize()) { + blockSize = Math.max(file.defaultBlockSize(), rowGroupSize); + this.alignment = PaddingAlignment.get(blockSize, rowGroupSize, maxPaddingSize); + } else { + this.alignment = NoAlignment.get(rowGroupSize); + } - columnIndexBuilder.add(statistics); - currentEncodings.add(dataEncoding); - encodingStatsBuilder.addDataEncoding(dataEncoding); - - BytesInput.concat(repetitionLevels, definitionLevels, compressedData) - .writeAllTo(out); - - offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount); - } - - /** - * Writes a column chunk at once - * @param descriptor the descriptor of the column - * @param valueCount the value count in this column - * @param compressionCodecName the name of the compression codec used for compressing the pages - * @param dictionaryPage the dictionary page for this column chunk (might be null) - * @param bytes the encoded pages including page headers to be written as is - * @param uncompressedTotalPageSize total uncompressed size (without page headers) - * @param compressedTotalPageSize total compressed size (without page headers) - * @param totalStats accumulated statistics for the column chunk - * @param columnIndexBuilder the builder object for the column index - * @param offsetIndexBuilder the builder object for the offset index - * @param bloomFilter the bloom filter for this column - * @param rlEncodings the RL encodings used in this column chunk - * @param dlEncodings the DL encodings used in this column chunk - * @param dataEncodings the data encodings used in this column chunk - * @throws IOException if there is an error while writing - */ - void 
writeColumnChunk(ColumnDescriptor descriptor, - long valueCount, - CompressionCodecName compressionCodecName, - DictionaryPage dictionaryPage, - BytesInput bytes, - long uncompressedTotalPageSize, - long compressedTotalPageSize, - Statistics totalStats, - ColumnIndexBuilder columnIndexBuilder, - OffsetIndexBuilder offsetIndexBuilder, - BloomFilter bloomFilter, - Set rlEncodings, - Set dlEncodings, - List dataEncodings) throws IOException { - writeColumnChunk(descriptor, valueCount, compressionCodecName, dictionaryPage, bytes, - uncompressedTotalPageSize, compressedTotalPageSize, totalStats, columnIndexBuilder, offsetIndexBuilder, - bloomFilter, rlEncodings, dlEncodings, dataEncodings, null, 0, 0, null); - } - - void writeColumnChunk(ColumnDescriptor descriptor, - long valueCount, - CompressionCodecName compressionCodecName, - DictionaryPage dictionaryPage, - BytesInput bytes, - long uncompressedTotalPageSize, - long compressedTotalPageSize, - Statistics totalStats, - ColumnIndexBuilder columnIndexBuilder, - OffsetIndexBuilder offsetIndexBuilder, - BloomFilter bloomFilter, - Set rlEncodings, - Set dlEncodings, - List dataEncodings, - BlockCipher.Encryptor headerBlockEncryptor, - int rowGroupOrdinal, - int columnOrdinal, - byte[] fileAAD) throws IOException { - startColumn(descriptor, valueCount, compressionCodecName); - - state = state.write(); - if (dictionaryPage != null) { - byte[] dictonaryPageHeaderAAD = null; - if (null != headerBlockEncryptor) { - dictonaryPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader, - rowGroupOrdinal, columnOrdinal, -1); - } - writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictonaryPageHeaderAAD); - } + if (mode == Mode.OVERWRITE) { + this.out = file.createOrOverwrite(blockSize); + } else { + this.out = file.create(blockSize); + } - if (bloomFilter != null) { - // write bloom filter if one of data pages is not dictionary encoded - boolean isWriteBloomFilter = false; - for (Encoding encoding : dataEncodings) { - if (encoding != Encoding.RLE_DICTIONARY) { - isWriteBloomFilter = true; - break; - } - } - if (isWriteBloomFilter) { - currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter); - } - } - LOG.debug("{}: write data pages", out.getPos()); - long headersSize = bytes.size() - compressedTotalPageSize; - this.uncompressedLength += uncompressedTotalPageSize + headersSize; - this.compressedLength += compressedTotalPageSize + headersSize; - LOG.debug("{}: write data pages content", out.getPos()); - currentChunkFirstDataPage = out.getPos(); - bytes.writeAllTo(out); - encodingStatsBuilder.addDataEncodings(dataEncodings); - if (rlEncodings.isEmpty()) { - encodingStatsBuilder.withV2Pages(); - } - currentEncodings.addAll(rlEncodings); - currentEncodings.addAll(dlEncodings); - currentEncodings.addAll(dataEncodings); - currentStatistics = totalStats; - - this.columnIndexBuilder = columnIndexBuilder; - this.offsetIndexBuilder = offsetIndexBuilder; - - endColumn(); - } - - /** - * end a column (once all rep, def and data have been written) - * @throws IOException if there is an error while writing - */ - public void endColumn() throws IOException { - state = state.endColumn(); - LOG.debug("{}: end column", out.getPos()); - if (columnIndexBuilder.getMinMaxSize() > columnIndexBuilder.getPageCount() * MAX_STATS_SIZE) { - currentColumnIndexes.add(null); - } else { - currentColumnIndexes.add(columnIndexBuilder.build()); - } - currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage)); - 
currentBlock.addColumn(ColumnChunkMetaData.get( - currentChunkPath, - currentChunkType, - currentChunkCodec, - encodingStatsBuilder.build(), - currentEncodings, - currentStatistics, - currentChunkFirstDataPage, - currentChunkDictionaryPageOffset, - currentChunkValueCount, - compressedLength, - uncompressedLength)); - this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength); - this.uncompressedLength = 0; - this.compressedLength = 0; - columnIndexBuilder = null; - offsetIndexBuilder = null; - } - - /** - * ends a block once all column chunks have been written - * @throws IOException if there is an error while writing - */ - public void endBlock() throws IOException { -// if (currentRecordCount == 0) { -// throw new ParquetEncodingException("End block with zero record"); -// } - - state = state.endBlock(); - LOG.debug("{}: end block", out.getPos()); - currentBlock.setRowCount(currentRecordCount); - currentBlock.setOrdinal(blocks.size()); - blocks.add(currentBlock); - columnIndexes.add(currentColumnIndexes); - offsetIndexes.add(currentOffsetIndexes); - bloomFilters.add(currentBloomFilters); - currentColumnIndexes = null; - currentOffsetIndexes = null; - currentBloomFilters = null; - currentBlock = null; - } - - /** - * @param conf a configuration - * @param file a file path to append the contents of to this file - * @throws IOException if there is an error while reading or writing - * @deprecated will be removed in 2.0.0; use {@link #appendFile(InputFile)} instead - */ - @Deprecated - public void appendFile(Configuration conf, Path file) throws IOException { - ParquetFileReader.open(conf, file).appendTo(this); - } - - public void appendFile(InputFile file) throws IOException { - try (ParquetFileReader reader = ParquetFileReader.open(file)) { - reader.appendTo(this); - } + this.encodingStatsBuilder = new EncodingStats.Builder(); + this.columnIndexTruncateLength = columnIndexTruncateLength; + this.pageWriteChecksumEnabled = pageWriteChecksumEnabled; + this.crc = pageWriteChecksumEnabled ? new CRC32() : null; + this.crcAllocator = pageWriteChecksumEnabled + ? ReusingByteBufferAllocator.strict(allocator == null ? 
new HeapByteBufferAllocator() : allocator) + : null; + + this.metadataConverter = new ParquetMetadataConverter(statisticsTruncateLength); + + if (null == encryptionProperties && null == encryptor) { + this.fileEncryptor = null; + return; } - /** - * @param file a file stream to read from - * @param rowGroups row groups to copy - * @param dropColumns whether to drop columns from the file that are not in this file's schema - * @throws IOException if there is an error while reading or writing - * @deprecated will be removed in 2.0.0; - * use {@link #appendRowGroups(SeekableInputStream,List,boolean)} instead - */ - @Deprecated - public void appendRowGroups(FSDataInputStream file, - List rowGroups, - boolean dropColumns) throws IOException { - appendRowGroups(HadoopStreams.wrap(file), rowGroups, dropColumns); - } - - public void appendRowGroups(SeekableInputStream file, - List rowGroups, - boolean dropColumns) throws IOException { - for (BlockMetaData block : rowGroups) { - appendRowGroup(file, block, dropColumns); - } + if (null == encryptionProperties) { + encryptionProperties = encryptor.getEncryptionProperties(); } - /** - * @param from a file stream to read from - * @param rowGroup row group to copy - * @param dropColumns whether to drop columns from the file that are not in this file's schema - * @throws IOException if there is an error while reading or writing - * @deprecated will be removed in 2.0.0; - * use {@link #appendRowGroup(SeekableInputStream,BlockMetaData,boolean)} instead - */ - @Deprecated - public void appendRowGroup(FSDataInputStream from, BlockMetaData rowGroup, - boolean dropColumns) throws IOException { - appendRowGroup(HadoopStreams.wrap(from), rowGroup, dropColumns); - } - - public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup, - boolean dropColumns) throws IOException { - startBlock(rowGroup.getRowCount()); - - Map columnsToCopy = - new HashMap(); - for (ColumnChunkMetaData chunk : rowGroup.getColumns()) { - columnsToCopy.put(chunk.getPath().toDotString(), chunk); + // Verify that every encrypted column is in file schema + Map columnEncryptionProperties = + encryptionProperties.getEncryptedColumns(); + if (null != columnEncryptionProperties) { // if null, every column in file schema will be encrypted with footer + // key + for (Map.Entry entry : columnEncryptionProperties.entrySet()) { + String[] path = entry.getKey().toArray(); + if (!schema.containsPath(path)) { + StringBuilder columnList = new StringBuilder(); + columnList.append("["); + for (String[] columnPath : schema.getPaths()) { + columnList + .append(ColumnPath.get(columnPath).toDotString()) + .append("], ["); + } + throw new ParquetCryptoRuntimeException( + "Encrypted column [" + entry.getKey().toDotString() + "] not in file schema column list: " + + columnList.substring(0, columnList.length() - 3)); } + } + } - List columnsInOrder = - new ArrayList(); - - for (ColumnDescriptor descriptor : schema.getColumns()) { - String path = ColumnPath.get(descriptor.getPath()).toDotString(); - ColumnChunkMetaData chunk = columnsToCopy.remove(path); - if (chunk != null) { - columnsInOrder.add(chunk); - } else { - throw new IllegalArgumentException(String.format( - "Missing column '%s', cannot copy row group: %s", path, rowGroup)); - } - } + if (null == encryptor) { + this.fileEncryptor = new InternalFileEncryptor(encryptionProperties); + } else { + this.fileEncryptor = encryptor; + } + } + + /** + * FOR TESTING ONLY. This supports testing block padding behavior on the local FS. 
+ * + * @param configuration Hadoop configuration + * @param schema the schema of the data + * @param file the file to write to + * @param rowAndBlockSize the row group size + * @param maxPaddingSize the maximum padding + * @param columnIndexTruncateLength the length which the min/max values in column indexes tried to be truncated to + * @param allocator allocator to potentially allocate {@link java.nio.ByteBuffer} objects + * @throws IOException if the file can not be created + */ + ParquetFileWriter( + Configuration configuration, + MessageType schema, + Path file, + long rowAndBlockSize, + int maxPaddingSize, + int columnIndexTruncateLength, + ByteBufferAllocator allocator) + throws IOException { + FileSystem fs = file.getFileSystem(configuration); + this.schema = schema; + this.alignment = PaddingAlignment.get(rowAndBlockSize, rowAndBlockSize, maxPaddingSize); + this.out = HadoopStreams.wrap(fs.create(file, true, 8192, fs.getDefaultReplication(file), rowAndBlockSize)); + this.encodingStatsBuilder = new EncodingStats.Builder(); + this.columnIndexTruncateLength = columnIndexTruncateLength; + this.pageWriteChecksumEnabled = ParquetOutputFormat.getPageWriteChecksumEnabled(configuration); + this.crc = pageWriteChecksumEnabled ? new CRC32() : null; + this.crcAllocator = pageWriteChecksumEnabled + ? ReusingByteBufferAllocator.strict(allocator == null ? new HeapByteBufferAllocator() : allocator) + : null; + this.metadataConverter = new ParquetMetadataConverter(ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH); + this.fileEncryptor = null; + } + + /** + * start the file + * + * @throws IOException if there is an error while writing + */ + public void start() throws IOException { + state = state.start(); + LOG.debug("{}: start", out.getPos()); + byte[] magic = MAGIC; + if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) { + magic = EFMAGIC; + } + out.write(magic); + } + + public InternalFileEncryptor getEncryptor() { + return fileEncryptor; + } + + /** + * start a block + * + * @param recordCount the record count in this block + * @throws IOException if there is an error while writing + */ + public void startBlock(long recordCount) throws IOException { + state = state.startBlock(); + LOG.debug("{}: start block", out.getPos()); + // out.write(MAGIC); // TODO: add a magic delimiter + + alignment.alignForRowGroup(out); + + currentBlock = new BlockMetaData(); + currentRecordCount = recordCount; + + currentColumnIndexes = new ArrayList<>(); + currentOffsetIndexes = new ArrayList<>(); + + currentBloomFilters = new HashMap<>(); + } + + /** + * start a column inside a block + * + * @param descriptor the column descriptor + * @param valueCount the value count in this column + * @param compressionCodecName a compression codec name + * @throws IOException if there is an error while writing + */ + public void startColumn(ColumnDescriptor descriptor, long valueCount, CompressionCodecName compressionCodecName) + throws IOException { + state = state.startColumn(); + encodingStatsBuilder.clear(); + currentEncodings = new HashSet(); + currentChunkPath = ColumnPath.get(descriptor.getPath()); + currentChunkType = descriptor.getPrimitiveType(); + currentChunkCodec = compressionCodecName; + currentChunkValueCount = valueCount; + currentChunkFirstDataPage = -1; + compressedLength = 0; + uncompressedLength = 0; + // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one + currentStatistics = null; + currentSizeStatistics = SizeStatistics.newBuilder( 
+ descriptor.getPrimitiveType(), + descriptor.getMaxRepetitionLevel(), + descriptor.getMaxDefinitionLevel()) + .build(); + + columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength); + offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); + } + + /** + * writes a dictionary page + * + * @param dictionaryPage the dictionary page + * @throws IOException if there is an error while writing + */ + public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException { + writeDictionaryPage(dictionaryPage, null, null); + } + + public void writeDictionaryPage( + DictionaryPage dictionaryPage, BlockCipher.Encryptor headerBlockEncryptor, byte[] AAD) throws IOException { + state = state.write(); + LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize()); + currentChunkDictionaryPageOffset = out.getPos(); + int uncompressedSize = dictionaryPage.getUncompressedSize(); + int compressedPageSize = Math.toIntExact(dictionaryPage.getBytes().size()); + if (pageWriteChecksumEnabled) { + crc.reset(); + crcUpdate(dictionaryPage.getBytes()); + metadataConverter.writeDictionaryPageHeader( + uncompressedSize, + compressedPageSize, + dictionaryPage.getDictionarySize(), + dictionaryPage.getEncoding(), + (int) crc.getValue(), + out, + headerBlockEncryptor, + AAD); + } else { + metadataConverter.writeDictionaryPageHeader( + uncompressedSize, + compressedPageSize, + dictionaryPage.getDictionarySize(), + dictionaryPage.getEncoding(), + out, + headerBlockEncryptor, + AAD); + } + long headerSize = out.getPos() - currentChunkDictionaryPageOffset; + this.uncompressedLength += uncompressedSize + headerSize; + this.compressedLength += compressedPageSize + headerSize; + LOG.debug("{}: write dictionary page content {}", out.getPos(), compressedPageSize); + dictionaryPage.getBytes().writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted + encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding()); + currentEncodings.add(dictionaryPage.getEncoding()); + } + + /** + * writes a single page + * + * @param valueCount count of values + * @param uncompressedPageSize the size of the data once uncompressed + * @param bytes the compressed data for the page without header + * @param rlEncoding encoding of the repetition level + * @param dlEncoding encoding of the definition level + * @param valuesEncoding encoding of values + * @throws IOException if there is an error while writing + */ + @Deprecated + public void writeDataPage( + int valueCount, + int uncompressedPageSize, + BytesInput bytes, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding) + throws IOException { + state = state.write(); + // We are unable to build indexes without rowCount so skip them for this column + offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder(); + columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder(); + long beforeHeader = out.getPos(); + LOG.debug("{}: write data page: {} values", beforeHeader, valueCount); + int compressedPageSize = toIntWithCheck(bytes.size(), "page"); + metadataConverter.writeDataPageV1Header( + uncompressedPageSize, compressedPageSize, valueCount, rlEncoding, dlEncoding, valuesEncoding, out); + long headerSize = out.getPos() - beforeHeader; + this.uncompressedLength += uncompressedPageSize + headerSize; + this.compressedLength += compressedPageSize + headerSize; + LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize); + bytes.writeAllTo(out); + 
encodingStatsBuilder.addDataEncoding(valuesEncoding); + currentEncodings.add(rlEncoding); + currentEncodings.add(dlEncoding); + currentEncodings.add(valuesEncoding); + if (currentChunkFirstDataPage < 0) { + currentChunkFirstDataPage = beforeHeader; + } + } + + /** + * writes a single page + * + * @param valueCount count of values + * @param uncompressedPageSize the size of the data once uncompressed + * @param bytes the compressed data for the page without header + * @param statistics statistics for the page + * @param rlEncoding encoding of the repetition level + * @param dlEncoding encoding of the definition level + * @param valuesEncoding encoding of values + * @throws IOException if there is an error while writing + * @deprecated this method does not support writing column indexes; Use + * {@link #writeDataPage(int, int, BytesInput, Statistics, long, Encoding, Encoding, Encoding)} instead + */ + @Deprecated + public void writeDataPage( + int valueCount, + int uncompressedPageSize, + BytesInput bytes, + Statistics statistics, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding) + throws IOException { + // We are unable to build indexes without rowCount so skip them for this column + offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder(); + columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder(); + innerWriteDataPage( + valueCount, + uncompressedPageSize, + bytes, + statistics, + rlEncoding, + dlEncoding, + valuesEncoding, + null, + null, + null); + } + + /** + * Writes a single page + * + * @param valueCount count of values + * @param uncompressedPageSize the size of the data once uncompressed + * @param bytes the compressed data for the page without header + * @param statistics the statistics of the page + * @param rowCount the number of rows in the page + * @param rlEncoding encoding of the repetition level + * @param dlEncoding encoding of the definition level + * @param valuesEncoding encoding of values + * @throws IOException if any I/O error occurs during writing the file + */ + public void writeDataPage( + int valueCount, + int uncompressedPageSize, + BytesInput bytes, + Statistics statistics, + long rowCount, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding) + throws IOException { + writeDataPage( + valueCount, + uncompressedPageSize, + bytes, + statistics, + rowCount, + rlEncoding, + dlEncoding, + valuesEncoding, + null, + null, + null); + } + + /** + * Writes a single page + * @param valueCount count of values + * @param uncompressedPageSize the size of the data once uncompressed + * @param bytes the compressed data for the page without header + * @param statistics the statistics of the page + * @param rowCount the number of rows in the page + * @param rlEncoding encoding of the repetition level + * @param dlEncoding encoding of the definition level + * @param valuesEncoding encoding of values + * @param metadataBlockEncryptor encryptor for block data + * @param pageHeaderAAD pageHeader AAD + * @throws IOException if any I/O error occurs during writing the file + */ + public void writeDataPage( + int valueCount, + int uncompressedPageSize, + BytesInput bytes, + Statistics statistics, + long rowCount, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding, + BlockCipher.Encryptor metadataBlockEncryptor, + byte[] pageHeaderAAD) + throws IOException { + writeDataPage( + valueCount, + uncompressedPageSize, + bytes, + statistics, + rowCount, + rlEncoding, + dlEncoding, + valuesEncoding, + metadataBlockEncryptor, + 
pageHeaderAAD, + null); + } + + /** + * Writes a single page + * + * @param valueCount count of values + * @param uncompressedPageSize the size of the data once uncompressed + * @param bytes the compressed data for the page without header + * @param statistics the statistics of the page + * @param rowCount the number of rows in the page + * @param rlEncoding encoding of the repetition level + * @param dlEncoding encoding of the definition level + * @param valuesEncoding encoding of values + * @param metadataBlockEncryptor encryptor for block data + * @param pageHeaderAAD pageHeader AAD + * @param sizeStatistics size statistics for the page + * @throws IOException if any I/O error occurs during writing the file + */ + public void writeDataPage( + int valueCount, + int uncompressedPageSize, + BytesInput bytes, + Statistics statistics, + long rowCount, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding, + BlockCipher.Encryptor metadataBlockEncryptor, + byte[] pageHeaderAAD, + SizeStatistics sizeStatistics) + throws IOException { + long beforeHeader = out.getPos(); + innerWriteDataPage( + valueCount, + uncompressedPageSize, + bytes, + statistics, + rlEncoding, + dlEncoding, + valuesEncoding, + metadataBlockEncryptor, + pageHeaderAAD, + sizeStatistics); + offsetIndexBuilder.add( + toIntWithCheck(out.getPos() - beforeHeader, "page"), + rowCount, + sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); + } + + private void innerWriteDataPage( + int valueCount, + int uncompressedPageSize, + BytesInput bytes, + Statistics statistics, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding, + BlockCipher.Encryptor metadataBlockEncryptor, + byte[] pageHeaderAAD, + SizeStatistics sizeStatistics) + throws IOException { + writeDataPage( + valueCount, + uncompressedPageSize, + bytes, + statistics, + rlEncoding, + dlEncoding, + valuesEncoding, + metadataBlockEncryptor, + pageHeaderAAD, + sizeStatistics); + } + + /** + * writes a single page + * + * @param valueCount count of values + * @param uncompressedPageSize the size of the data once uncompressed + * @param bytes the compressed data for the page without header + * @param statistics statistics for the page + * @param rlEncoding encoding of the repetition level + * @param dlEncoding encoding of the definition level + * @param valuesEncoding encoding of values + * @param metadataBlockEncryptor encryptor for block data + * @param pageHeaderAAD pageHeader AAD + * @throws IOException if there is an error while writing + */ + public void writeDataPage( + int valueCount, + int uncompressedPageSize, + BytesInput bytes, + Statistics statistics, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding, + BlockCipher.Encryptor metadataBlockEncryptor, + byte[] pageHeaderAAD) + throws IOException { + writeDataPage( + valueCount, + uncompressedPageSize, + bytes, + statistics, + rlEncoding, + dlEncoding, + valuesEncoding, + metadataBlockEncryptor, + pageHeaderAAD, + null); + } + + /** + * writes a single page + * @param valueCount count of values + * @param uncompressedPageSize the size of the data once uncompressed + * @param bytes the compressed data for the page without header + * @param statistics statistics for the page + * @param rlEncoding encoding of the repetition level + * @param dlEncoding encoding of the definition level + * @param valuesEncoding encoding of values + * @param metadataBlockEncryptor encryptor for block data + * @param pageHeaderAAD pageHeader AAD + 
* @param sizeStatistics size statistics for the page + * @throws IOException if there is an error while writing + */ + public void writeDataPage( + int valueCount, + int uncompressedPageSize, + BytesInput bytes, + Statistics statistics, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding, + BlockCipher.Encryptor metadataBlockEncryptor, + byte[] pageHeaderAAD, + SizeStatistics sizeStatistics) + throws IOException { + state = state.write(); + long beforeHeader = out.getPos(); + if (currentChunkFirstDataPage < 0) { + currentChunkFirstDataPage = beforeHeader; + } + LOG.debug("{}: write data page: {} values", beforeHeader, valueCount); + int compressedPageSize = toIntWithCheck(bytes.size(), "page"); + if (pageWriteChecksumEnabled) { + crc.reset(); + crcUpdate(bytes); + metadataConverter.writeDataPageV1Header( + uncompressedPageSize, + compressedPageSize, + valueCount, + rlEncoding, + dlEncoding, + valuesEncoding, + (int) crc.getValue(), + out, + metadataBlockEncryptor, + pageHeaderAAD); + } else { + metadataConverter.writeDataPageV1Header( + uncompressedPageSize, + compressedPageSize, + valueCount, + rlEncoding, + dlEncoding, + valuesEncoding, + out, + metadataBlockEncryptor, + pageHeaderAAD); + } + long headerSize = out.getPos() - beforeHeader; + this.uncompressedLength += uncompressedPageSize + headerSize; + this.compressedLength += compressedPageSize + headerSize; + LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize); + bytes.writeAllTo(out); + + mergeColumnStatistics(statistics, sizeStatistics); + + encodingStatsBuilder.addDataEncoding(valuesEncoding); + currentEncodings.add(rlEncoding); + currentEncodings.add(dlEncoding); + currentEncodings.add(valuesEncoding); + } + + /** + * Add a Bloom filter that will be written out. 
+ * + * @param column the column name + * @param bloomFilter the bloom filter of column values + */ + public void addBloomFilter(String column, BloomFilter bloomFilter) { + currentBloomFilters.put(column, bloomFilter); + } + + /** + * Writes a single v2 data page + * + * @param rowCount count of rows + * @param nullCount count of nulls + * @param valueCount count of values + * @param repetitionLevels repetition level bytes + * @param definitionLevels definition level bytes + * @param dataEncoding encoding for data + * @param compressedData compressed data bytes + * @param uncompressedDataSize the size of uncompressed data + * @param statistics the statistics of the page + * @throws IOException if any I/O error occurs during writing the file + */ + public void writeDataPageV2( + int rowCount, + int nullCount, + int valueCount, + BytesInput repetitionLevels, + BytesInput definitionLevels, + Encoding dataEncoding, + BytesInput compressedData, + int uncompressedDataSize, + Statistics statistics) + throws IOException { + writeDataPageV2( + rowCount, + nullCount, + valueCount, + repetitionLevels, + definitionLevels, + dataEncoding, + compressedData, + uncompressedDataSize, + statistics, + null, + null, + null); + } + + /** + * Writes a single v2 data page + * + * @param rowCount count of rows + * @param nullCount count of nulls + * @param valueCount count of values + * @param repetitionLevels repetition level bytes + * @param definitionLevels definition level bytes + * @param dataEncoding encoding for data + * @param compressedData compressed data bytes + * @param uncompressedDataSize the size of uncompressed data + * @param statistics the statistics of the page + * @param metadataBlockEncryptor encryptor for block data + * @param pageHeaderAAD pageHeader AAD + * @throws IOException if any I/O error occurs during writing the file + */ + public void writeDataPageV2( + int rowCount, + int nullCount, + int valueCount, + BytesInput repetitionLevels, + BytesInput definitionLevels, + Encoding dataEncoding, + BytesInput compressedData, + int uncompressedDataSize, + Statistics statistics, + BlockCipher.Encryptor metadataBlockEncryptor, + byte[] pageHeaderAAD) + throws IOException { + writeDataPageV2( + rowCount, + nullCount, + valueCount, + repetitionLevels, + definitionLevels, + dataEncoding, + compressedData, + uncompressedDataSize, + statistics, + metadataBlockEncryptor, + pageHeaderAAD, + null); + } + + /** + * Writes a single v2 data page + * + * @param rowCount count of rows + * @param nullCount count of nulls + * @param valueCount count of values + * @param repetitionLevels repetition level bytes + * @param definitionLevels definition level bytes + * @param dataEncoding encoding for data + * @param compressedData compressed data bytes + * @param uncompressedDataSize the size of uncompressed data + * @param statistics the statistics of the page + * @param metadataBlockEncryptor encryptor for block data + * @param pageHeaderAAD pageHeader AAD + * @param sizeStatistics size statistics for the page + * @throws IOException if any I/O error occurs during writing the file + */ + public void writeDataPageV2( + int rowCount, + int nullCount, + int valueCount, + BytesInput repetitionLevels, + BytesInput definitionLevels, + Encoding dataEncoding, + BytesInput compressedData, + int uncompressedDataSize, + Statistics statistics, + BlockCipher.Encryptor metadataBlockEncryptor, + byte[] pageHeaderAAD, + SizeStatistics sizeStatistics) + throws IOException { + state = state.write(); + int rlByteLength = 
toIntWithCheck(repetitionLevels.size(), "page repetition levels"); + int dlByteLength = toIntWithCheck(definitionLevels.size(), "page definition levels"); + + int compressedSize = + toIntWithCheck(compressedData.size() + repetitionLevels.size() + definitionLevels.size(), "page"); + + int uncompressedSize = + toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + definitionLevels.size(), "page"); + + long beforeHeader = out.getPos(); + if (currentChunkFirstDataPage < 0) { + currentChunkFirstDataPage = beforeHeader; + } - // complain if some columns would be dropped and that's not okay - if (!dropColumns && !columnsToCopy.isEmpty()) { - throw new IllegalArgumentException(String.format( - "Columns cannot be copied (missing from target schema): %s", - String.join(", ", columnsToCopy.keySet()))); - } + if (pageWriteChecksumEnabled) { + crc.reset(); + if (repetitionLevels.size() > 0) { + crcUpdate(repetitionLevels); + } + if (definitionLevels.size() > 0) { + crcUpdate(definitionLevels); + } + if (compressedData.size() > 0) { + crcUpdate(compressedData); + } + metadataConverter.writeDataPageV2Header( + uncompressedSize, + compressedSize, + valueCount, + nullCount, + rowCount, + dataEncoding, + rlByteLength, + dlByteLength, + (int) crc.getValue(), + out, + metadataBlockEncryptor, + pageHeaderAAD); + } else { + metadataConverter.writeDataPageV2Header( + uncompressedSize, + compressedSize, + valueCount, + nullCount, + rowCount, + dataEncoding, + rlByteLength, + dlByteLength, + out, + metadataBlockEncryptor, + pageHeaderAAD); + } - // copy the data for all chunks - long start = -1; - long length = 0; - long blockUncompressedSize = 0L; - for (int i = 0; i < columnsInOrder.size(); i += 1) { - ColumnChunkMetaData chunk = columnsInOrder.get(i); - - // get this chunk's start position in the new file - long newChunkStart = out.getPos() + length; - - // add this chunk to be copied with any previous chunks - if (start < 0) { - // no previous chunk included, start at this chunk's starting pos - start = chunk.getStartingPos(); - } - length += chunk.getTotalSize(); - - if ((i + 1) == columnsInOrder.size() || - columnsInOrder.get(i + 1).getStartingPos() != (start + length)) { - // not contiguous. do the copy now. 
- copy(from, out, start, length); - // reset to start at the next column chunk - start = -1; - length = 0; - } - - // TODO: column/offset indexes are not copied - // (it would require seeking to the end of the file for each row groups) - currentColumnIndexes.add(null); - currentOffsetIndexes.add(null); - - Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart); - currentBlock.addColumn(ColumnChunkMetaData.get( - chunk.getPath(), - chunk.getPrimitiveType(), - chunk.getCodec(), - chunk.getEncodingStats(), - chunk.getEncodings(), - chunk.getStatistics(), - offsets.firstDataPageOffset, - offsets.dictionaryPageOffset, - chunk.getValueCount(), - chunk.getTotalSize(), - chunk.getTotalUncompressedSize())); - - blockUncompressedSize += chunk.getTotalUncompressedSize(); - } + long headersSize = out.getPos() - beforeHeader; + this.uncompressedLength += uncompressedSize + headersSize; + this.compressedLength += compressedSize + headersSize; - currentBlock.setTotalByteSize(blockUncompressedSize); + mergeColumnStatistics(statistics, sizeStatistics); - endBlock(); - } + currentEncodings.add(dataEncoding); + encodingStatsBuilder.addDataEncoding(dataEncoding); - /** - * @param descriptor the descriptor for the target column - * @param from a file stream to read from - * @param chunk the column chunk to be copied - * @param bloomFilter the bloomFilter for this chunk - * @param columnIndex the column index for this chunk - * @param offsetIndex the offset index for this chunk - * @throws IOException - */ - public void appendColumnChunk(ColumnDescriptor descriptor, SeekableInputStream from, ColumnChunkMetaData chunk, - BloomFilter bloomFilter, ColumnIndex columnIndex, OffsetIndex offsetIndex) throws IOException { - long start = chunk.getStartingPos(); - long length = chunk.getTotalSize(); - long newChunkStart = out.getPos(); + BytesInput.concat(repetitionLevels, definitionLevels, compressedData).writeAllTo(out); - copy(from, out, start, length); + offsetIndexBuilder.add( + toIntWithCheck(out.getPos() - beforeHeader, "page"), + rowCount, + sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); + } - currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter); - currentColumnIndexes.add(columnIndex); - currentOffsetIndexes.add(offsetIndex); - - Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart); - currentBlock.addColumn(ColumnChunkMetaData.get( - chunk.getPath(), - chunk.getPrimitiveType(), - chunk.getCodec(), - chunk.getEncodingStats(), - chunk.getEncodings(), - chunk.getStatistics(), - offsets.firstDataPageOffset, - offsets.dictionaryPageOffset, - chunk.getValueCount(), - chunk.getTotalSize(), - chunk.getTotalUncompressedSize())); - - currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize()); - } - - // Buffers for the copy function. - private static final ThreadLocal COPY_BUFFER = ThreadLocal.withInitial(() -> new byte[8192]); - - /** - * Copy from a FS input stream to an output stream. 
Thread-safe - * - * @param from a {@link SeekableInputStream} - * @param to any {@link PositionOutputStream} - * @param start where in the from stream to start copying - * @param length the number of bytes to copy - * @throws IOException if there is an error while reading or writing - */ - private static void copy(SeekableInputStream from, PositionOutputStream to, - long start, long length) throws IOException{ - LOG.debug("Copying {} bytes at {} to {}", length, start, to.getPos()); - from.seek(start); - long bytesCopied = 0; - byte[] buffer = COPY_BUFFER.get(); - while (bytesCopied < length) { - long bytesLeft = length - bytesCopied; - int bytesRead = from.read(buffer, 0, - (buffer.length < bytesLeft ? buffer.length : (int) bytesLeft)); - if (bytesRead < 0) { - throw new IllegalArgumentException( - "Unexpected end of input file at " + start + bytesCopied); - } - to.write(buffer, 0, bytesRead); - bytesCopied += bytesRead; - } + private void crcUpdate(BytesInput bytes) { + try (ByteBufferReleaser releaser = crcAllocator.getReleaser()) { + crc.update(bytes.toByteBuffer(releaser)); } - - /** - * ends a file once all blocks have been written. - * closes the file. - * @param extraMetaData the extra meta data to write in the footer - * @throws IOException if there is an error while writing - */ - public void end(Map extraMetaData) throws IOException { - state = state.end(); - serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor); - serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor); - serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor); - LOG.debug("{}: end", out.getPos()); - this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks); - serializeFooter(footer, out, fileEncryptor); - out.close(); - } - - private static void serializeColumnIndexes( - List> columnIndexes, - List blocks, - PositionOutputStream out, - InternalFileEncryptor fileEncryptor) throws IOException { - LOG.debug("{}: column indexes", out.getPos()); - for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) { - BlockMetaData block = blocks.get(bIndex); - List columns = block.getColumns(); - List blockColumnIndexes = columnIndexes.get(bIndex); - for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) { - ColumnChunkMetaData column = columns.get(cIndex); - org.apache.parquet.format.ColumnIndex columnIndex = ParquetMetadataConverter - .toParquetColumnIndex(column.getPrimitiveType(), blockColumnIndexes.get(cIndex)); - if (columnIndex == null) { - continue; - } - BlockCipher.Encryptor columnIndexEncryptor = null; - byte[] columnIndexAAD = null; - if (null != fileEncryptor) { - InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex); - if (columnEncryptionSetup.isEncrypted()) { - columnIndexEncryptor = columnEncryptionSetup.getMetaDataEncryptor(); - columnIndexAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.ColumnIndex, - block.getOrdinal(), columnEncryptionSetup.getOrdinal(), -1); - } - } - long offset = out.getPos(); - Util.writeColumnIndex(columnIndex, out, columnIndexEncryptor, columnIndexAAD); - column.setColumnIndexReference(new IndexReference(offset, (int) (out.getPos() - offset))); - } - } + } + + /** + * Writes a column chunk at once + * + * @param descriptor the descriptor of the column + * @param valueCount the value count in this column + * @param compressionCodecName the name of the compression codec used for compressing the 
pages + * @param dictionaryPage the dictionary page for this column chunk (might be null) + * @param bytes the encoded pages including page headers to be written as is + * @param uncompressedTotalPageSize total uncompressed size (without page headers) + * @param compressedTotalPageSize total compressed size (without page headers) + * @param totalStats accumulated statistics for the column chunk + * @param columnIndexBuilder the builder object for the column index + * @param offsetIndexBuilder the builder object for the offset index + * @param bloomFilter the bloom filter for this column + * @param rlEncodings the RL encodings used in this column chunk + * @param dlEncodings the DL encodings used in this column chunk + * @param dataEncodings the data encodings used in this column chunk + * @throws IOException if there is an error while writing + */ + void writeColumnChunk( + ColumnDescriptor descriptor, + long valueCount, + CompressionCodecName compressionCodecName, + DictionaryPage dictionaryPage, + BytesInput bytes, + long uncompressedTotalPageSize, + long compressedTotalPageSize, + Statistics totalStats, + SizeStatistics totalSizeStats, + ColumnIndexBuilder columnIndexBuilder, + OffsetIndexBuilder offsetIndexBuilder, + BloomFilter bloomFilter, + Set rlEncodings, + Set dlEncodings, + List dataEncodings) + throws IOException { + writeColumnChunk( + descriptor, + valueCount, + compressionCodecName, + dictionaryPage, + bytes, + uncompressedTotalPageSize, + compressedTotalPageSize, + totalStats, + totalSizeStats, + columnIndexBuilder, + offsetIndexBuilder, + bloomFilter, + rlEncodings, + dlEncodings, + dataEncodings, + null, + 0, + 0, + null); + } + + void writeColumnChunk( + ColumnDescriptor descriptor, + long valueCount, + CompressionCodecName compressionCodecName, + DictionaryPage dictionaryPage, + BytesInput bytes, + long uncompressedTotalPageSize, + long compressedTotalPageSize, + Statistics totalStats, + SizeStatistics totalSizeStats, + ColumnIndexBuilder columnIndexBuilder, + OffsetIndexBuilder offsetIndexBuilder, + BloomFilter bloomFilter, + Set rlEncodings, + Set dlEncodings, + List dataEncodings, + BlockCipher.Encryptor headerBlockEncryptor, + int rowGroupOrdinal, + int columnOrdinal, + byte[] fileAAD) + throws IOException { + startColumn(descriptor, valueCount, compressionCodecName); + + state = state.write(); + if (dictionaryPage != null) { + byte[] dictonaryPageHeaderAAD = null; + if (null != headerBlockEncryptor) { + dictonaryPageHeaderAAD = AesCipher.createModuleAAD( + fileAAD, ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1); + } + writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictonaryPageHeaderAAD); } - private int toIntWithCheck(long size) { - if ((int)size != size) { - throw new ParquetEncodingException("Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " + size); - } - return (int)size; - } - - private static void serializeOffsetIndexes( - List> offsetIndexes, - List blocks, - PositionOutputStream out, - InternalFileEncryptor fileEncryptor) throws IOException { - LOG.debug("{}: offset indexes", out.getPos()); - for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) { - BlockMetaData block = blocks.get(bIndex); - List columns = block.getColumns(); - List blockOffsetIndexes = offsetIndexes.get(bIndex); - for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) { - OffsetIndex offsetIndex = blockOffsetIndexes.get(cIndex); - if (offsetIndex == null) { - continue; - } - ColumnChunkMetaData column = 
columns.get(cIndex); - BlockCipher.Encryptor offsetIndexEncryptor = null; - byte[] offsetIndexAAD = null; - if (null != fileEncryptor) { - InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex); - if (columnEncryptionSetup.isEncrypted()) { - offsetIndexEncryptor = columnEncryptionSetup.getMetaDataEncryptor(); - offsetIndexAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.OffsetIndex, - block.getOrdinal(), columnEncryptionSetup.getOrdinal(), -1); - } - } - long offset = out.getPos(); - Util.writeOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(offsetIndex), out, offsetIndexEncryptor, offsetIndexAAD); - column.setOffsetIndexReference(new IndexReference(offset, (int) (out.getPos() - offset))); - } + if (bloomFilter != null) { + // write bloom filter if one of data pages is not dictionary encoded + boolean isWriteBloomFilter = false; + for (Encoding encoding : dataEncodings) { + // dictionary encoding: `PLAIN_DICTIONARY` is used in parquet v1, `RLE_DICTIONARY` is used in parquet v2 + if (encoding != Encoding.PLAIN_DICTIONARY && encoding != Encoding.RLE_DICTIONARY) { + isWriteBloomFilter = true; + break; } + } + if (isWriteBloomFilter) { + currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter); + } else { + LOG.info( + "No need to write bloom filter because column {} data pages are all encoded as dictionary.", + descriptor.getPath()); + } + } + LOG.debug("{}: write data pages", out.getPos()); + long headersSize = bytes.size() - compressedTotalPageSize; + this.uncompressedLength += uncompressedTotalPageSize + headersSize; + this.compressedLength += compressedTotalPageSize + headersSize; + LOG.debug("{}: write data pages content", out.getPos()); + currentChunkFirstDataPage = out.getPos(); + bytes.writeAllTo(out); + encodingStatsBuilder.addDataEncodings(dataEncodings); + if (rlEncodings.isEmpty()) { + encodingStatsBuilder.withV2Pages(); + } + currentEncodings.addAll(rlEncodings); + currentEncodings.addAll(dlEncodings); + currentEncodings.addAll(dataEncodings); + currentStatistics = totalStats; + currentSizeStatistics = totalSizeStats; + + this.columnIndexBuilder = columnIndexBuilder; + this.offsetIndexBuilder = offsetIndexBuilder; + + endColumn(); + } + + /** + * Overwrite the column total statistics. This special used when the column total statistics + * is known while all the page statistics are invalid, for example when rewriting the column. 
+ * + * @param totalStatistics the column total statistics + */ + public void invalidateStatistics(Statistics totalStatistics) { + Preconditions.checkArgument(totalStatistics != null, "Column total statistics can not be null"); + currentStatistics = totalStatistics; + // Invalid the ColumnIndex + columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder(); + } + + /** + * end a column (once all rep, def and data have been written) + * + * @throws IOException if there is an error while writing + */ + public void endColumn() throws IOException { + state = state.endColumn(); + LOG.debug("{}: end column", out.getPos()); + if (columnIndexBuilder.getMinMaxSize() > columnIndexBuilder.getPageCount() * MAX_STATS_SIZE) { + currentColumnIndexes.add(null); + } else { + currentColumnIndexes.add(columnIndexBuilder.build()); } + currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage)); + currentBlock.addColumn(ColumnChunkMetaData.get( + currentChunkPath, + currentChunkType, + currentChunkCodec, + encodingStatsBuilder.build(), + currentEncodings, + currentStatistics, + currentChunkFirstDataPage, + currentChunkDictionaryPageOffset, + currentChunkValueCount, + compressedLength, + uncompressedLength, + currentSizeStatistics)); + this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength); + this.uncompressedLength = 0; + this.compressedLength = 0; + this.currentChunkDictionaryPageOffset = 0; + columnIndexBuilder = null; + offsetIndexBuilder = null; + } + + /** + * ends a block once all column chunks have been written + * + * @throws IOException if there is an error while writing + */ + public void endBlock() throws IOException { +// if (currentRecordCount == 0) { +// throw new ParquetEncodingException("End block with zero record"); +// } + + state = state.endBlock(); + LOG.debug("{}: end block", out.getPos()); + currentBlock.setRowCount(currentRecordCount); + currentBlock.setOrdinal(blocks.size()); + blocks.add(currentBlock); + columnIndexes.add(currentColumnIndexes); + offsetIndexes.add(currentOffsetIndexes); + bloomFilters.add(currentBloomFilters); + currentColumnIndexes = null; + currentOffsetIndexes = null; + currentBloomFilters = null; + currentBlock = null; + } + + /** + * @param conf a configuration + * @param file a file path to append the contents of to this file + * @throws IOException if there is an error while reading or writing + * @deprecated will be removed in 2.0.0; use {@link #appendFile(InputFile)} instead + */ + @Deprecated + public void appendFile(Configuration conf, Path file) throws IOException { + try (ParquetFileReader reader = ParquetFileReader.open(conf, file)) { + reader.appendTo(this); + } + } - private static void serializeBloomFilters( - List> bloomFilters, - List blocks, - PositionOutputStream out, - InternalFileEncryptor fileEncryptor) throws IOException { - LOG.debug("{}: bloom filters", out.getPos()); - for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) { - BlockMetaData block = blocks.get(bIndex); - List columns = block.getColumns(); - Map blockBloomFilters = bloomFilters.get(bIndex); - if (blockBloomFilters.isEmpty()) { - continue; - } - for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) { - ColumnChunkMetaData column = columns.get(cIndex); - BloomFilter bloomFilter = blockBloomFilters.get(column.getPath().toDotString()); - if (bloomFilter == null) { - continue; - } - - long offset = out.getPos(); - column.setBloomFilterOffset(offset); - - BlockCipher.Encryptor bloomFilterEncryptor = 
null;
-        byte[] bloomFilterHeaderAAD = null;
-        byte[] bloomFilterBitsetAAD = null;
-        if (null != fileEncryptor) {
-          InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex);
-          if (columnEncryptionSetup.isEncrypted()) {
-            bloomFilterEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
-            int columnOrdinal = columnEncryptionSetup.getOrdinal();
-            bloomFilterHeaderAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterHeader,
-              block.getOrdinal(), columnOrdinal, -1);
-            bloomFilterBitsetAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterBitset,
-              block.getOrdinal(), columnOrdinal, -1);
-          }
-        }
-
-        Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(bloomFilter), out,
-          bloomFilterEncryptor, bloomFilterHeaderAAD);
-
-        ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream();
-        bloomFilter.writeTo(tempOutStream);
-        byte[] serializedBitset = tempOutStream.toByteArray();
-        if (null != bloomFilterEncryptor) {
-          serializedBitset = bloomFilterEncryptor.encrypt(serializedBitset, bloomFilterBitsetAAD);
-        }
-        out.write(serializedBitset);
-      }
-    }
+  public void appendFile(InputFile file) throws IOException {
+    try (ParquetFileReader reader = ParquetFileReader.open(file)) {
+      reader.appendTo(this);
+    }
+  }
+
+  /**
+   * @param file a file stream to read from
+   * @param rowGroups row groups to copy
+   * @param dropColumns whether to drop columns from the file that are not in this file's schema
+   * @throws IOException if there is an error while reading or writing
+   * @deprecated will be removed in 2.0.0;
+   *             use {@link #appendRowGroups(SeekableInputStream, List, boolean)} instead
+   */
+  @Deprecated
+  public void appendRowGroups(FSDataInputStream file, List<BlockMetaData> rowGroups, boolean dropColumns)
+      throws IOException {
+    appendRowGroups(HadoopStreams.wrap(file), rowGroups, dropColumns);
+  }
+
+  public void appendRowGroups(SeekableInputStream file, List<BlockMetaData> rowGroups, boolean dropColumns)
+      throws IOException {
+    for (BlockMetaData block : rowGroups) {
+      appendRowGroup(file, block, dropColumns);
+    }
+  }
+
+  /**
+   * @param from a file stream to read from
+   * @param rowGroup row group to copy
+   * @param dropColumns whether to drop columns from the file that are not in this file's schema
+   * @throws IOException if there is an error while reading or writing
+   * @deprecated will be removed in 2.0.0;
+   *             use {@link #appendRowGroup(SeekableInputStream, BlockMetaData, boolean)} instead
+   */
+  @Deprecated
+  public void appendRowGroup(FSDataInputStream from, BlockMetaData rowGroup, boolean dropColumns) throws IOException {
+    appendRowGroup(HadoopStreams.wrap(from), rowGroup, dropColumns);
+  }
+
+  public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup, boolean dropColumns)
+      throws IOException {
+    startBlock(rowGroup.getRowCount());
+
+    Map<String, ColumnChunkMetaData> columnsToCopy = new HashMap<>();
+    for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
+      columnsToCopy.put(chunk.getPath().toDotString(), chunk);
     }
-  private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out,
-      InternalFileEncryptor fileEncryptor) throws IOException {
+    List<ColumnChunkMetaData> columnsInOrder = new ArrayList<>();
+
+    for (ColumnDescriptor descriptor : schema.getColumns()) {
+      String path = ColumnPath.get(descriptor.getPath()).toDotString();
+      ColumnChunkMetaData chunk = columnsToCopy.remove(path);
+      if (chunk != null) {
+        columnsInOrder.add(chunk);
+      } else {
+        throw new IllegalArgumentException(
+            String.format("Missing column '%s', cannot copy row group: %s", path, rowGroup));
+      }
+    }
-    ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
+    // complain if some columns would be dropped and that's not okay
+    if (!dropColumns && !columnsToCopy.isEmpty()) {
+      throw new IllegalArgumentException(String.format(
+          "Columns cannot be copied (missing from target schema): %s",
+          String.join(", ", columnsToCopy.keySet())));
+    }
-    // Unencrypted file
-    if (null == fileEncryptor) {
-      long footerIndex = out.getPos();
-      org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
-      writeFileMetaData(parquetMetadata, out);
-      LOG.debug("{}: footer length = {}", out.getPos(), (out.getPos() - footerIndex));
-      BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
-      out.write(MAGIC);
-      return;
-    }
+    // copy the data for all chunks
+    long start = -1;
+    long length = 0;
+    long blockUncompressedSize = 0L;
+    for (int i = 0; i < columnsInOrder.size(); i += 1) {
+      ColumnChunkMetaData chunk = columnsInOrder.get(i);
+
+      // get this chunk's start position in the new file
+      long newChunkStart = out.getPos() + length;
+
+      // add this chunk to be copied with any previous chunks
+      if (start < 0) {
+        // no previous chunk included, start at this chunk's starting pos
+        start = chunk.getStartingPos();
+      }
+      length += chunk.getTotalSize();
+
+      if ((i + 1) == columnsInOrder.size() || columnsInOrder.get(i + 1).getStartingPos() != (start + length)) {
+        // not contiguous. do the copy now.
+        copy(from, out, start, length);
+        // reset to start at the next column chunk
+        start = -1;
+        length = 0;
+      }
+
+      // TODO: column/offset indexes are not copied
+      // (it would require seeking to the end of the file for each row groups)
+      currentColumnIndexes.add(null);
+      currentOffsetIndexes.add(null);
+
+      Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
+      currentBlock.addColumn(ColumnChunkMetaData.get(
+          chunk.getPath(),
+          chunk.getPrimitiveType(),
+          chunk.getCodec(),
+          chunk.getEncodingStats(),
+          chunk.getEncodings(),
+          chunk.getStatistics(),
+          offsets.firstDataPageOffset,
+          offsets.dictionaryPageOffset,
+          chunk.getValueCount(),
+          chunk.getTotalSize(),
+          chunk.getTotalUncompressedSize()));
+
+      blockUncompressedSize += chunk.getTotalUncompressedSize();
+    }
-    org.apache.parquet.format.FileMetaData parquetMetadata =
-      metadataConverter.toParquetMetadata(CURRENT_VERSION, footer, fileEncryptor);
-
-    // Encrypted file with plaintext footer
-    if (!fileEncryptor.isFooterEncrypted()) {
-      long footerIndex = out.getPos();
-      parquetMetadata.setEncryption_algorithm(fileEncryptor.getEncryptionAlgorithm());
-      // create footer signature (nonce + tag of encrypted footer)
-      byte[] footerSigningKeyMetaData = fileEncryptor.getFooterSigningKeyMetaData();
-      if (null != footerSigningKeyMetaData) {
-        parquetMetadata.setFooter_signing_key_metadata(footerSigningKeyMetaData);
-      }
-      ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream();
-      writeFileMetaData(parquetMetadata, tempOutStream);
-      byte[] serializedFooter = tempOutStream.toByteArray();
-      byte[] footerAAD = AesCipher.createFooterAAD(fileEncryptor.getFileAAD());
-      byte[] encryptedFooter = fileEncryptor.getSignedFooterEncryptor().encrypt(serializedFooter, footerAAD);
-      byte[] signature = new byte[AesCipher.NONCE_LENGTH + AesCipher.GCM_TAG_LENGTH];
-      System.arraycopy(encryptedFooter, ModuleCipherFactory.SIZE_LENGTH, signature, 0, AesCipher.NONCE_LENGTH); // copy Nonce
-      System.arraycopy(encryptedFooter, encryptedFooter.length - AesCipher.GCM_TAG_LENGTH,
-        signature, AesCipher.NONCE_LENGTH, AesCipher.GCM_TAG_LENGTH); // copy GCM Tag
-      out.write(serializedFooter);
-      out.write(signature);
-      LOG.debug("{}: footer and signature length = {}", out.getPos(), (out.getPos() - footerIndex));
-      BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
-      out.write(MAGIC);
-      return;
-    }
+    currentBlock.setTotalByteSize(blockUncompressedSize);
+
+    endBlock();
+  }
+
+  /**
+   * @param descriptor the descriptor for the target column
+   * @param from a file stream to read from
+   * @param chunk the column chunk to be copied
+   * @param bloomFilter the bloomFilter for this chunk
+   * @param columnIndex the column index for this chunk
+   * @param offsetIndex the offset index for this chunk
+   * @throws IOException
+   */
+  public void appendColumnChunk(
+      ColumnDescriptor descriptor,
+      SeekableInputStream from,
+      ColumnChunkMetaData chunk,
+      BloomFilter bloomFilter,
+      ColumnIndex columnIndex,
+      OffsetIndex offsetIndex)
+      throws IOException {
+    long start = chunk.getStartingPos();
+    long length = chunk.getTotalSize();
+    long newChunkStart = out.getPos();
+
+    if (offsetIndex != null && newChunkStart != start) {
+      offsetIndex =
+          OffsetIndexBuilder.getBuilder().fromOffsetIndex(offsetIndex).build(newChunkStart - start);
+    }
-    // Encrypted file with encrypted footer
-    long cryptoFooterIndex = out.getPos();
-    writeFileCryptoMetaData(fileEncryptor.getFileCryptoMetaData(), out);
-    byte[] footerAAD = AesCipher.createFooterAAD(fileEncryptor.getFileAAD());
-    writeFileMetaData(parquetMetadata, out, fileEncryptor.getFooterEncryptor(), footerAAD);
-    int combinedMetaDataLength = (int)(out.getPos() - cryptoFooterIndex);
-    LOG.debug("{}: crypto metadata and footer length = {}", out.getPos(), combinedMetaDataLength);
-    BytesUtils.writeIntLittleEndian(out, combinedMetaDataLength);
-    out.write(EFMAGIC);
-  }
-
-  public ParquetMetadata getFooter() {
-    Preconditions.checkState(state == STATE.ENDED, "Cannot return unfinished footer.");
-    return footer;
-  }
-
-  /**
-   * Given a list of metadata files, merge them into a single ParquetMetadata
-   * Requires that the schemas be compatible, and the extraMetadata be exactly equal.
-   * @param files a list of files to merge metadata from
-   * @param conf a configuration
-   * @return merged parquet metadata for the files
-   * @throws IOException if there is an error while writing
-   * @deprecated metadata files are not recommended and will be removed in 2.0.0
-   */
-  @Deprecated
-  public static ParquetMetadata mergeMetadataFiles(List<Path> files, Configuration conf) throws IOException {
-    return mergeMetadataFiles(files, conf, new StrictKeyValueMetadataMergeStrategy());
-  }
-
-  /**
-   * Given a list of metadata files, merge them into a single ParquetMetadata
-   * Requires that the schemas be compatible, and the extraMetadata be exactly equal.
-   * @param files a list of files to merge metadata from
-   * @param conf a configuration
-   * @param keyValueMetadataMergeStrategy strategy to merge values for same key, if there are multiple
-   * @return merged parquet metadata for the files
-   * @throws IOException if there is an error while writing
-   * @deprecated metadata files are not recommended and will be removed in 2.0.0
-   */
-  @Deprecated
-  public static ParquetMetadata mergeMetadataFiles(List<Path> files, Configuration conf,
-      KeyValueMetadataMergeStrategy keyValueMetadataMergeStrategy) throws IOException {
-    Preconditions.checkArgument(!files.isEmpty(), "Cannot merge an empty list of metadata");
-
-    GlobalMetaData globalMetaData = null;
-    List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
-
-    for (Path p : files) {
-      ParquetMetadata pmd = ParquetFileReader.readFooter(conf, p, ParquetMetadataConverter.NO_FILTER);
-      FileMetaData fmd = pmd.getFileMetaData();
-      globalMetaData = mergeInto(fmd, globalMetaData, true);
-      blocks.addAll(pmd.getBlocks());
-    }
+    copy(from, out, start, length);
+
+    currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
+    currentColumnIndexes.add(columnIndex);
+    currentOffsetIndexes.add(offsetIndex);
+
+    Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
+    currentBlock.addColumn(ColumnChunkMetaData.get(
+        chunk.getPath(),
+        chunk.getPrimitiveType(),
+        chunk.getCodec(),
+        chunk.getEncodingStats(),
+        chunk.getEncodings(),
+        chunk.getStatistics(),
+        offsets.firstDataPageOffset,
+        offsets.dictionaryPageOffset,
+        chunk.getValueCount(),
+        chunk.getTotalSize(),
+        chunk.getTotalUncompressedSize()));
+
+    currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize());
+  }
+
+  // Buffers for the copy function.
+  private static final ThreadLocal<byte[]> COPY_BUFFER = ThreadLocal.withInitial(() -> new byte[8192]);
+
+  /**
+   * Copy from a FS input stream to an output stream. Thread-safe
+   *
+   * @param from a {@link SeekableInputStream}
+   * @param to any {@link PositionOutputStream}
+   * @param start where in the from stream to start copying
+   * @param length the number of bytes to copy
+   * @throws IOException if there is an error while reading or writing
+   */
+  private static void copy(SeekableInputStream from, PositionOutputStream to, long start, long length)
+      throws IOException {
+    LOG.debug("Copying {} bytes at {} to {}", length, start, to.getPos());
+    from.seek(start);
+    long bytesCopied = 0;
+    byte[] buffer = COPY_BUFFER.get();
+    while (bytesCopied < length) {
+      int bytesLeft = Math.toIntExact(length - bytesCopied);
+      int bytesRead = from.read(buffer, 0, (Math.min(buffer.length, bytesLeft)));
+      if (bytesRead < 0) {
+        throw new IllegalArgumentException("Unexpected end of input file at " + start + bytesCopied);
+      }
+      to.write(buffer, 0, bytesRead);
+      bytesCopied += bytesRead;
+    }
+  }
+
+  /**
+   * ends a file once all blocks have been written.
+   * closes the file.
+   *
+   * @param extraMetaData the extra meta data to write in the footer
+   * @throws IOException if there is an error while writing
+   */
+  public void end(Map<String, String> extraMetaData) throws IOException {
+    try {
+      state = state.end();
+      serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor);
+      serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor);
+      serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor);
+      LOG.debug("{}: end", out.getPos());
+      this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
+      serializeFooter(footer, out, fileEncryptor, metadataConverter);
+    } finally {
+      close();
+    }
+  }
-    // collapse GlobalMetaData into a single FileMetaData, which will throw if they are not compatible
-    return new ParquetMetadata(globalMetaData.merge(keyValueMetadataMergeStrategy), blocks);
-  }
-
-  /**
-   * Given a list of metadata files, merge them into a single metadata file.
-   * Requires that the schemas be compatible, and the extraMetaData be exactly equal.
-   * This is useful when merging 2 directories of parquet files into a single directory, as long
-   * as both directories were written with compatible schemas and equal extraMetaData.
-   * @param files a list of files to merge metadata from
-   * @param outputPath path to write merged metadata to
-   * @param conf a configuration
-   * @throws IOException if there is an error while reading or writing
-   * @deprecated metadata files are not recommended and will be removed in 2.0.0
-   */
-  @Deprecated
-  public static void writeMergedMetadataFile(List<Path> files, Path outputPath, Configuration conf) throws IOException {
-    ParquetMetadata merged = mergeMetadataFiles(files, conf);
-    writeMetadataFile(outputPath, merged, outputPath.getFileSystem(conf));
-  }
-
-  /**
-   * writes a _metadata and _common_metadata file
-   * @param configuration the configuration to use to get the FileSystem
-   * @param outputPath the directory to write the _metadata file to
-   * @param footers the list of footers to merge
-   * @throws IOException if there is an error while writing
-   * @deprecated metadata files are not recommended and will be removed in 2.0.0
-   */
-  @Deprecated
-  public static void writeMetadataFile(Configuration configuration, Path outputPath, List