@@ -126,6 +126,13 @@ public void setPageReader(PageReader pageReader) throws IOException {
}
}

/** This method is called from Apache Iceberg. */
public void setRowGroupReader(RowGroupReader rowGroupReader, ParquetColumnSpec columnSpec)
Comment on lines +129 to +130

Member

It would be good to have some unit tests in Comet for the methods intended to be called from Iceberg, so that we catch any regressions in behavior. This could be added as a separate PR so that we don't slow down progress on the integration work.

Member

I filed #1928
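
A minimal sketch of the kind of test suggested above (hypothetical, not part of this PR): it assumes JUnit 4 and exercises only the spec-to-descriptor conversion added in this change.

       package org.apache.comet.parquet;

       import static org.junit.Assert.assertEquals;

       import org.apache.parquet.column.ColumnDescriptor;
       import org.apache.parquet.schema.PrimitiveType;
       import org.junit.Test;

       public class ParquetColumnSpecSuite {
         @Test
         public void testBuildColumnDescriptorFromSpec() {
           // An optional top-level INT64 column: definition level 1, repetition level 0.
           ParquetColumnSpec spec =
               new ParquetColumnSpec(new String[] {"id"}, "INT64", 0, false, 1, 0);
           ColumnDescriptor descriptor = Utils.buildColumnDescriptor(spec);
           assertEquals(1, descriptor.getMaxDefinitionLevel());
           assertEquals(0, descriptor.getMaxRepetitionLevel());
           assertEquals("id", descriptor.getPath()[0]);
           assertEquals(
               PrimitiveType.PrimitiveTypeName.INT64,
               descriptor.getPrimitiveType().getPrimitiveTypeName());
         }
       }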

throws IOException {
ColumnDescriptor descriptor = Utils.buildColumnDescriptor(columnSpec);
setPageReader(rowGroupReader.getPageReader(descriptor));
}

@Override
public void readBatch(int total) {
LOG.debug("Start to batch of size = " + total);
106 changes: 103 additions & 3 deletions common/src/main/java/org/apache/comet/parquet/FileReader.java
@@ -27,9 +27,11 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -40,6 +42,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.ByteBufferInputStream;
@@ -53,6 +58,7 @@
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.crypto.AesCipher;
import org.apache.parquet.crypto.EncryptionPropertiesFactory;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.crypto.InternalColumnDecryptionSetup;
import org.apache.parquet.crypto.InternalFileDecryptor;
@@ -67,6 +73,7 @@
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.Util;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
@@ -128,6 +135,48 @@ public FileReader(InputFile file, ParquetReadOptions options, ReadOptions cometO
this(file, null, options, cometOptions, null);
}

/** This constructor is called from Apache Iceberg. */
public FileReader(
Path path,
Configuration conf,
ReadOptions cometOptions,
Map<String, String> properties,
Long start,
Long length,
byte[] fileEncryptionKey,
byte[] fileAADPrefix)
throws IOException {
ParquetReadOptions options =
buildParquetReadOptions(conf, properties, start, length, fileEncryptionKey, fileAADPrefix);
this.converter = new ParquetMetadataConverter(options);
this.file = CometInputFile.fromPath(path, conf);
this.f = file.newStream();
this.options = options;
this.cometOptions = cometOptions;
this.metrics = null;
try {
this.footer = readFooter(file, options, f, converter);
Contributor

The footer should be available already in Iceberg? Can we avoid having to read it twice?

Contributor Author

Yes, the footer is already available from Iceberg, but I can't pass ParquetMetadata from Iceberg to Comet. Maybe I can convert ParquetMetadata to a JSON string, pass it to Comet, and then construct a ParquetMetadata from the JSON string.

Contributor Author

I initially thought I could do something like this:

       ParquetMetadata footer = reader.getFooter();
       String footerInJson = ParquetMetadata.toJSON(footer);
       ParquetMetadata footer2 = ParquetMetadata.fromJSON(footerInJson);

but this doesn't work. I got

java.lang.RuntimeException: shaded.parquet.com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.apache.parquet.hadoop.metadata.ParquetMetadata` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
 at [Source: (StringReader); line: 1, column: 2]

I will see if there are other ways to re-construct ParquetMetadata.
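
One untested direction the thread leaves open (an editor's sketch, not something this PR implements): round-trip the footer through its Thrift form instead of JSON. ParquetMetadataConverter converts between ParquetMetadata and the Thrift FileMetaData in both directions, and org.apache.parquet.format.Util can serialize the Thrift object to bytes that could cross the Iceberg/Comet boundary.

       ParquetMetadataConverter converter = new ParquetMetadataConverter();
       // Iceberg side: ParquetMetadata -> Thrift FileMetaData -> bytes
       org.apache.parquet.format.FileMetaData thriftFooter = converter.toParquetMetadata(1, footer);
       java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
       org.apache.parquet.format.Util.writeFileMetaData(thriftFooter, out);

       // Comet side: bytes -> Thrift FileMetaData -> ParquetMetadata
       org.apache.parquet.format.FileMetaData decoded =
           org.apache.parquet.format.Util.readFileMetaData(new java.io.ByteArrayInputStream(out.toByteArray()));
       ParquetMetadata footer2 = converter.fromParquetMetadata(decoded);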

} catch (Exception e) {
// If reading the footer throws an exception in the constructor, the new stream
// must be closed here; otherwise there is no way to close it from outside.
f.close();
throw e;
}
this.fileMetaData = footer.getFileMetaData();
this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups!
if (null != fileDecryptor && fileDecryptor.plaintextFile()) {
this.fileDecryptor = null; // Plaintext file. No need in decryptor
}

this.blocks = footer.getBlocks(); // row-group filtering already happened in Iceberg
this.blockIndexStores = listWithNulls(this.blocks.size());
this.blockRowRanges = listWithNulls(this.blocks.size());
for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
paths.put(ColumnPath.get(col.getPath()), col);
}
this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
}

public FileReader(
InputFile file,
ParquetReadOptions options,
@@ -209,6 +258,57 @@ public void setRequestedSchema(List<ColumnDescriptor> projection) {
}
}

/** This method is called from Apache Iceberg. */
public void setRequestedSchemaFromSpecs(List<ParquetColumnSpec> specList) {
paths.clear();
for (ParquetColumnSpec colSpec : specList) {
ColumnDescriptor descriptor = Utils.buildColumnDescriptor(colSpec);
paths.put(ColumnPath.get(colSpec.getPath()), descriptor);
}
}

private static ParquetReadOptions buildParquetReadOptions(
Configuration conf,
Map<String, String> properties,
Long start,
Long length,
byte[] fileEncryptionKey,
byte[] fileAADPrefix) {

// Iceberg removes these read properties when building its ParquetReadOptions.
// We want to build exactly the same ParquetReadOptions as Iceberg does.
Collection<String> readPropertiesToRemove =
Set.of(
ParquetInputFormat.UNBOUND_RECORD_FILTER,
ParquetInputFormat.FILTER_PREDICATE,
ParquetInputFormat.READ_SUPPORT_CLASS,
EncryptionPropertiesFactory.CRYPTO_FACTORY_CLASS_PROPERTY_NAME);

for (String property : readPropertiesToRemove) {
conf.unset(property);
}

ParquetReadOptions.Builder optionsBuilder = HadoopReadOptions.builder(conf);
for (Map.Entry<String, String> entry : properties.entrySet()) {
optionsBuilder.set(entry.getKey(), entry.getValue());
}

if (start != null && length != null) {
optionsBuilder.withRange(start, start + length);
}

if (fileEncryptionKey != null) {
FileDecryptionProperties fileDecryptionProperties =
FileDecryptionProperties.builder()
.withFooterKey(fileEncryptionKey)
.withAADPrefix(fileAADPrefix)
.build();
optionsBuilder.withDecryption(fileDecryptionProperties);
}

return optionsBuilder.build();
}

/**
* Gets the total number of records across all row groups (after applying row group filtering).
*/
@@ -245,15 +345,15 @@ public boolean skipNextRowGroup() {
* Returns the next row group to read (after applying row group filtering), or null if there's no
* more row group.
*/
public PageReadStore readNextRowGroup() throws IOException {
public RowGroupReader readNextRowGroup() throws IOException {
if (currentBlock == blocks.size()) {
return null;
}
BlockMetaData block = blocks.get(currentBlock);
if (block.getRowCount() == 0) {
throw new RuntimeException("Illegal row group of 0 rows");
}
this.currentRowGroup = new RowGroupReader(block.getRowCount());
this.currentRowGroup = new RowGroupReader(block.getRowCount(), block.getRowIndexOffset());
// prepare the list of consecutive parts to read them in one scan
List<ConsecutivePartList> allParts = new ArrayList<>();
ConsecutivePartList currentParts = null;
@@ -362,7 +462,7 @@ ColumnIndexReader getColumnIndexReader(int blockIndex) {
return ciStore;
}

private PageReadStore readChunks(
private RowGroupReader readChunks(
BlockMetaData block, List<ConsecutivePartList> allParts, ChunkListBuilder builder)
throws IOException {
if (shouldReadParallel()) {
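Taken together, the Iceberg-facing entry points above compose roughly as follows (a hypothetical caller-side sketch; `conf`, `cometOptions`, `properties`, `specs`, and `columnReaderFor` are illustrative names, not APIs from this PR, and error handling is omitted):

       FileReader reader =
           new FileReader(path, conf, cometOptions, properties, start, length,
               /* fileEncryptionKey */ null, /* fileAADPrefix */ null);
       reader.setRequestedSchemaFromSpecs(specs);

       RowGroupReader rowGroup = reader.readNextRowGroup();
       while (rowGroup != null) {
         // Hand each column's pages for this row group to its Comet column reader.
         for (ParquetColumnSpec spec : specs) {
           columnReaderFor(spec).setRowGroupReader(rowGroup, spec);
         }
         // ... read batches via the column readers ...
         rowGroup = reader.readNextRowGroup();
       }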
69 changes: 69 additions & 0 deletions common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.parquet;

public class ParquetColumnSpec {

private final String[] path;
private final String physicalType;
private final int typeLength;
private final boolean isRepeated;
private final int maxDefinitionLevel;
private final int maxRepetitionLevel;

public ParquetColumnSpec(
String[] path,
String physicalType,
int typeLength,
boolean isRepeated,
int maxDefinitionLevel,
int maxRepetitionLevel) {
this.path = path;
this.physicalType = physicalType;
this.typeLength = typeLength;
this.isRepeated = isRepeated;
this.maxDefinitionLevel = maxDefinitionLevel;
this.maxRepetitionLevel = maxRepetitionLevel;
}

public String[] getPath() {
return path;
}

public String getPhysicalType() {
return physicalType;
}

public int getTypeLength() {
return typeLength;
}

public boolean isRepeated() {
return isRepeated;
}

public int getMaxRepetitionLevel() {
return maxRepetitionLevel;
}

public int getMaxDefinitionLevel() {
return maxDefinitionLevel;
}
}
common/src/main/java/org/apache/comet/parquet/RowGroupReader.java
@@ -33,15 +33,18 @@ public class RowGroupReader implements PageReadStore {
private final Map<ColumnDescriptor, PageReader> readers = new HashMap<>();
private final long rowCount;
private final RowRanges rowRanges;
private final long rowIndexOffset;

public RowGroupReader(long rowCount) {
public RowGroupReader(long rowCount, long rowIndexOffset) {
this.rowCount = rowCount;
this.rowRanges = null;
this.rowIndexOffset = rowIndexOffset;
}

RowGroupReader(RowRanges rowRanges) {
this.rowRanges = rowRanges;
this.rowCount = rowRanges.rowCount();
this.rowIndexOffset = -1;
}

@Override
@@ -64,6 +67,11 @@ public Optional<PrimitiveIterator.OfLong> getRowIndexes() {
return rowRanges == null ? Optional.empty() : Optional.of(rowRanges.iterator());
}

@Override
public Optional<Long> getRowIndexOffset() {
return this.rowIndexOffset < 0L ? Optional.empty() : Optional.of(this.rowIndexOffset);
}

void addColumn(ColumnDescriptor path, ColumnPageReader reader) {
if (readers.put(path, reader) != null) {
throw new IllegalStateException(path + " was already added");
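A quick illustration of the new offset plumbing (hypothetical values): the Iceberg-facing constructor records the row group's starting row index, while the package-private RowRanges-based constructor stores -1, so the offset is reported as absent there.

       RowGroupReader byRowGroup = new RowGroupReader(/* rowCount */ 1000L, /* rowIndexOffset */ 5000L);
       byRowGroup.getRowIndexOffset(); // Optional.of(5000L)
       byRowGroup.getRowCount();       // 1000L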
41 changes: 41 additions & 0 deletions common/src/main/java/org/apache/comet/parquet/Utils.java
@@ -21,14 +21,29 @@

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.spark.sql.types.*;

import org.apache.comet.CometSchemaImporter;

public class Utils {

/** This method is called from Apache Iceberg. */
public static ColumnReader getColumnReader(
DataType type,
ParquetColumnSpec columnSpec,
CometSchemaImporter importer,
int batchSize,
boolean useDecimal128,
boolean useLazyMaterialization) {

ColumnDescriptor descriptor = buildColumnDescriptor(columnSpec);
return getColumnReader(
type, descriptor, importer, batchSize, useDecimal128, useLazyMaterialization, true);
}

public static ColumnReader getColumnReader(
DataType type,
ColumnDescriptor descriptor,
@@ -260,4 +275,30 @@ static int getTimeUnitId(LogicalTypeAnnotation.TimeUnit tu) {
throw new UnsupportedOperationException("Unsupported TimeUnit " + tu);
}
}

public static ColumnDescriptor buildColumnDescriptor(ParquetColumnSpec columnSpec) {
PrimitiveType.PrimitiveTypeName primType =
PrimitiveType.PrimitiveTypeName.valueOf(columnSpec.getPhysicalType());

Type.Repetition repetition;
if (columnSpec.getMaxRepetitionLevel() > 0) {
repetition = Type.Repetition.REPEATED;
} else if (columnSpec.getMaxDefinitionLevel() > 0) {
repetition = Type.Repetition.OPTIONAL;
} else {
repetition = Type.Repetition.REQUIRED;
}

String name = columnSpec.getPath()[columnSpec.getPath().length - 1];

PrimitiveType primitiveType;
if (primType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
primitiveType = new PrimitiveType(repetition, primType, columnSpec.getTypeLength(), name);
} else {
primitiveType = new PrimitiveType(repetition, primType, name);
}

MessageType schema = new MessageType("root", primitiveType);
return schema.getColumnDescription(columnSpec.getPath());
}
}
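
As an illustration of the repetition inference above (hypothetical values): a nullable top-level column arrives with maxRepetitionLevel 0 and maxDefinitionLevel 1 and is rebuilt as OPTIONAL, while the FIXED_LEN_BYTE_ARRAY branch preserves the declared type length.

       // OPTIONAL: maxRepetitionLevel == 0, maxDefinitionLevel > 0
       ColumnDescriptor idCol =
           Utils.buildColumnDescriptor(
               new ParquetColumnSpec(new String[] {"id"}, "INT64", 0, false, 1, 0));

       // FIXED_LEN_BYTE_ARRAY keeps its length, e.g. 16 bytes for a decimal(38, 10)
       ColumnDescriptor priceCol =
           Utils.buildColumnDescriptor(
               new ParquetColumnSpec(new String[] {"price"}, "FIXED_LEN_BYTE_ARRAY", 16, false, 1, 0));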