Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,36 @@
public class QueryableIndexCursorFactory implements CursorFactory
{
private final QueryableIndex index;
private final TimeBoundaryInspector timeBoundaryInspector;

public QueryableIndexCursorFactory(QueryableIndex index)
/**
* Constructor that accepts a {@link QueryableIndexTimeBoundaryInspector} that is in use elsewhere, promoting
* efficient re-use.
*/
public QueryableIndexCursorFactory(QueryableIndex index, TimeBoundaryInspector timeBoundaryInspector)
{
this.index = index;
this.timeBoundaryInspector = timeBoundaryInspector;
}

/**
* Constructor that creates a new {@link QueryableIndexTimeBoundaryInspector}.
*/
public QueryableIndexCursorFactory(QueryableIndex index)
{
this(index, QueryableIndexTimeBoundaryInspector.create(index));
}

@Override
public CursorHolder makeCursorHolder(CursorBuildSpec spec)
{
QueryableProjection<QueryableIndex> projection = index.getProjection(spec);
if (projection != null) {
return new QueryableIndexCursorHolder(projection.getRowSelector(), projection.getCursorBuildSpec())
return new QueryableIndexCursorHolder(
projection.getRowSelector(),
projection.getCursorBuildSpec(),
timeBoundaryInspector
)
{
@Override
protected ColumnSelectorFactory makeColumnSelectorFactoryForOffset(
Expand Down Expand Up @@ -86,7 +104,7 @@ public List<AggregatorFactory> getAggregatorsForPreAggregated()
}
};
}
return new QueryableIndexCursorHolder(index, CursorBuildSpec.builder(spec).build());
return new QueryableIndexCursorHolder(index, CursorBuildSpec.builder(spec).build(), timeBoundaryInspector);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.BaseQuery;
Expand Down Expand Up @@ -58,6 +57,7 @@
import org.apache.druid.segment.vector.VectorCursor;
import org.apache.druid.segment.vector.VectorOffset;
import org.apache.druid.utils.CloseableUtils;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.Interval;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -86,7 +86,8 @@ public class QueryableIndexCursorHolder implements CursorHolder

public QueryableIndexCursorHolder(
QueryableIndex index,
CursorBuildSpec cursorBuildSpec
CursorBuildSpec cursorBuildSpec,
TimeBoundaryInspector timeBoundaryInspector
)
{
this.index = index;
Expand All @@ -109,6 +110,7 @@ public QueryableIndexCursorHolder(
this.resourcesSupplier = Suppliers.memoize(
() -> new CursorResources(
index,
timeBoundaryInspector,
virtualColumns,
Cursors.getTimeOrdering(ordering),
interval,
Expand Down Expand Up @@ -159,9 +161,8 @@ public Cursor asCursor()
final CursorResources resources = resourcesSupplier.get();
final FilterBundle filterBundle = resources.filterBundle;
final int numRows = resources.numRows;
final long minDataTimestamp = resources.minDataTimestamp;
final long maxDataTimestamp = resources.maxDataTimestamp;
final NumericColumn timestamps = resources.timestamps;
final long minDataTimestamp = resources.timeBoundaryInspector.getMinTime().getMillis();
final long maxDataTimestamp = resources.timeBoundaryInspector.getMaxTime().getMillis();
final ColumnCache columnCache = resources.columnCache;
final Order timeOrder = resources.timeOrder;

Expand All @@ -180,13 +181,13 @@ public Cursor asCursor()

if (timeOrder == Order.ASCENDING) {
for (; baseOffset.withinBounds(); baseOffset.increment()) {
if (timestamps.getLongSingleValueRow(baseOffset.getOffset()) >= timeStart) {
if (resources.getTimestampsColumn().getLongSingleValueRow(baseOffset.getOffset()) >= timeStart) {
break;
}
}
} else if (timeOrder == Order.DESCENDING) {
for (; baseOffset.withinBounds(); baseOffset.increment()) {
if (timestamps.getLongSingleValueRow(baseOffset.getOffset()) < timeEnd) {
if (resources.getTimestampsColumn().getLongSingleValueRow(baseOffset.getOffset()) < timeEnd) {
break;
}
}
Expand All @@ -197,14 +198,14 @@ public Cursor asCursor()
if (timeOrder == Order.ASCENDING) {
offset = new AscendingTimestampCheckingOffset(
baseOffset,
timestamps,
resources.getTimestampsColumn(),
timeEnd,
maxDataTimestamp < timeEnd
);
} else if (timeOrder == Order.DESCENDING) {
offset = new DescendingTimestampCheckingOffset(
baseOffset,
timestamps,
resources.getTimestampsColumn(),
timeStart,
minDataTimestamp >= timeStart
);
Expand Down Expand Up @@ -244,9 +245,8 @@ public VectorCursor asVectorCursor()
{
final CursorResources resources = resourcesSupplier.get();
final FilterBundle filterBundle = resources.filterBundle;
final long minDataTimestamp = resources.minDataTimestamp;
final long maxDataTimestamp = resources.maxDataTimestamp;
final NumericColumn timestamps = resources.timestamps;
final long minDataTimestamp = resources.timeBoundaryInspector.getMinTime().getMillis();
final long maxDataTimestamp = resources.timeBoundaryInspector.getMaxTime().getMillis();
final ColumnCache columnCache = resources.columnCache;
final Order timeOrder = resources.timeOrder;

Expand All @@ -265,13 +265,13 @@ public VectorCursor asVectorCursor()
final int endOffset;

if (timeOrder != Order.NONE && interval.getStartMillis() > minDataTimestamp) {
startOffset = timeSearch(timestamps, interval.getStartMillis(), 0, index.getNumRows());
startOffset = timeSearch(resources.getTimestampsColumn(), interval.getStartMillis(), 0, index.getNumRows());
} else {
startOffset = 0;
}

if (timeOrder != Order.NONE && interval.getEndMillis() <= maxDataTimestamp) {
endOffset = timeSearch(timestamps, interval.getEndMillis(), startOffset, index.getNumRows());
endOffset = timeSearch(resources.getTimestampsColumn(), interval.getEndMillis(), startOffset, index.getNumRows());
} else {
endOffset = index.getNumRows();
}
Expand Down Expand Up @@ -660,17 +660,18 @@ public Offset clone()
private static final class CursorResources implements Closeable
{
private final Closer closer;
private final long minDataTimestamp;
private final long maxDataTimestamp;
private final TimeBoundaryInspector timeBoundaryInspector;
private final int numRows;
@Nullable
private final FilterBundle filterBundle;
private final NumericColumn timestamps;
private final Order timeOrder;
private final ColumnCache columnCache;
@MonotonicNonNull
private NumericColumn timestamps;

private CursorResources(
QueryableIndex index,
TimeBoundaryInspector timeBoundaryInspector,
VirtualColumns virtualColumns,
Order timeOrder,
Interval interval,
Expand All @@ -681,21 +682,18 @@ private CursorResources(
{
this.closer = Closer.create();
this.columnCache = new ColumnCache(index, closer);
this.timeBoundaryInspector = timeBoundaryInspector;
final ColumnSelectorColumnIndexSelector bitmapIndexSelector = new ColumnSelectorColumnIndexSelector(
index.getBitmapFactoryForDimensions(),
virtualColumns,
columnCache
);
try {
this.numRows = index.getNumRows();
this.timestamps = (NumericColumn) columnCache.getColumn(ColumnHolder.TIME_COLUMN_NAME);
this.minDataTimestamp = DateTimes.utc(timestamps.getLongSingleValueRow(0)).getMillis();
this.maxDataTimestamp = DateTimes.utc(timestamps.getLongSingleValueRow(timestamps.length() - 1)).getMillis();
this.filterBundle = makeFilterBundle(
computeFilterWithIntervalIfNeeded(
timeBoundaryInspector,
timeOrder,
this.minDataTimestamp,
this.maxDataTimestamp,
interval,
filter
),
Expand All @@ -711,6 +709,14 @@ private CursorResources(
}
}

public NumericColumn getTimestampsColumn()
{
if (timestamps == null) {
timestamps = (NumericColumn) columnCache.getColumn(ColumnHolder.TIME_COLUMN_NAME);
}
return timestamps;
}

@Override
public void close() throws IOException
{
Expand Down Expand Up @@ -781,20 +787,20 @@ private static FilterBundle makeFilterBundle(
*/
@Nullable
private static Filter computeFilterWithIntervalIfNeeded(
final TimeBoundaryInspector timeBoundaryInspector,
final Order timeOrder,
final long minDataTimestamp,
final long maxDataTimestamp,
final Interval interval,
@Nullable final Filter filter
)
{
if (timeOrder == Order.NONE
&& (minDataTimestamp < interval.getStartMillis() || maxDataTimestamp >= interval.getEndMillis())) {
&& (timeBoundaryInspector.getMinTime().getMillis() < interval.getStartMillis()
|| timeBoundaryInspector.getMaxTime().getMillis() >= interval.getEndMillis())) {
final RangeFilter timeFilter = new RangeFilter(
ColumnHolder.TIME_COLUMN_NAME,
ColumnType.LONG,
minDataTimestamp < interval.getStartMillis() ? interval.getStartMillis() : null,
maxDataTimestamp >= interval.getEndMillis() ? interval.getEndMillis() : null,
timeBoundaryInspector.getMinTime().getMillis() < interval.getStartMillis() ? interval.getStartMillis() : null,
timeBoundaryInspector.getMaxTime().getMillis() >= interval.getEndMillis() ? interval.getEndMillis() : null,
false,
true,
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public class QueryableIndexSegment implements Segment
public QueryableIndexSegment(QueryableIndex index, final SegmentId segmentId)
{
this.index = index;
this.cursorFactory = new QueryableIndexCursorFactory(index);
this.timeBoundaryInspector = QueryableIndexTimeBoundaryInspector.create(index);
this.cursorFactory = new QueryableIndexCursorFactory(index, timeBoundaryInspector);
this.segmentId = segmentId;
}

Expand Down