Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -731,7 +732,7 @@ public void testBSIAndBitmapIndexInDisk() throws Exception {
public void testBitmapIndexResultFilterParquetRowRanges() throws Exception {
RowType rowType =
RowType.builder()
.field("id", DataTypes.INT())
.field("id", DataTypes.STRING())
.field("event", DataTypes.STRING())
.field("price", DataTypes.INT())
.build();
Expand All @@ -749,26 +750,26 @@ public void testBitmapIndexResultFilterParquetRowRanges() throws Exception {
+ "."
+ CoreOptions.COLUMNS,
"price");
options.set(ParquetOutputFormat.BLOCK_SIZE, "1048576");
options.set(
ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300");
});

int bound = 3000;
int bound = 300000;
Random random = new Random();
Map<Integer, Integer> expectedMap = new HashMap<>();
for (int i = 0; i < 5; i++) {
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
for (int j = 0; j < 10000; j++) {
int next = random.nextInt(bound);
expectedMap.compute(next, (key, value) -> value == null ? 1 : value + 1);
write.write(GenericRow.of(1, BinaryString.fromString("A"), next));
}
commit.commit(i, write.prepareCommit(true, i));
write.close();
commit.close();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
for (int j = 0; j < 1000000; j++) {
int next = random.nextInt(bound);
BinaryString uuid = BinaryString.fromString(UUID.randomUUID().toString());
expectedMap.compute(next, (key, value) -> value == null ? 1 : value + 1);
write.write(GenericRow.of(uuid, uuid, next));
}
commit.commit(0, write.prepareCommit(true, 0));
write.close();
commit.close();

// test eq
for (int i = 0; i < 10; i++) {
Expand All @@ -789,7 +790,7 @@ public void testBitmapIndexResultFilterParquetRowRanges() throws Exception {

// test between
for (int i = 0; i < 10; i++) {
int max = random.nextInt(bound);
int max = random.nextInt(bound) + 1;
int min = random.nextInt(max);
Predicate predicate =
PredicateBuilder.and(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@ private RowRanges getRowRanges(int blockIndex) {
getColumnIndexStore(blockIndex),
paths.keySet(),
blocks.get(blockIndex).getRowCount(),
blocks.get(blockIndex).getRowIndexOffset(),
fileIndexResult);
blockRowRanges.set(blockIndex, rowRanges);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class ColumnIndexFilter implements Visitor<RowRanges> {
private final ColumnIndexStore columnIndexStore;
private final Set<ColumnPath> columns;
private final long rowCount;
private final long rowIndexOffset;
@Nullable private final FileIndexResult fileIndexResult;
private RowRanges allRows;

Expand All @@ -89,6 +90,7 @@ public static RowRanges calculateRowRanges(
ColumnIndexStore columnIndexStore,
Set<ColumnPath> paths,
long rowCount,
long rowIndexOffset,
@Nullable FileIndexResult fileIndexResult) {
return filter.accept(
new FilterCompat.Visitor<RowRanges>() {
Expand All @@ -102,6 +104,7 @@ public RowRanges visit(FilterPredicateCompat filterPredicateCompat) {
columnIndexStore,
paths,
rowCount,
rowIndexOffset,
fileIndexResult));
} catch (MissingOffsetIndexException e) {
LOGGER.info(e.getMessage());
Expand All @@ -125,10 +128,12 @@ private ColumnIndexFilter(
ColumnIndexStore columnIndexStore,
Set<ColumnPath> paths,
long rowCount,
long rowIndexOffset,
@Nullable FileIndexResult fileIndexResult) {
this.columnIndexStore = columnIndexStore;
this.columns = paths;
this.rowCount = rowCount;
this.rowIndexOffset = rowIndexOffset;
this.fileIndexResult = fileIndexResult;
}

Expand Down Expand Up @@ -227,7 +232,7 @@ private RowRanges applyPredicate(
return allRows();
}

return RowRanges.create(rowCount, func.apply(ci), oi, fileIndexResult);
return RowRanges.create(rowCount, rowIndexOffset, func.apply(ci), oi, fileIndexResult);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@
* row-group, retrieve the count of the matching rows or check overlapping of a row index range.
*
* <p>Note: The class was copied over to support using {@link FileIndexResult} to filter {@link
* RowRanges}. Added a new method {@link RowRanges#create(long, PrimitiveIterator.OfInt,
* RowRanges}. Added a new method {@link RowRanges#create(long, long, PrimitiveIterator.OfInt,
* OffsetIndex, FileIndexResult)}.
*
* @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set, long, FileIndexResult)
* @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set, long, long,
* FileIndexResult)
*/
public class RowRanges {

Expand Down Expand Up @@ -165,6 +166,7 @@ public static RowRanges create(
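/**
* Support using {@link FileIndexResult} to filter the row ranges.
*
* <p>When the result is a {@link BitmapIndexResult}, each page's first/last row index is
* shifted by {@code rowIndexOffset} before being intersected with the bitmap, and the first and
* last surviving positions are shifted back by the same amount. Pages whose intersection is
* empty are skipped. NOTE(review): presumably the bitmap holds file-level row positions while
* the page row indexes are relative to the row group - confirm against the caller.
*/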
public static RowRanges create(
long rowCount,
long rowIndexOffset,
PrimitiveIterator.OfInt pageIndexes,
OffsetIndex offsetIndex,
@Nullable FileIndexResult fileIndexResult) {
Expand All @@ -178,13 +180,14 @@ public static RowRanges create(
if (fileIndexResult instanceof BitmapIndexResult) {
RoaringBitmap32 bitmap = ((BitmapIndexResult) fileIndexResult).get();
RoaringBitmap32 range =
RoaringBitmap32.bitmapOfRange(firstRowIndex, lastRowIndex + 1);
RoaringBitmap32.bitmapOfRange(
rowIndexOffset + firstRowIndex, rowIndexOffset + lastRowIndex + 1);
RoaringBitmap32 result = RoaringBitmap32.and(bitmap, range);
if (result.isEmpty()) {
continue;
}
firstRowIndex = result.first();
lastRowIndex = result.last();
firstRowIndex = result.first() - rowIndexOffset;
lastRowIndex = result.last() - rowIndexOffset;
}

ranges.add(new Range(firstRowIndex, lastRowIndex));
Expand Down
Loading