diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
index 9502aa265d..b2fe965e2e 100644
--- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -126,6 +126,13 @@ public void setPageReader(PageReader pageReader) throws IOException {
     }
   }
 
+  /** This method is called from Apache Iceberg. */
+  public void setRowGroupReader(RowGroupReader rowGroupReader, ParquetColumnSpec columnSpec)
+      throws IOException {
+    ColumnDescriptor descriptor = Utils.buildColumnDescriptor(columnSpec);
+    setPageReader(rowGroupReader.getPageReader(descriptor));
+  }
+
   @Override
   public void readBatch(int total) {
     LOG.debug("Start to batch of size = " + total);
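
Usage sketch (editorial note, not part of the diff): the new setRowGroupReader hook lets an Iceberg-side caller wire a Comet ColumnReader directly to a Parquet row group. A minimal sketch of the intended call sequence, assuming a FileReader is already open and a ParquetColumnSpec has been built for the projected column (all variable names here are hypothetical):

    RowGroupReader rowGroup = fileReader.readNextRowGroup(); // null once all row groups are read
    columnReader.setRowGroupReader(rowGroup, columnSpec);    // resolves this column's PageReader
    columnReader.readBatch(batchSize);                       // then decode the next batch of values
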
diff --git a/common/src/main/java/org/apache/comet/parquet/FileReader.java b/common/src/main/java/org/apache/comet/parquet/FileReader.java
index a85e0ebe76..af6c5b3c0b 100644
--- a/common/src/main/java/org/apache/comet/parquet/FileReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/FileReader.java
@@ -27,9 +27,11 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -40,6 +42,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.ByteBufferInputStream;
@@ -53,6 +58,7 @@
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.EncryptionPropertiesFactory;
 import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.crypto.InternalColumnDecryptionSetup;
 import org.apache.parquet.crypto.InternalFileDecryptor;
@@ -67,6 +73,7 @@
 import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.Util;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
@@ -128,6 +135,48 @@ public FileReader(InputFile file, ParquetReadOptions options, ReadOptions cometO
     this(file, null, options, cometOptions, null);
   }
 
+  /** This constructor is called from Apache Iceberg. */
+  public FileReader(
+      Path path,
+      Configuration conf,
+      ReadOptions cometOptions,
+      Map<String, String> properties,
+      Long start,
+      Long length,
+      byte[] fileEncryptionKey,
+      byte[] fileAADPrefix)
+      throws IOException {
+    ParquetReadOptions options =
+        buildParquetReadOptions(conf, properties, start, length, fileEncryptionKey, fileAADPrefix);
+    this.converter = new ParquetMetadataConverter(options);
+    this.file = CometInputFile.fromPath(path, conf);
+    this.f = file.newStream();
+    this.options = options;
+    this.cometOptions = cometOptions;
+    this.metrics = null;
+    try {
+      this.footer = readFooter(file, options, f, converter);
+    } catch (Exception e) {
+      // If reading the footer throws an exception in the constructor, the new stream must be
+      // closed here; otherwise there is no way to close it from outside.
+      f.close();
+      throw e;
+    }
+    this.fileMetaData = footer.getFileMetaData();
+    this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups!
+    if (null != fileDecryptor && fileDecryptor.plaintextFile()) {
+      this.fileDecryptor = null; // Plaintext file. No need for a decryptor.
+    }
+
+    this.blocks = footer.getBlocks(); // row group filtering is done on the Iceberg side
+    this.blockIndexStores = listWithNulls(this.blocks.size());
+    this.blockRowRanges = listWithNulls(this.blocks.size());
+    for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
+      paths.put(ColumnPath.get(col.getPath()), col);
+    }
+    this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
+  }
+
   public FileReader(
       InputFile file,
       ParquetReadOptions options,
@@ -209,6 +258,57 @@ public void setRequestedSchema(List<ColumnDescriptor> projection) {
     }
   }
 
+  /** This method is called from Apache Iceberg. */
+  public void setRequestedSchemaFromSpecs(List<ParquetColumnSpec> specList) {
+    paths.clear();
+    for (ParquetColumnSpec colSpec : specList) {
+      ColumnDescriptor descriptor = Utils.buildColumnDescriptor(colSpec);
+      paths.put(ColumnPath.get(colSpec.getPath()), descriptor);
+    }
+  }
+
+  private static ParquetReadOptions buildParquetReadOptions(
+      Configuration conf,
+      Map<String, String> properties,
+      Long start,
+      Long length,
+      byte[] fileEncryptionKey,
+      byte[] fileAADPrefix) {
+
+    // Iceberg removes these read properties when building its ParquetReadOptions.
+    // We want to build exactly the same ParquetReadOptions as Iceberg does.
+    Collection<String> readPropertiesToRemove =
+        Set.of(
+            ParquetInputFormat.UNBOUND_RECORD_FILTER,
+            ParquetInputFormat.FILTER_PREDICATE,
+            ParquetInputFormat.READ_SUPPORT_CLASS,
+            EncryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME);
+
+    for (String property : readPropertiesToRemove) {
+      conf.unset(property);
+    }
+
+    ParquetReadOptions.Builder optionsBuilder = HadoopReadOptions.builder(conf);
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      optionsBuilder.set(entry.getKey(), entry.getValue());
+    }
+
+    if (start != null && length != null) {
+      optionsBuilder.withRange(start, start + length);
+    }
+
+    if (fileEncryptionKey != null) {
+      FileDecryptionProperties fileDecryptionProperties =
+          FileDecryptionProperties.builder()
+              .withFooterKey(fileEncryptionKey)
+              .withAADPrefix(fileAADPrefix)
+              .build();
+      optionsBuilder.withDecryption(fileDecryptionProperties);
+    }
+
+    return optionsBuilder.build();
+  }
+
   /**
    * Gets the total number of records across all row groups (after applying row group filtering).
    */
@@ -245,7 +345,7 @@ public boolean skipNextRowGroup() {
    * Returns the next row group to read (after applying row group filtering), or null if there's no
    * more row group.
    */
-  public PageReadStore readNextRowGroup() throws IOException {
+  public RowGroupReader readNextRowGroup() throws IOException {
     if (currentBlock == blocks.size()) {
       return null;
     }
@@ -253,7 +353,7 @@ public PageReadStore readNextRowGroup() throws IOException {
     if (block.getRowCount() == 0) {
      throw new RuntimeException("Illegal row group of 0 rows");
     }
-    this.currentRowGroup = new RowGroupReader(block.getRowCount());
+    this.currentRowGroup = new RowGroupReader(block.getRowCount(), block.getRowIndexOffset());
     // prepare the list of consecutive parts to read them in one scan
     List<ConsecutivePartList> allParts = new ArrayList<>();
     ConsecutivePartList currentParts = null;
@@ -362,7 +462,7 @@ ColumnIndexReader getColumnIndexReader(int blockIndex) {
     return ciStore;
   }
 
-  private PageReadStore readChunks(
+  private RowGroupReader readChunks(
      BlockMetaData block, List<ConsecutivePartList> allParts, ChunkListBuilder builder)
      throws IOException {
    if (shouldReadParallel()) {
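
Usage sketch (editorial note, not part of the diff): the new constructor mirrors the way Iceberg opens a Parquet split, so an Iceberg-side caller only has to forward its read properties and split boundaries. A minimal sketch, assuming the caller already holds Comet ReadOptions, Iceberg read properties, and ParquetColumnSpec instances for the projection (all variable names are hypothetical; the nulls denote a plaintext, unencrypted file):

    FileReader reader =
        new FileReader(
            new Path("/warehouse/db/table/data/file.parquet"), // hypothetical file path
            conf,          // Hadoop Configuration
            cometOptions,  // Comet ReadOptions
            properties,    // Map<String, String> of Iceberg read properties
            splitStart,    // Long byte offset of the split, or null
            splitLength,   // Long byte length of the split, or null
            null,          // fileEncryptionKey
            null);         // fileAADPrefix
    reader.setRequestedSchemaFromSpecs(specs); // List<ParquetColumnSpec> for projected columns
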
diff --git a/common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java b/common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java
new file mode 100644
index 0000000000..7faa6e62b3
--- /dev/null
+++ b/common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet;
+
+public class ParquetColumnSpec {
+
+  private final String[] path;
+  private final String physicalType;
+  private final int typeLength;
+  private final boolean isRepeated;
+  private final int maxDefinitionLevel;
+  private final int maxRepetitionLevel;
+
+  public ParquetColumnSpec(
+      String[] path,
+      String physicalType,
+      int typeLength,
+      boolean isRepeated,
+      int maxDefinitionLevel,
+      int maxRepetitionLevel) {
+    this.path = path;
+    this.physicalType = physicalType;
+    this.typeLength = typeLength;
+    this.isRepeated = isRepeated;
+    this.maxDefinitionLevel = maxDefinitionLevel;
+    this.maxRepetitionLevel = maxRepetitionLevel;
+  }
+
+  public String[] getPath() {
+    return path;
+  }
+
+  public String getPhysicalType() {
+    return physicalType;
+  }
+
+  public int getTypeLength() {
+    return typeLength;
+  }
+
+  public boolean isRepeated() {
+    return isRepeated;
+  }
+
+  public int getMaxRepetitionLevel() {
+    return maxRepetitionLevel;
+  }
+
+  public int getMaxDefinitionLevel() {
+    return maxDefinitionLevel;
+  }
+}
diff --git a/common/src/main/java/org/apache/comet/parquet/RowGroupReader.java b/common/src/main/java/org/apache/comet/parquet/RowGroupReader.java
index d5d73b0783..1c7de2fe90 100644
--- a/common/src/main/java/org/apache/comet/parquet/RowGroupReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/RowGroupReader.java
@@ -33,15 +33,18 @@ public class RowGroupReader implements PageReadStore {
   private final Map<ColumnDescriptor, ColumnPageReader> readers = new HashMap<>();
   private final long rowCount;
   private final RowRanges rowRanges;
+  private final long rowIndexOffset;
 
-  public RowGroupReader(long rowCount) {
+  public RowGroupReader(long rowCount, long rowIndexOffset) {
     this.rowCount = rowCount;
     this.rowRanges = null;
+    this.rowIndexOffset = rowIndexOffset;
   }
 
   RowGroupReader(RowRanges rowRanges) {
     this.rowRanges = rowRanges;
     this.rowCount = rowRanges.rowCount();
+    this.rowIndexOffset = -1;
   }
@@ -64,6 +67,11 @@ public Optional<PrimitiveIterator.OfLong> getRowIndexes() {
     return rowRanges == null ? Optional.empty() : Optional.of(rowRanges.iterator());
   }
 
+  @Override
+  public Optional<Long> getRowIndexOffset() {
+    return this.rowIndexOffset < 0L ? Optional.empty() : Optional.of(this.rowIndexOffset);
+  }
+
   void addColumn(ColumnDescriptor path, ColumnPageReader reader) {
     if (readers.put(path, reader) != null) {
       throw new IllegalStateException(path + " was already added");
     }
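
Note (editorial, not part of the diff): getRowIndexOffset reports the absolute index, within the whole file, of the row group's first row; a caller needs it to turn a position inside the row group into a file-level row index (for example when computing row positions for Iceberg metadata). The RowRanges constructor stores the -1 sentinel, which deliberately surfaces as Optional.empty() since that path does not track an offset. A small illustration under those semantics:

    RowGroupReader rg = new RowGroupReader(1000L, 5000L); // 1000 rows; first row is file row 5000
    rg.getRowIndexOffset();                               // Optional[5000]
    // row 42 inside this group corresponds to file-level row index 5000 + 42 = 5042
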
diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java
index 2f9c507366..d64ab371a3 100644
--- a/common/src/main/java/org/apache/comet/parquet/Utils.java
+++ b/common/src/main/java/org/apache/comet/parquet/Utils.java
@@ -21,7 +21,9 @@
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
 import org.apache.spark.sql.types.*;
 
 import org.apache.comet.CometSchemaImporter;
@@ -29,6 +31,19 @@
 public class Utils {
 
   /** This method is called from Apache Iceberg. */
+  public static ColumnReader getColumnReader(
+      DataType type,
+      ParquetColumnSpec columnSpec,
+      CometSchemaImporter importer,
+      int batchSize,
+      boolean useDecimal128,
+      boolean useLazyMaterialization) {
+
+    ColumnDescriptor descriptor = buildColumnDescriptor(columnSpec);
+    return getColumnReader(
+        type, descriptor, importer, batchSize, useDecimal128, useLazyMaterialization, true);
+  }
+
   public static ColumnReader getColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
@@ -260,4 +275,30 @@ static int getTimeUnitId(LogicalTypeAnnotation.TimeUnit tu) {
       throw new UnsupportedOperationException("Unsupported TimeUnit " + tu);
     }
   }
+
+  public static ColumnDescriptor buildColumnDescriptor(ParquetColumnSpec columnSpec) {
+    PrimitiveType.PrimitiveTypeName primType =
+        PrimitiveType.PrimitiveTypeName.valueOf(columnSpec.getPhysicalType());
+
+    Type.Repetition repetition;
+    if (columnSpec.getMaxRepetitionLevel() > 0) {
+      repetition = Type.Repetition.REPEATED;
+    } else if (columnSpec.getMaxDefinitionLevel() > 0) {
+      repetition = Type.Repetition.OPTIONAL;
+    } else {
+      repetition = Type.Repetition.REQUIRED;
+    }
+
+    String name = columnSpec.getPath()[columnSpec.getPath().length - 1];
+
+    PrimitiveType primitiveType;
+    if (primType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+      primitiveType = new PrimitiveType(repetition, primType, columnSpec.getTypeLength(), name);
+    } else {
+      primitiveType = new PrimitiveType(repetition, primType, name);
+    }
+
+    MessageType schema = new MessageType("root", primitiveType);
+    return schema.getColumnDescription(columnSpec.getPath());
+  }
 }
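
Usage sketch (editorial note, not part of the diff): buildColumnDescriptor rebuilds a ColumnDescriptor from the plain values a ParquetColumnSpec carries across the Iceberg/Comet boundary, inferring the repetition from the max levels and wrapping the leaf type in a single-field "root" message. The example below is therefore written for a top-level, non-nested column; all values and the importer variable are hypothetical:

    ParquetColumnSpec spec =
        new ParquetColumnSpec(
            new String[] {"id"}, // path of a top-level column
            "INT64",             // must name a PrimitiveType.PrimitiveTypeName constant
            0,                   // typeLength: only consulted for FIXED_LEN_BYTE_ARRAY
            false,               // isRepeated
            1,                   // maxDefinitionLevel > 0 with maxRepetitionLevel == 0 => OPTIONAL
            0);                  // maxRepetitionLevel
    ColumnDescriptor descriptor = Utils.buildColumnDescriptor(spec);
    // Or go straight to a reader via the new overload, given a CometSchemaImporter 'importer':
    ColumnReader reader = Utils.getColumnReader(DataTypes.LongType, spec, importer, 8192, false, false);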