org/apache/parquet/column/page/PageReadStore.java
@@ -43,6 +43,14 @@ public interface PageReadStore {
*/
long getRowCount();

/**
* @return the row index offset of this row group, or an empty {@code Optional} if the related data is not available
*/
default Optional<Long> getRowIndexOffset() {
return Optional.empty();
}

/**
* Returns the indexes of the rows to be read/built if the related data is available. All rows whose indexes are not
* returned shall be skipped.
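As a quick illustration of the contract above - a minimal sketch of a caller consuming the new accessor. The class and helper names are hypothetical; the PageReadStore would typically come from an open ParquetFileReader:

import java.util.Optional;
import org.apache.parquet.column.page.PageReadStore;

class RowGroupSpanDemo {
  // Report which file-wide row indexes a row group covers, if known.
  static void printRowGroupSpan(PageReadStore pages) {
    Optional<Long> offset = pages.getRowIndexOffset();
    if (offset.isPresent()) {
      long first = offset.get();
      long last = first + pages.getRowCount() - 1;
      System.out.println("row group covers file row indexes " + first + ".." + last);
    } else {
      System.out.println("row index offset unavailable (e.g. legacy metadata)");
    }
  }
}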
org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -1400,34 +1400,67 @@ public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter
return readParquetMetadata(from, filter, null, false, 0);
}

/**
* Maps each {@link RowGroup} to its starting row index within the file, i.e. the running sum of the row counts
* of all preceding row groups.
*/
private Map<RowGroup, Long> generateRowGroupOffsets(FileMetaData metaData) {
Map<RowGroup, Long> rowGroupToRowIndexOffset = new HashMap<>();
List<RowGroup> rowGroups = metaData.getRow_groups();
if (rowGroups != null) {
long rowIdxSum = 0;
for (RowGroup rowGroup : rowGroups) {
rowGroupToRowIndexOffset.put(rowGroup, rowIdxSum);
rowIdxSum += rowGroup.getNum_rows();
}
}
return rowGroupToRowIndexOffset;
}
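The offsets are just a running sum of row counts: row groups of 100, 250 and 75 rows start at row indexes 0, 100 and 350. A self-contained sketch of the same computation on plain counts (names hypothetical):

import java.util.ArrayList;
import java.util.List;

class RowGroupOffsetDemo {
  // Running sum: for counts {100, 250, 75} this returns {0, 100, 350}.
  static List<Long> offsets(long... rowCounts) {
    List<Long> result = new ArrayList<>();
    long rowIdxSum = 0;
    for (long numRows : rowCounts) {
      result.add(rowIdxSum);
      rowIdxSum += numRows;
    }
    return result;
  }
}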

/**
* A container holding a {@link FileMetaData} together with the map from each {@link RowGroup} to its row index offset.
*/
private class FileMetaDataAndRowGroupOffsetInfo {
final FileMetaData fileMetadata;
final Map<RowGroup, Long> rowGroupToRowIndexOffsetMap;
public FileMetaDataAndRowGroupOffsetInfo(FileMetaData fileMetadata, Map<RowGroup, Long> rowGroupToRowIndexOffsetMap) {
this.fileMetadata = fileMetadata;
this.rowGroupToRowIndexOffsetMap = rowGroupToRowIndexOffsetMap;
}
}

public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter,
final InternalFileDecryptor fileDecryptor, final boolean encryptedFooter,
final int combinedFooterLength) throws IOException {

final BlockCipher.Decryptor footerDecryptor = (encryptedFooter? fileDecryptor.fetchFooterDecryptor() : null);
final byte[] encryptedFooterAAD = (encryptedFooter? AesCipher.createFooterAAD(fileDecryptor.getFileAAD()) : null);

-FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
+FileMetaDataAndRowGroupOffsetInfo fileMetaDataAndRowGroupInfo = filter.accept(new MetadataFilterVisitor<FileMetaDataAndRowGroupOffsetInfo, IOException>() {
@Override
-public FileMetaData visit(NoFilter filter) throws IOException {
-return readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+public FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter) throws IOException {
+FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));
}

@Override
-public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
-return readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
+public FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws IOException {
+FileMetaData fileMetadata = readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
+return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));
}

@Override
-public FileMetaData visit(OffsetMetadataFilter filter) throws IOException {
-return filterFileMetaDataByStart(readFileMetaData(from, footerDecryptor, encryptedFooterAAD), filter);
+public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException {
+FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter);
+return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, generateRowGroupOffsets(fileMetadata));
}

@Override
-public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
-return filterFileMetaDataByMidpoint(readFileMetaData(from, footerDecryptor, encryptedFooterAAD), filter);
+public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException {
+FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter);
+return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, generateRowGroupOffsets(fileMetadata));
}
});
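// Note: both filtering visitors above generate the offset map from the unfiltered fileMetadata, so row groups
// dropped by the Offset/Range filters do not shift the row index offsets of the row groups that remain.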
FileMetaData fileMetaData = fileMetaDataAndRowGroupInfo.fileMetadata;
Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = fileMetaDataAndRowGroupInfo.rowGroupToRowIndexOffsetMap;
LOG.debug("{}", fileMetaData);

if (!encryptedFooter && null != fileDecryptor) {
@@ -1447,7 +1480,7 @@ public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
}
}

-ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData, fileDecryptor, encryptedFooter);
+ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData, fileDecryptor, encryptedFooter, rowGroupToRowIndexOffsetMap);
if (LOG.isDebugEnabled()) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
return parquetMetadata;
}
@@ -1476,6 +1509,13 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws IOException {

public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
InternalFileDecryptor fileDecryptor, boolean encryptedFooter) throws IOException {
return fromParquetMetadata(parquetMetadata, fileDecryptor, encryptedFooter, new HashMap<RowGroup, Long>());
}

public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
InternalFileDecryptor fileDecryptor,
boolean encryptedFooter,
Map<RowGroup, Long> rowGroupToRowIndexOffsetMap) throws IOException {
MessageType messageType = fromParquetSchema(parquetMetadata.getSchema(), parquetMetadata.getColumn_orders());
List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
List<RowGroup> row_groups = parquetMetadata.getRow_groups();
@@ -1485,6 +1525,9 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
BlockMetaData blockMetaData = new BlockMetaData();
blockMetaData.setRowCount(rowGroup.getNum_rows());
blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
if (rowGroupToRowIndexOffsetMap.containsKey(rowGroup)) {
blockMetaData.setRowIndexOffset(rowGroupToRowIndexOffsetMap.get(rowGroup));
}
// not set in legacy files
if (rowGroup.isSetOrdinal()) {
blockMetaData.setOrdinal(rowGroup.getOrdinal());
org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -248,15 +248,26 @@ public DictionaryPage readDictionaryPage() {

private final Map<ColumnDescriptor, ColumnChunkPageReader> readers = new HashMap<ColumnDescriptor, ColumnChunkPageReader>();
private final long rowCount;
private final long rowIndexOffset;
private final RowRanges rowRanges;

public ColumnChunkPageReadStore(long rowCount) {
this(rowCount, -1);
}

ColumnChunkPageReadStore(RowRanges rowRanges) {
this(rowRanges, -1);
}

ColumnChunkPageReadStore(long rowCount, long rowIndexOffset) {
this.rowCount = rowCount;
this.rowIndexOffset = rowIndexOffset;
rowRanges = null;
}

-ColumnChunkPageReadStore(RowRanges rowRanges) {
+ColumnChunkPageReadStore(RowRanges rowRanges, long rowIndexOffset) {
this.rowRanges = rowRanges;
this.rowIndexOffset = rowIndexOffset;
rowCount = rowRanges.rowCount();
}

@@ -265,6 +276,11 @@ public long getRowCount() {
return rowCount;
}

@Override
public Optional<Long> getRowIndexOffset() {
return rowIndexOffset < 0 ? Optional.empty() : Optional.of(rowIndexOffset);
}

@Override
public PageReader getPageReader(ColumnDescriptor path) {
final PageReader pageReader = readers.get(path);
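To make the sentinel explicit - a test-style sketch of how the -1 default maps onto the Optional (same-package access assumed, since the two-argument constructors are package-private; names hypothetical):

import java.util.Optional;

class RowIndexOffsetSentinelDemo {
  static void demo() {
    ColumnChunkPageReadStore known = new ColumnChunkPageReadStore(10L, 1000L);
    ColumnChunkPageReadStore unknown = new ColumnChunkPageReadStore(10L); // delegates with -1
    Optional<Long> offset = known.getRowIndexOffset();  // Optional.of(1000L)
    Optional<Long> none = unknown.getRowIndexOffset();  // Optional.empty()
  }
}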
org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -22,8 +22,10 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.stream.LongStream;

import org.apache.hadoop.conf.Configuration;

@@ -69,6 +71,8 @@ class InternalParquetRecordReader<T> {
private long current = 0;
private int currentBlock = -1;
private ParquetFileReader reader;
private long currentRowIdx = -1;
private PrimitiveIterator.OfLong rowIdxInFileItr;
private org.apache.parquet.io.RecordReader<T> recordReader;
private boolean strictTypeChecking;

@@ -127,6 +131,7 @@ private void checkRead() throws IOException {
if (pages == null) {
throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total);
}
resetRowIndexIterator(pages);
long timeSpentReading = System.currentTimeMillis() - t0;
totalTimeSpentReadingBytes += timeSpentReading;
BenchmarkCounter.incrementTime(timeSpentReading);
@@ -227,6 +232,11 @@ public boolean nextKeyValue() throws IOException, InterruptedException {

try {
currentValue = recordReader.read();
if (rowIdxInFileItr != null && rowIdxInFileItr.hasNext()) {
currentRowIdx = rowIdxInFileItr.next();
} else {
currentRowIdx = -1;
}
} catch (RecordMaterializationException e) {
// this might throw, but it's fatal if it does.
unmaterializableRecordCounter.incErrors(e);
@@ -265,4 +275,47 @@ private static <K, V> Map<K, Set<V>> toSetMultiMap(Map<K, V> map) {
return Collections.unmodifiableMap(setMultiMap);
}

/**
* Returns the row index of the current row. If no row has been processed or if the
* row index information is unavailable from the underlying {@link PageReadStore}, returns -1.
*/
public long getCurrentRowIndex() {
if (current == 0L || rowIdxInFileItr == null) {
return -1;
}
return currentRowIdx;
}

/**
* Resets the row index iterator based on the row group currently being processed.
*/
private void resetRowIndexIterator(PageReadStore pages) {
Optional<Long> rowGroupRowIdxOffset = pages.getRowIndexOffset();
if (!rowGroupRowIdxOffset.isPresent()) {
this.rowIdxInFileItr = null;
return;
}

currentRowIdx = -1;
final PrimitiveIterator.OfLong rowIdxInRowGroupItr;
if (pages.getRowIndexes().isPresent()) {
rowIdxInRowGroupItr = pages.getRowIndexes().get();
} else {
rowIdxInRowGroupItr = LongStream.range(0, pages.getRowCount()).iterator();
}
// Offset each index by the row group's starting row index so the iterator yields row indexes within the whole file.
this.rowIdxInFileItr = new PrimitiveIterator.OfLong() {
@Override
public long nextLong() {
return rowGroupRowIdxOffset.get() + rowIdxInRowGroupItr.nextLong();
}

@Override
public boolean hasNext() {
return rowIdxInRowGroupItr.hasNext();
}

@Override
public Long next() {
return rowGroupRowIdxOffset.get() + rowIdxInRowGroupItr.next();
}
};
}
}
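The offset arithmetic in resetRowIndexIterator is easy to check in isolation; a self-contained sketch (names hypothetical) that offsets a within-row-group iterator by the row group's starting row index:

import java.util.PrimitiveIterator;
import java.util.stream.LongStream;

class RowIndexIteratorDemo {
  // Wraps a within-row-group iterator so it yields file-wide row indexes.
  static PrimitiveIterator.OfLong withOffset(long rowGroupOffset, PrimitiveIterator.OfLong withinGroup) {
    return new PrimitiveIterator.OfLong() {
      @Override
      public long nextLong() {
        return rowGroupOffset + withinGroup.nextLong();
      }

      @Override
      public boolean hasNext() {
        return withinGroup.hasNext();
      }
    };
  }

  public static void main(String[] args) {
    // A row group starting at file row index 1000 whose surviving rows are 3 and 7
    // within the group yields file-wide indexes 1003 and 1007.
    PrimitiveIterator.OfLong itr = withOffset(1000, LongStream.of(3, 7).iterator());
    while (itr.hasNext()) {
      System.out.println(itr.nextLong());
    }
  }
}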
org/apache/parquet/hadoop/ParquetFileReader.java
@@ -929,7 +929,7 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOException {
if (block.getRowCount() == 0) {
throw new RuntimeException("Illegal row group of 0 rows");
}
-ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(block.getRowCount());
+ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(block.getRowCount(), block.getRowIndexOffset());
// prepare the list of consecutive parts to read them in one scan
List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
ConsecutivePartList currentParts = null;
@@ -1044,7 +1044,7 @@ public PageReadStore readNextFilteredRowGroup() throws IOException {
}

private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData block, RowRanges rowRanges, ColumnIndexStore ciStore) throws IOException {
-ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(rowRanges);
+ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(rowRanges, block.getRowIndexOffset());
// prepare the list of consecutive parts to read them in one scan
ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
List<ConsecutivePartList> allParts = new ArrayList<>();
org/apache/parquet/hadoop/ParquetReader.java
@@ -140,6 +140,16 @@ public T read() throws IOException {
}
}

/**
* @return the row index of the last read row. If no row has been processed, returns -1.
*/
public long getCurrentRowIndex() {
if (reader == null) {
return -1;
}
return reader.getCurrentRowIndex();
}

private void initReader() throws IOException {
if (reader != null) {
reader.close();
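A usage sketch of the new accessor at the ParquetReader level (the input path is hypothetical, and GroupReadSupport stands in for any ReadSupport):

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

class CurrentRowIndexDemo {
  public static void main(String[] args) throws IOException {
    try (ParquetReader<Group> reader =
        ParquetReader.builder(new GroupReadSupport(), new Path("/tmp/users.parquet")).build()) {
      for (Group g = reader.read(); g != null; g = reader.read()) {
        // Row index of the row returned by the preceding read(); -1 before any successful read.
        System.out.println(reader.getCurrentRowIndex() + ": " + g);
      }
    }
  }
}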
org/apache/parquet/hadoop/ParquetRecordReader.java
@@ -207,6 +207,13 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
return internalReader.nextKeyValue();
}

/**
* @return the row index of the current row. If no row has been processed, returns -1.
*/
public long getCurrentRowIndex() throws IOException {
return internalReader.getCurrentRowIndex();
}

private ParquetInputSplit toParquetSplit(InputSplit split) throws IOException {
if (split instanceof ParquetInputSplit) {
return (ParquetInputSplit) split;
org/apache/parquet/hadoop/metadata/BlockMetaData.java
@@ -33,6 +33,7 @@ public class BlockMetaData {
private long totalByteSize;
private String path;
private int ordinal;
private long rowIndexOffset = -1;

public BlockMetaData() {
}
@@ -65,6 +66,18 @@ public void setRowCount(long rowCount) {
this.rowCount = rowCount;
}

/**
* @return the rowIndexOffset for this {@link BlockMetaData}, or -1 if it is unavailable
*/
public long getRowIndexOffset() {
return rowIndexOffset;
}

/**
* @param rowIndexOffset the rowIndexOffset to set
*/
public void setRowIndexOffset(long rowIndexOffset) {
this.rowIndexOffset = rowIndexOffset;
}

/**
* @return the totalByteSize
*/
@@ -105,7 +118,11 @@ public long getStartingPos() {

@Override
public String toString() {
return "BlockMetaData{" + rowCount + ", " + totalByteSize + " " + columns + "}";
String rowIndexOffsetStr = "";
if (rowIndexOffset != -1) {
rowIndexOffsetStr = ", rowIndexOffset = " + rowIndexOffset;
}
return "BlockMetaData{" + rowCount + ", " + totalByteSize + rowIndexOffsetStr + " " + columns + "}";
}

/**
org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
@@ -18,6 +18,8 @@
*/
package org.apache.parquet.filter2.recordlevel;

import static org.junit.Assert.assertEquals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -315,7 +317,7 @@ public static void write(ParquetWriter.Builder<Group, ?> builder, List<User> use
}
}

-private static ParquetReader<Group> createReader(Path file, Filter filter) throws IOException {
+public static ParquetReader<Group> createReader(Path file, Filter filter) throws IOException {
Configuration conf = new Configuration();
GroupWriteSupport.setSchema(schema, conf);

@@ -341,11 +343,24 @@ public static List<Group> readFile(File f, Filter filter) throws IOException {
}

public static List<User> readUsers(ParquetReader.Builder<Group> builder) throws IOException {
return readUsers(builder, false);
}

/**
* Returns a list of users read via the given {@link ParquetReader} builder.
* If {@code validateRowIndexes} is true, this method also validates the row index of each
* row read from the ParquetReader - the row index of a row should be the same as the underlying user id.
*/
public static List<User> readUsers(ParquetReader.Builder<Group> builder, boolean validateRowIndexes) throws IOException {
ParquetReader<Group> reader = builder.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString()).build();

List<User> users = new ArrayList<>();
for (Group group = reader.read(); group != null; group = reader.read()) {
-users.add(userFromGroup(group));
+User u = userFromGroup(group);
+users.add(u);
+if (validateRowIndexes) {
+assertEquals(reader.getCurrentRowIndex(), u.id);
+}
}
return users;
}
@@ -200,7 +200,7 @@ private List<PhoneBookWriter.User> readUsers(FilterPredicate filter, boolean use
.useStatsFilter(useOtherFiltering)
.useRecordFilter(useOtherFiltering)
.useBloomFilter(useBloomFilter)
-.useColumnIndexFilter(useOtherFiltering));
+.useColumnIndexFilter(useOtherFiltering), true);
}

// Assumes that both lists are in the same order