diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
index 5f352f61cd3c..93e02925a55b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
@@ -72,6 +72,10 @@ public long rangeCardinality(long start, long end) {
         return roaringBitmap.rangeCardinality(start, end);
     }
 
+    /**
+     * Delegates to {@code first()} of the wrapped {@code RoaringBitmap}. NOTE(review): presumably
+     * the smallest value, and presumably only valid on a non-empty bitmap - confirm.
+     */
+    public int first() {
+        return roaringBitmap.first();
+    }
+
     public int last() {
         return roaringBitmap.last();
     }
@@ -138,6 +142,10 @@ public static RoaringBitmap32 bitmapOf(int... dat) {
         return roaringBitmap32;
     }
 
+    /**
+     * Wraps {@code RoaringBitmap.bitmapOfRange(min, max)} in a {@link RoaringBitmap32}.
+     * NOTE(review): the caller in RowRanges passes {@code lastRowIndex + 1} as {@code max}, which
+     * suggests the upper bound is exclusive - confirm against the RoaringBitmap API.
+     */
+    public static RoaringBitmap32 bitmapOfRange(long min, long max) {
+        return new RoaringBitmap32(RoaringBitmap.bitmapOfRange(min, max));
+    }
+
     public static RoaringBitmap32 and(final RoaringBitmap32 x1, final RoaringBitmap32 x2) {
         return new RoaringBitmap32(RoaringBitmap.and(x1.roaringBitmap, x2.roaringBitmap));
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 01d4e89af95d..9cb8036180dd 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -58,6 +58,7 @@
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
+import org.apache.parquet.hadoop.ParquetOutputFormat;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -70,6 +71,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
@@ -79,8 +81,11 @@
 import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
 import static org.apache.paimon.CoreOptions.DATA_FILE_PATH_DIRECTORY;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET;
 import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD;
 import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE;
+import static org.apache.paimon.CoreOptions.WRITE_ONLY;
 import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucket;
 import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucketKeyHashCode;
@@ -722,6 +727,101 @@ public void testBSIAndBitmapIndexInDisk() throws Exception {
                 });
     }
 
+    /**
+     * Writes random {@code price} values into a Parquet table with a BSI file index on that
+     * column, then checks that equality and range filters read back exactly the rows that were
+     * written.
+     */
+    @Test
+    public void testBitmapIndexResultFilterParquetRowRanges() throws Exception {
+        RowType rowType =
+                RowType.builder()
+                        .field("id", DataTypes.INT())
+                        .field("event", DataTypes.STRING())
+                        .field("price", DataTypes.INT())
+                        .build();
+        // in unaware-bucket mode, we split files into splits all the time
+        FileStoreTable table =
+                createUnawareBucketFileStoreTable(
+                        rowType,
+                        options -> {
+                            options.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
+                            options.set(WRITE_ONLY, true);
+                            options.set(
+                                    FileIndexOptions.FILE_INDEX
+                                            + "."
+                                            + BitSliceIndexBitmapFileIndexFactory.BSI_INDEX
+                                            + "."
+                                            + CoreOptions.COLUMNS,
+                                    "price");
+                            options.set(
+                                    ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
+                            options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300");
+                        });
+
+        int bound = 3000;
+        Random random = new Random();
+        Map<Integer, Integer> expectedMap = new HashMap<>();
+        for (int i = 0; i < 5; i++) {
+            StreamTableWrite write = table.newWrite(commitUser);
+            StreamTableCommit commit = table.newCommit(commitUser);
+            for (int j = 0; j < 10000; j++) {
+                int next = random.nextInt(bound);
+                expectedMap.compute(next, (key, value) -> value == null ? 1 : value + 1);
+                write.write(GenericRow.of(1, BinaryString.fromString("A"), next));
+            }
+            commit.commit(i, write.prepareCommit(true, i));
+            write.close();
+            commit.close();
+        }
+
+        // test eq
+        for (int i = 0; i < 10; i++) {
+            int key = random.nextInt(bound);
+            Predicate predicate = new PredicateBuilder(rowType).equal(2, key);
+            TableScan.Plan plan = table.newScan().plan();
+            RecordReader<InternalRow> reader =
+                    table.newRead().withFilter(predicate).createReader(plan.splits());
+            AtomicInteger cnt = new AtomicInteger(0);
+            reader.forEachRemaining(
+                    row -> {
+                        cnt.incrementAndGet();
+                        assertThat(row.getInt(2)).isEqualTo(key);
+                    });
+            assertThat(cnt.get()).isEqualTo(expectedMap.getOrDefault(key, 0));
+            reader.close();
+        }
+
+        // test between
+        for (int i = 0; i < 10; i++) {
+            int max = random.nextInt(bound);
+            // Random#nextInt(bound) rejects bound <= 0 and max may be 0; using max + 1 avoids the
+            // exception and also allows min == max, so min <= max always holds.
+            int min = random.nextInt(max + 1);
+            Predicate predicate =
+                    PredicateBuilder.and(
+                            new PredicateBuilder(rowType).greaterOrEqual(2, min),
+                            new PredicateBuilder(rowType).lessOrEqual(2, max));
+            TableScan.Plan plan = table.newScan().plan();
+            RecordReader<InternalRow> reader =
+                    table.newRead().withFilter(predicate).createReader(plan.splits());
+            AtomicInteger cnt = new AtomicInteger(0);
+            reader.forEachRemaining(
+                    row -> {
+                        cnt.addAndGet(1);
+                        assertThat(row.getInt(2)).isGreaterThanOrEqualTo(min);
+                        assertThat(row.getInt(2)).isLessThanOrEqualTo(max);
+                    });
+            Optional<Integer> reduce =
+                    expectedMap.entrySet().stream()
+                            .filter(x -> x.getKey() >= min && x.getKey() <= max)
+                            .map(Map.Entry::getValue)
+                            .reduce(Integer::sum);
+            assertThat(cnt.get()).isEqualTo(reduce.orElse(0));
+            reader.close();
+        }
+    }
+
     @Test
     public void testWithShardAppendTable() throws Exception {
         FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET, -1));
diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index e3fc118ad674..e9f757126afa 100644
---
a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -760,7 +760,8 @@ private RowRanges getRowRanges(int blockIndex) { options.getRecordFilter(), getColumnIndexStore(blockIndex), paths.keySet(), - blocks.get(blockIndex).getRowCount()); + blocks.get(blockIndex).getRowCount(), + fileIndexResult); blockRowRanges.set(blockIndex, rowRanges); } return rowRanges; diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java new file mode 100644 index 000000000000..b2c9365bd646 --- /dev/null +++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.parquet.internal.filter2.columnindex;
+
+import org.apache.paimon.fileindex.FileIndexResult;
+
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter;
+import org.apache.parquet.filter2.compat.FilterCompat.UnboundRecordFilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
+import org.apache.parquet.filter2.predicate.Operators;
+import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.filter2.predicate.Operators.Eq;
+import org.apache.parquet.filter2.predicate.Operators.Gt;
+import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import org.apache.parquet.filter2.predicate.Operators.Lt;
+import org.apache.parquet.filter2.predicate.Operators.LtEq;
+import org.apache.parquet.filter2.predicate.Operators.Not;
+import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.UserDefined;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore.MissingOffsetIndexException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.PrimitiveIterator;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Filter implementation based on column indexes. No filtering will be applied for columns where no
+ * column index is available. Offset index is required for all the columns in the projection,
+ * therefore a {@link MissingOffsetIndexException} will be thrown from any {@code visit} methods if
+ * any of the required offset indexes is missing.
+ *
+ * <p>Note: The class was copied over to support using {@link FileIndexResult} to filter {@link
+ * RowRanges}.
+ */
+public class ColumnIndexFilter implements Visitor<RowRanges> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ColumnIndexFilter.class);
+    private final ColumnIndexStore columnIndexStore;
+    private final Set<ColumnPath> columns;
+    private final long rowCount;
+    @Nullable private final FileIndexResult fileIndexResult;
+    private RowRanges allRows;
+
+    /**
+     * Calculates the row ranges containing the indexes of the rows that might match the specified
+     * filter.
+     *
+     * @param filter to be used for filtering the rows
+     * @param columnIndexStore the store for providing column/offset indexes
+     * @param paths the paths of the columns used in the actual projection; a column not being part
+     *     of the projection will be handled as containing {@code null} values only even if the
+     *     column has values written in the file
+     * @param rowCount the total number of rows in the row-group
+     * @param fileIndexResult the file index result, or {@code null}; it is passed on to {@link
+     *     RowRanges} to filter or narrow the row ranges
+     * @return the ranges of the possible matching row indexes; the returned ranges will contain all
+     *     the rows if any of the required offset index is missing
+     */
+    public static RowRanges calculateRowRanges(
+            FilterCompat.Filter filter,
+            ColumnIndexStore columnIndexStore,
+            Set<ColumnPath> paths,
+            long rowCount,
+            @Nullable FileIndexResult fileIndexResult) {
+        return filter.accept(
+                new FilterCompat.Visitor<RowRanges>() {
+                    @Override
+                    public RowRanges visit(FilterPredicateCompat filterPredicateCompat) {
+                        try {
+                            return filterPredicateCompat
+                                    .getFilterPredicate()
+                                    .accept(
+                                            new ColumnIndexFilter(
+                                                    columnIndexStore,
+                                                    paths,
+                                                    rowCount,
+                                                    fileIndexResult));
+                        } catch (MissingOffsetIndexException e) {
+                            // Without an offset index no page can be skipped: keep every row.
+                            LOGGER.info(e.getMessage());
+                            return RowRanges.createSingle(rowCount);
+                        }
+                    }
+
+                    @Override
+                    public RowRanges visit(UnboundRecordFilterCompat unboundRecordFilterCompat) {
+                        return RowRanges.createSingle(rowCount);
+                    }
+
+                    @Override
+                    public RowRanges visit(NoOpFilter noOpFilter) {
+                        return RowRanges.createSingle(rowCount);
+                    }
+                });
+    }
+
+    private ColumnIndexFilter(
+            ColumnIndexStore columnIndexStore,
+            Set<ColumnPath> paths,
+            long rowCount,
+            @Nullable FileIndexResult fileIndexResult) {
+        this.columnIndexStore = columnIndexStore;
+        this.columns = paths;
+        this.rowCount = rowCount;
+        this.fileIndexResult = fileIndexResult;
+    }
+
+    /** Returns the single range {@code [0, rowCount - 1]}, created on first use and then reused. */
+    private RowRanges allRows() {
+        if (allRows == null) {
+            allRows = RowRanges.createSingle(rowCount);
+        }
+        return allRows;
+    }
+
+    @Override
+    public <T extends Comparable<T>> RowRanges visit(Eq<T> eq) {
+        return applyPredicate(
+                eq.getColumn(),
+                ci -> ci.visit(eq),
+                eq.getValue() == null ? allRows() : RowRanges.EMPTY);
+    }
+
+    @Override
+    public <T extends Comparable<T>> RowRanges visit(NotEq<T> notEq) {
+        return applyPredicate(
+                notEq.getColumn(),
+                ci -> ci.visit(notEq),
+                notEq.getValue() == null ? RowRanges.EMPTY : allRows());
+    }
+
+    @Override
+    public <T extends Comparable<T>> RowRanges visit(Lt<T> lt) {
+        return applyPredicate(lt.getColumn(), ci -> ci.visit(lt), RowRanges.EMPTY);
+    }
+
+    @Override
+    public <T extends Comparable<T>> RowRanges visit(LtEq<T> ltEq) {
+        return applyPredicate(ltEq.getColumn(), ci -> ci.visit(ltEq), RowRanges.EMPTY);
+    }
+
+    @Override
+    public <T extends Comparable<T>> RowRanges visit(Gt<T> gt) {
+        return applyPredicate(gt.getColumn(), ci -> ci.visit(gt), RowRanges.EMPTY);
+    }
+
+    @Override
+    public <T extends Comparable<T>> RowRanges visit(GtEq<T> gtEq) {
+        return applyPredicate(gtEq.getColumn(), ci -> ci.visit(gtEq), RowRanges.EMPTY);
+    }
+
+    @Override
+    public <T extends Comparable<T>> RowRanges visit(Operators.In<T> in) {
+        boolean isNull = in.getValues().contains(null);
+        return applyPredicate(
+                in.getColumn(), ci -> ci.visit(in), isNull ? allRows() : RowRanges.EMPTY);
+    }
+
+    @Override
+    public <T extends Comparable<T>> RowRanges visit(Operators.NotIn<T> notIn) {
+        boolean isNull = notIn.getValues().contains(null);
+        return applyPredicate(
+                notIn.getColumn(), ci -> ci.visit(notIn), isNull ? RowRanges.EMPTY : allRows());
+    }
+
+    @Override
+    public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> RowRanges visit(
+            UserDefined<T, U> udp) {
+        return applyPredicate(
+                udp.getColumn(),
+                ci -> ci.visit(udp),
+                udp.getUserDefinedPredicate().acceptsNullValue() ? allRows() : RowRanges.EMPTY);
+    }
+
+    @Override
+    public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> RowRanges visit(
+            LogicalNotUserDefined<T, U> udp) {
+        return applyPredicate(
+                udp.getUserDefined().getColumn(),
+                ci -> ci.visit(udp),
+                udp.getUserDefined().getUserDefinedPredicate().acceptsNullValue()
+                        ? RowRanges.EMPTY
+                        : allRows());
+    }
+
+    /**
+     * Evaluates a single-column predicate against that column's column index.
+     *
+     * @param column the column the predicate refers to
+     * @param func maps the column index to the indexes of the pages that might match
+     * @param rangesForMissingColumns the result to return when the column is not part of the
+     *     projection
+     * @return {@code rangesForMissingColumns} if the column is not projected, all rows if no column
+     *     index is available, otherwise the ranges built from the matching pages and the file
+     *     index result
+     */
+    private RowRanges applyPredicate(
+            Column<?> column,
+            Function<ColumnIndex, PrimitiveIterator.OfInt> func,
+            RowRanges rangesForMissingColumns) {
+        ColumnPath columnPath = column.getColumnPath();
+        if (!columns.contains(columnPath)) {
+            return rangesForMissingColumns;
+        }
+
+        // The offset index is looked up before the column index is checked for null.
+        OffsetIndex oi = columnIndexStore.getOffsetIndex(columnPath);
+        ColumnIndex ci = columnIndexStore.getColumnIndex(columnPath);
+        if (ci == null) {
+            LOGGER.info(
+                    "No column index for column {} is available; Unable to filter on this column",
+                    columnPath);
+            return allRows();
+        }
+
+        return RowRanges.create(rowCount, func.apply(ci), oi, fileIndexResult);
+    }
+
+    @Override
+    public RowRanges visit(And and) {
+        RowRanges leftResult = and.getLeft().accept(this);
+        // An empty left side makes the intersection empty; skip evaluating the right side.
+        if (leftResult.getRanges().size() == 0) {
+            return leftResult;
+        }
+
+        return RowRanges.intersection(leftResult, and.getRight().accept(this));
+    }
+
+    @Override
+    public RowRanges visit(Or or) {
+        RowRanges leftResult = or.getLeft().accept(this);
+        // The left side already covers every row; skip evaluating the right side.
+        if (leftResult.getRanges().size() == 1 && leftResult.rowCount() == rowCount) {
+            return leftResult;
+        }
+
+        return RowRanges.union(leftResult, or.getRight().accept(this));
+    }
+
+    @Override
+    public RowRanges visit(Not not) {
+        throw new IllegalArgumentException(
+                "Predicates containing a NOT must be run through LogicalInverseRewriter. " + not);
+    }
+}
diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
new file mode 100644
index 000000000000..6963814831d4
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.internal.filter2.columnindex;
+
+import org.apache.paimon.fileindex.FileIndexResult;
+import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PrimitiveIterator;
+import java.util.Set;
+
+/**
+ * Class representing row ranges in a row-group. These row ranges are calculated as a result of the
+ * column index based filtering.
To be used to iterate over the matching row indexes to be read from a
+ * row-group, retrieve the count of the matching rows or check overlapping of a row index range.
+ *
+ * <p>Note: The class was copied over to support using {@link FileIndexResult} to filter {@link
+ * RowRanges}. Added a new method {@link RowRanges#create(long, PrimitiveIterator.OfInt,
+ * OffsetIndex, FileIndexResult)}
+ *
+ * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set, long, FileIndexResult)
+ */
+public class RowRanges {
+
+    /** Made public because some upper layer applications need to access it. */
+    public static class Range {
+
+        // Returns the union of the two ranges or null if there are elements between them.
+        private static Range union(Range left, Range right) {
+            if (left.from <= right.from) {
+                if (left.to + 1 >= right.from) {
+                    return new Range(left.from, Math.max(left.to, right.to));
+                }
+            } else if (right.to + 1 >= left.from) {
+                return new Range(right.from, Math.max(left.to, right.to));
+            }
+            return null;
+        }
+
+        // Returns the intersection of the two ranges or null if they are not overlapped.
+        private static Range intersection(Range left, Range right) {
+            if (left.from <= right.from) {
+                if (left.to >= right.from) {
+                    return new Range(right.from, Math.min(left.to, right.to));
+                }
+            } else if (right.to >= left.from) {
+                return new Range(left.from, Math.min(left.to, right.to));
+            }
+            return null;
+        }
+
+        public final long from;
+        public final long to;
+
+        // Creates a range of [from, to] (from and to are inclusive; empty ranges are not valid)
+        Range(long from, long to) {
+            assert from <= to;
+            this.from = from;
+            this.to = to;
+        }
+
+        long count() {
+            return to - from + 1;
+        }
+
+        boolean isBefore(Range other) {
+            return to < other.from;
+        }
+
+        boolean isAfter(Range other) {
+            return from > other.to;
+        }
+
+        @Override
+        public String toString() {
+            return "[" + from + ", " + to + ']';
+        }
+    }
+
+    public static final RowRanges EMPTY = new RowRanges(Collections.emptyList());
+
+    private final List<Range> ranges;
+
+    private RowRanges() {
+        this(new ArrayList<>());
+    }
+
+    private RowRanges(Range range) {
+        this(Collections.singletonList(range));
+    }
+
+    private RowRanges(List<Range> ranges) {
+        this.ranges = ranges;
+    }
+
+    /**
+     * Creates an immutable RowRanges object with the single range [0, rowCount - 1].
+     *
+     * @param rowCount a single row count
+     * @return an immutable RowRanges
+     */
+    public static RowRanges createSingle(long rowCount) {
+        return new RowRanges(new Range(0L, rowCount - 1L));
+    }
+
+    /**
+     * Creates a mutable RowRanges object with the following ranges:
+     *
+     * <pre>
+     * [firstRowIndex[0], lastRowIndex[0]],
+     * [firstRowIndex[1], lastRowIndex[1]],
+     * ...,
+     * [firstRowIndex[n], lastRowIndex[n]]
+     * </pre>
+     *
+     * <p>(See OffsetIndex.getFirstRowIndex and OffsetIndex.getLastRowIndex for details.)
+     *
+     * <p>The union of the ranges are calculated so the result ranges always contain the disjunct
+     * ranges. See union for details.
+     *
+     * @param rowCount row count
+     * @param pageIndexes pageIndexes
+     * @param offsetIndex offsetIndex
+     * @return a mutable RowRanges
+     */
+    public static RowRanges create(
+            long rowCount, PrimitiveIterator.OfInt pageIndexes, OffsetIndex offsetIndex) {
+        RowRanges ranges = new RowRanges();
+        while (pageIndexes.hasNext()) {
+            int pageIndex = pageIndexes.nextInt();
+            ranges.add(
+                    new Range(
+                            offsetIndex.getFirstRowIndex(pageIndex),
+                            offsetIndex.getLastRowIndex(pageIndex, rowCount)));
+        }
+        return ranges;
+    }
+
+    /**
+     * Support using {@link FileIndexResult} to filter the row ranges.
+     *
+     * <p>Builds the same per-page ranges as {@link #create(long, PrimitiveIterator.OfInt,
+     * OffsetIndex)}. When {@code fileIndexResult} is a {@link BitmapIndexResult}, each page range
+     * is intersected with its bitmap: a page with no selected row is dropped, and any other page
+     * is narrowed to the first and last selected row inside it. Any other (or {@code null}) file
+     * index result leaves the page ranges untouched.
+     *
+     * <p>NOTE(review): the bitmap positions are compared directly with the row indexes reported
+     * by {@code offsetIndex}; this assumes both use the same row numbering - confirm for files
+     * with more than one row group.
+     *
+     * @param rowCount row count
+     * @param pageIndexes pageIndexes
+     * @param offsetIndex offsetIndex
+     * @param fileIndexResult the file index result used to filter or narrow the ranges, may be
+     *     {@code null}
+     * @return a mutable RowRanges
+     */
+    public static RowRanges create(
+            long rowCount,
+            PrimitiveIterator.OfInt pageIndexes,
+            OffsetIndex offsetIndex,
+            @Nullable FileIndexResult fileIndexResult) {
+        RowRanges ranges = new RowRanges();
+        while (pageIndexes.hasNext()) {
+            int pageIndex = pageIndexes.nextInt();
+            long firstRowIndex = offsetIndex.getFirstRowIndex(pageIndex);
+            long lastRowIndex = offsetIndex.getLastRowIndex(pageIndex, rowCount);
+
+            // using file index result to filter or narrow the row ranges
+            if (fileIndexResult instanceof BitmapIndexResult) {
+                RoaringBitmap32 bitmap = ((BitmapIndexResult) fileIndexResult).get();
+                // lastRowIndex is inclusive, hence the + 1 for the range bitmap's upper bound
+                RoaringBitmap32 range =
+                        RoaringBitmap32.bitmapOfRange(firstRowIndex, lastRowIndex + 1);
+                RoaringBitmap32 result = RoaringBitmap32.and(bitmap, range);
+                if (result.isEmpty()) {
+                    continue;
+                }
+                firstRowIndex = result.first();
+                lastRowIndex = result.last();
+            }
+
+            ranges.add(new Range(firstRowIndex, lastRowIndex));
+        }
+        return ranges;
+    }
+
+    /**
+     * Calculates the union of the two specified RowRanges object. The union of two range is
+     * calculated if there are no elements between them. Otherwise, the two disjunct ranges are
+     * stored separately.
+     *
+     * <pre>
+     * For example:
+     * [113, 241] ∪ [221, 340] = [113, 340]
+     * [113, 230] ∪ [231, 340] = [113, 340]
+     * while
+     * [113, 230] ∪ [232, 340] = [113, 230], [232, 340]
+     * </pre>
+     *
+     * <p>The result RowRanges object will contain all the row indexes that were contained in one of
+     * the specified objects.
+     *
+     * @param left left RowRanges
+     * @param right right RowRanges
+     * @return a mutable RowRanges contains all the row indexes that were contained in one of the
+     *     specified objects
+     */
+    public static RowRanges union(RowRanges left, RowRanges right) {
+        RowRanges result = new RowRanges();
+        Iterator<Range> it1 = left.ranges.iterator();
+        Iterator<Range> it2 = right.ranges.iterator();
+        if (it2.hasNext()) {
+            Range range2 = it2.next();
+            while (it1.hasNext()) {
+                Range range1 = it1.next();
+                if (range1.isAfter(range2)) {
+                    result.add(range2);
+                    range2 = range1;
+                    Iterator<Range> tmp = it1;
+                    it1 = it2;
+                    it2 = tmp;
+                } else {
+                    result.add(range1);
+                }
+            }
+            result.add(range2);
+        } else {
+            it2 = it1;
+        }
+        while (it2.hasNext()) {
+            result.add(it2.next());
+        }
+
+        return result;
+    }
+
+    /**
+     * Calculates the intersection of the two specified RowRanges object. Two ranges intersect if
+     * they have common elements otherwise the result is empty.
+     *
+     * <pre>
+     * For example:
+     * [113, 241] ∩ [221, 340] = [221, 241]
+     * while
+     * [113, 230] ∩ [231, 340] = &lt;EMPTY&gt;
+     * </pre>
+     *
+     * @param left left RowRanges
+     * @param right right RowRanges
+     * @return a mutable RowRanges contains all the row indexes that were contained in both of the
+     *     specified objects
+     */
+    public static RowRanges intersection(RowRanges left, RowRanges right) {
+        RowRanges result = new RowRanges();
+
+        int rightIndex = 0;
+        for (Range l : left.ranges) {
+            for (int i = rightIndex, n = right.ranges.size(); i < n; ++i) {
+                Range r = right.ranges.get(i);
+                if (l.isBefore(r)) {
+                    break;
+                } else if (l.isAfter(r)) {
+                    rightIndex = i + 1;
+                    continue;
+                }
+                result.add(Range.intersection(l, r));
+            }
+        }
+
+        return result;
+    }
+
+    /*
+     * Adds a range to the end of the list of ranges. It maintains the disjunct ascending order(*) of the ranges by
+     * trying to union the specified range to the last ranges in the list. The specified range shall be larger(*) than
+     * the last one or might be overlapped with some of the last ones.
+     * (*) [a, b] < [c, d] if b < c
+     */
+    private void add(Range range) {
+        Range rangeToAdd = range;
+        for (int i = ranges.size() - 1; i >= 0; --i) {
+            Range last = ranges.get(i);
+            assert !last.isAfter(range);
+            Range u = Range.union(last, rangeToAdd);
+            if (u == null) {
+                break;
+            }
+            rangeToAdd = u;
+            ranges.remove(i);
+        }
+        ranges.add(rangeToAdd);
+    }
+
+    /** @return the number of rows in the ranges */
+    public long rowCount() {
+        long cnt = 0;
+        for (Range range : ranges) {
+            cnt += range.count();
+        }
+        return cnt;
+    }
+
+    /** @return the ascending iterator of the row indexes contained in the ranges */
+    public PrimitiveIterator.OfLong iterator() {
+        return new PrimitiveIterator.OfLong() {
+            private int currentRangeIndex = -1;
+            private Range currentRange;
+            private long next = findNext();
+
+            // Advances to the following row index, moving on to the next range when the current
+            // one is exhausted; returns -1 once all ranges are consumed.
+            private long findNext() {
+                if (currentRange == null || next + 1 > currentRange.to) {
+                    if (currentRangeIndex + 1 < ranges.size()) {
+                        currentRange = ranges.get(++currentRangeIndex);
+                        next = currentRange.from;
+                    } else {
+                        return -1;
+                    }
+                } else {
+                    ++next;
+                }
+                return next;
+            }
+
+            @Override
+            public boolean hasNext() {
+                return next >= 0;
+            }
+
+            @Override
+            public long nextLong() {
+                long ret = next;
+                if (ret < 0) {
+                    throw new NoSuchElementException();
+                }
+                next = findNext();
+                return ret;
+            }
+        };
+    }
+
+    /**
+     * @param from the first row of the range to be checked for connection
+     * @param to the last row of the range to be checked for connection
+     * @return {@code true} if the specified range is overlapping (have common elements) with one of
+     *     the ranges
+     */
+    public boolean isOverlapping(long from, long to) {
+        return Collections.binarySearch(
+                        ranges,
+                        new Range(from, to),
+                        (r1, r2) -> r1.isBefore(r2) ? -1 : r1.isAfter(r2) ? 1 : 0)
+                >= 0;
+    }
+
+    public List<Range> getRanges() {
+        return ranges;
+    }
+
+    @Override
+    public String toString() {
+        return ranges.toString();
+    }
+}