diff --git a/benchmarks/src/main/java/io/druid/benchmark/FilterPartitionBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/FilterPartitionBenchmark.java index 5a15f55e8148..af9642e1c1a1 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/FilterPartitionBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/FilterPartitionBenchmark.java @@ -508,7 +508,7 @@ public void readComplexOrFilterCNF(Blackhole blackhole) throws Exception private Sequence makeCursors(StorageAdapter sa, Filter filter) { - return sa.makeCursors(filter, schemaInfo.getDataInterval(), VirtualColumns.EMPTY, Granularities.ALL, false); + return sa.makeCursors(filter, schemaInfo.getDataInterval(), VirtualColumns.EMPTY, Granularities.ALL, false, null); } private Sequence> readCursors(Sequence cursors, final Blackhole blackhole) diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java index a0b67105e01d..03c45f7de7d3 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java @@ -205,7 +205,8 @@ private Sequence makeCursors(IncrementalIndexStorageAdapter sa, DimFilte schemaInfo.getDataInterval(), VirtualColumns.EMPTY, Granularities.ALL, - false + false, + null ); } } diff --git a/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java index 9a82a171d17c..1e6b6d100681 100644 --- a/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java +++ b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java @@ -112,10 +112,7 @@ public ByteBuffer get() .build(); final Iterable> results = Sequences.toList( - engine.query( - query, - new IncrementalIndexStorageAdapter(index) - ), + engine.query(query, new IncrementalIndexStorageAdapter(index), null), Lists.>newLinkedList() ); diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java index 57385f50c116..3011f7b2b4bd 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java @@ -116,7 +116,8 @@ public Sequence process( intervals.get(0), VirtualColumns.EMPTY, Granularities.ALL, - query.isDescending() + query.isDescending(), + null ), new Function>() { diff --git a/processing/src/main/java/io/druid/query/BitmapResultFactory.java b/processing/src/main/java/io/druid/query/BitmapResultFactory.java new file mode 100644 index 000000000000..2eade4ef7821 --- /dev/null +++ b/processing/src/main/java/io/druid/query/BitmapResultFactory.java @@ -0,0 +1,107 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import io.druid.collections.bitmap.BitmapFactory; +import io.druid.collections.bitmap.ImmutableBitmap; + +/** + * BitmapResultFactory is an abstraction that allows to record something along with preFilter bitmap construction, and + * emit this information as dimension(s) of query metrics. BitmapResultFactory is similar to {@link + * io.druid.collections.bitmap.BitmapFactory}: it has the same methods with the exception that it accepts generic type + * T (bitmap wrapper type) instead of {@link ImmutableBitmap}. + * + * {@link DefaultBitmapResultFactory} is a no-op implementation, where "wrapper" type is {@code ImmutableBitmap} itself. + * + * BitmapResultFactory delegates actual operations on bitmaps to a {@code BitmapFactory}, which it accepts in + * constructor, called from {@link QueryMetrics#makeBitmapResultFactory(BitmapFactory)}. + * + * Emitting of query metric dimension(s) should be done from {@link #toImmutableBitmap(Object)}, the "unwrapping" + * method, called only once to obtain the final preFilter bitmap. + * + * Implementors expectations + * ------------------------- + * BitmapResultFactory is a part of the {@link QueryMetrics} subsystem, so this interface could be changed often, in + * every Druid release (including "patch" releases). Users who create their custom implementations of + * BitmapResultFactory should be ready to fix the code of their code to accommodate interface changes (e. g. implement + * new methods) when they update Druid. See {@link QueryMetrics} Javadoc for more info. + * + * @param the bitmap result (wrapper) type + * @see QueryMetrics#makeBitmapResultFactory(BitmapFactory) + * @see QueryMetrics#reportBitmapConstructionTime(long) + */ +public interface BitmapResultFactory +{ + /** + * Wraps a bitmap of unknown nature. + */ + T wrapUnknown(ImmutableBitmap bitmap); + + /** + * Wraps a bitmap which designates rows in a segment with some specific dimension value. + */ + T wrapDimensionValue(ImmutableBitmap bitmap); + + /** + * Wraps a bitmap which is a result of {@link BitmapFactory#makeEmptyImmutableBitmap()} call. + */ + T wrapAllFalse(ImmutableBitmap allFalseBitmap); + + /** + * Wraps a bitmap which is a result of {@link BitmapFactory#complement(ImmutableBitmap)} called with + * {@link BitmapFactory#makeEmptyImmutableBitmap()} as argument. + */ + T wrapAllTrue(ImmutableBitmap allTrueBitmap); + + /** + * Checks that the wrapped bitmap is empty, see {@link ImmutableBitmap#isEmpty()}. + */ + boolean isEmpty(T bitmapResult); + + /** + * Delegates to {@link BitmapFactory#intersection(Iterable)} on the wrapped bitmaps, and returns a bitmap result + * wrapping the resulting intersection ImmutableBitmap. + */ + T intersection(Iterable bitmapResults); + + /** + * Delegates to {@link BitmapFactory#union(Iterable)} on the wrapped bitmaps, and returns a bitmap result wrapping + * the resulting union ImmutableBitmap. + */ + T union(Iterable bitmapResults); + + /** + * Equivalent of intersection(Iterables.transform(dimensionValueBitmaps, factory::wrapDimensionValue)), but doesn't + * create a lot of bitmap result objects. + */ + T unionDimensionValueBitmaps(Iterable dimensionValueBitmaps); + + /** + * Delegates to {@link BitmapFactory#complement(ImmutableBitmap, int)} on the wrapped bitmap, and returns a bitmap + * result wrapping the resulting complement ImmutableBitmap. + */ + T complement(T bitmapResult, int numRows); + + /** + * Unwraps bitmapResult back to ImmutableBitmap. BitmapResultFactory should emit query metric dimension(s) in the + * implementation of this method. + */ + ImmutableBitmap toImmutableBitmap(T bitmapResult); +} diff --git a/processing/src/main/java/io/druid/query/DefaultBitmapResultFactory.java b/processing/src/main/java/io/druid/query/DefaultBitmapResultFactory.java new file mode 100644 index 000000000000..0e80b9da7619 --- /dev/null +++ b/processing/src/main/java/io/druid/query/DefaultBitmapResultFactory.java @@ -0,0 +1,93 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import io.druid.collections.bitmap.BitmapFactory; +import io.druid.collections.bitmap.ImmutableBitmap; + +public final class DefaultBitmapResultFactory implements BitmapResultFactory +{ + private final BitmapFactory factory; + + public DefaultBitmapResultFactory(BitmapFactory factory) + { + this.factory = factory; + } + + @Override + public ImmutableBitmap wrapUnknown(ImmutableBitmap bitmap) + { + return bitmap; + } + + @Override + public ImmutableBitmap wrapDimensionValue(ImmutableBitmap bitmap) + { + return bitmap; + } + + @Override + public ImmutableBitmap wrapAllFalse(ImmutableBitmap allFalseBitmap) + { + return allFalseBitmap; + } + + @Override + public ImmutableBitmap wrapAllTrue(ImmutableBitmap allTrueBitmap) + { + return allTrueBitmap; + } + + @Override + public boolean isEmpty(ImmutableBitmap bitmapResult) + { + return bitmapResult.isEmpty(); + } + + @Override + public ImmutableBitmap intersection(Iterable bitmapResults) + { + return factory.intersection(bitmapResults); + } + + @Override + public ImmutableBitmap union(Iterable bitmapResults) + { + return factory.union(bitmapResults); + } + + @Override + public ImmutableBitmap unionDimensionValueBitmaps(Iterable dimensionValueBitmaps) + { + return factory.union(dimensionValueBitmaps); + } + + @Override + public ImmutableBitmap complement(ImmutableBitmap bitmapResult, int numRows) + { + return factory.complement(bitmapResult, numRows); + } + + @Override + public ImmutableBitmap toImmutableBitmap(ImmutableBitmap bitmapResult) + { + return bitmapResult; + } +} diff --git a/processing/src/main/java/io/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/io/druid/query/DefaultQueryMetrics.java index 57c6099f23bd..8cc363a41c04 100644 --- a/processing/src/main/java/io/druid/query/DefaultQueryMetrics.java +++ b/processing/src/main/java/io/druid/query/DefaultQueryMetrics.java @@ -25,9 +25,12 @@ import com.google.common.collect.ImmutableMap; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; +import io.druid.collections.bitmap.BitmapFactory; +import io.druid.query.filter.Filter; import org.joda.time.Interval; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -168,6 +171,24 @@ public void chunkInterval(Interval interval) setDimension("chunkInterval", interval.toString()); } + @Override + public void preFilters(List preFilters) + { + // Emit nothing by default. + } + + @Override + public void postFilters(List postFilters) + { + // Emit nothing by default. + } + + @Override + public BitmapResultFactory makeBitmapResultFactory(BitmapFactory factory) + { + return new DefaultBitmapResultFactory(factory); + } + @Override public QueryMetrics reportQueryTime(long timeNs) { @@ -240,6 +261,27 @@ public QueryMetrics reportNodeBytes(long byteCount) return reportMetric("query/node/bytes", byteCount); } + @Override + public QueryMetrics reportBitmapConstructionTime(long timeNs) + { + // Don't emit by default. + return this; + } + + @Override + public QueryMetrics reportSegmentRows(long numRows) + { + // Don't emit by default. + return this; + } + + @Override + public QueryMetrics reportPreFilteredRows(long numRows) + { + // Don't emit by default. + return this; + } + @Override public void emit(ServiceEmitter emitter) { diff --git a/processing/src/main/java/io/druid/query/QueryMetrics.java b/processing/src/main/java/io/druid/query/QueryMetrics.java index a184de96a54c..86c29fe8b726 100644 --- a/processing/src/main/java/io/druid/query/QueryMetrics.java +++ b/processing/src/main/java/io/druid/query/QueryMetrics.java @@ -20,8 +20,12 @@ package io.druid.query; import com.metamx.emitter.service.ServiceEmitter; +import io.druid.collections.bitmap.BitmapFactory; +import io.druid.query.filter.Filter; import org.joda.time.Interval; +import java.util.List; + /** * Abstraction wrapping {@link com.metamx.emitter.service.ServiceMetricEvent.Builder} and allowing to control what * metrics are actually emitted, what dimensions do they have, etc. @@ -196,6 +200,18 @@ public interface QueryMetrics> void chunkInterval(Interval interval); + void preFilters(List preFilters); + + void postFilters(List postFilters); + + /** + * Creates a {@link BitmapResultFactory} which may record some information along bitmap construction from {@link + * #preFilters(List)}. The returned BitmapResultFactory may add some dimensions to this QueryMetrics from it's {@link + * BitmapResultFactory#toImmutableBitmap(Object)} method. See {@link BitmapResultFactory} Javadoc for more + * information. + */ + BitmapResultFactory makeBitmapResultFactory(BitmapFactory factory); + /** * Registers "query time" metric. */ @@ -246,6 +262,23 @@ public interface QueryMetrics> */ QueryMetrics reportNodeBytes(long byteCount); + /** + * Reports the time spent constructing bitmap from {@link #preFilters(List)} of the query. Not reported, if there are + * no preFilters. + */ + QueryMetrics reportBitmapConstructionTime(long timeNs); + + /** + * Reports the total number of rows in the processed segment. + */ + QueryMetrics reportSegmentRows(long numRows); + + /** + * Reports the number of rows to scan in the segment after applying {@link #preFilters(List)}. If the are no + * preFilters, this metric is equal to {@link #reportSegmentRows(long)}. + */ + QueryMetrics reportPreFilteredRows(long numRows); + /** * Emits all metrics, registered since the last {@code emit()} call on this QueryMetrics object. */ diff --git a/processing/src/main/java/io/druid/query/QueryRunnerHelper.java b/processing/src/main/java/io/druid/query/QueryRunnerHelper.java index ed619f2711cf..f224cb4e98ad 100644 --- a/processing/src/main/java/io/druid/query/QueryRunnerHelper.java +++ b/processing/src/main/java/io/druid/query/QueryRunnerHelper.java @@ -58,7 +58,7 @@ public static Sequence> makeCursorBasedQuery( return Sequences.filter( Sequences.map( - adapter.makeCursors(filter, queryIntervals.get(0), virtualColumns, granularity, descending), + adapter.makeCursors(filter, queryIntervals.get(0), virtualColumns, granularity, descending, null), new Function>() { @Override diff --git a/processing/src/main/java/io/druid/query/filter/Filter.java b/processing/src/main/java/io/druid/query/filter/Filter.java index 0aabd4beeef3..1c2d540cabb8 100644 --- a/processing/src/main/java/io/druid/query/filter/Filter.java +++ b/processing/src/main/java/io/druid/query/filter/Filter.java @@ -20,10 +20,16 @@ package io.druid.query.filter; import io.druid.collections.bitmap.ImmutableBitmap; +import io.druid.query.BitmapResultFactory; +import io.druid.query.DefaultBitmapResultFactory; import io.druid.segment.ColumnSelector; import io.druid.segment.ColumnSelectorFactory; /** + * {@link #getBitmapIndex} and {@link #getBitmapResult} methods both have default implementations, delegating to each + * other. Every implementation of {@link Filter} should override {@link #getBitmapResult}, currently it has a default + * implementation for compatibility with Filters in extensions. In Druid 0.11 {@link #getBitmapResult} is going to + * become an abstract method without a default implementation. */ public interface Filter { @@ -36,8 +42,15 @@ public interface Filter * * @see Filter#estimateSelectivity(BitmapIndexSelector) */ - ImmutableBitmap getBitmapIndex(BitmapIndexSelector selector); + default ImmutableBitmap getBitmapIndex(BitmapIndexSelector selector) + { + return getBitmapResult(selector, new DefaultBitmapResultFactory(selector.getBitmapFactory())); + } + default T getBitmapResult(BitmapIndexSelector selector, BitmapResultFactory bitmapResultFactory) + { + return bitmapResultFactory.wrapUnknown(getBitmapIndex(selector)); + } /** * Estimate selectivity of this filter. diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java index 621298a34c99..32da5b3b268e 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java @@ -103,7 +103,8 @@ public Sequence process(final GroupByQuery query, final StorageAdapter stor intervals.get(0), query.getVirtualColumns(), query.getGranularity(), - false + false, + null ); final ResourceHolder bufferHolder = intermediateResultsBufferPool.take(); diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index a0f82b996304..dd7e0aa1c4a3 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -107,7 +107,8 @@ public static Sequence process( intervals.get(0), query.getVirtualColumns(), query.getGranularity(), - false + false, + null ); diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java b/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java index ec2e3f83cf8b..e9706f06d4f4 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java @@ -254,7 +254,8 @@ private ColumnAnalysis analyzeStringColumn( new Interval(start, end), VirtualColumns.EMPTY, Granularities.ALL, - false + false, + null ); size = cursors.accumulate( diff --git a/processing/src/main/java/io/druid/query/search/search/CursorOnlyStrategy.java b/processing/src/main/java/io/druid/query/search/search/CursorOnlyStrategy.java index 6bee733e7184..88c253cd76f5 100644 --- a/processing/src/main/java/io/druid/query/search/search/CursorOnlyStrategy.java +++ b/processing/src/main/java/io/druid/query/search/search/CursorOnlyStrategy.java @@ -95,7 +95,8 @@ public Object2IntRBTreeMap execute(final int limit) interval, VirtualColumns.EMPTY, query.getGranularity(), - query.isDescending() + query.isDescending(), + null ); final Object2IntRBTreeMap retVal = new Object2IntRBTreeMap<>(query.getSort().getComparator()); diff --git a/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java b/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java index 0965281c3bcd..e826d2cd81b0 100644 --- a/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java @@ -29,6 +29,7 @@ import io.druid.segment.Capabilities; import io.druid.segment.Cursor; +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; @@ -67,7 +68,10 @@ public TopNParams makeInitParams( @Override public void run( - TopNParams params, TopNResultBuilder resultBuilder, int[] ints + TopNParams params, + TopNResultBuilder resultBuilder, + int[] ints, + @Nullable TopNQueryMetrics queryMetrics ) { final String metric = query.getTopNMetricSpec().getMetricName(query.getDimensionSpec()); @@ -95,7 +99,8 @@ public void run( singleMetricAlgo.run( singleMetricParam, singleMetricResultBuilder, - null + null, + null // Don't collect metrics during the preparation run. ); // Get only the topN dimension values @@ -113,7 +118,8 @@ public void run( allMetricAlgo.run( allMetricsParam, resultBuilder, - dimValSelector + dimValSelector, + queryMetrics ); } finally { diff --git a/processing/src/main/java/io/druid/query/topn/BaseTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/BaseTopNAlgorithm.java index 0122fefdc440..8430c3878a89 100644 --- a/processing/src/main/java/io/druid/query/topn/BaseTopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/BaseTopNAlgorithm.java @@ -30,6 +30,7 @@ import io.druid.segment.DimensionSelector; import io.druid.segment.IdLookup; +import javax.annotation.Nullable; import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -72,26 +73,32 @@ protected BaseTopNAlgorithm(Capabilities capabilities) public void run( Parameters params, TopNResultBuilder resultBuilder, - DimValSelector dimValSelector + DimValSelector dimValSelector, + @Nullable TopNQueryMetrics queryMetrics ) { if (params.getCardinality() != TopNColumnSelectorStrategy.CARDINALITY_UNKNOWN) { - runWithCardinalityKnown(params, resultBuilder, dimValSelector); + runWithCardinalityKnown(params, resultBuilder, dimValSelector, queryMetrics); } else { - runWithCardinalityUnknown(params, resultBuilder); + runWithCardinalityUnknown(params, resultBuilder, queryMetrics); } } private void runWithCardinalityKnown( Parameters params, TopNResultBuilder resultBuilder, - DimValSelector dimValSelector + DimValSelector dimValSelector, + @Nullable TopNQueryMetrics queryMetrics ) { + if (queryMetrics != null) { + queryMetrics.startRecordingScanTime(); + } boolean hasDimValSelector = (dimValSelector != null); int cardinality = params.getCardinality(); int numProcessed = 0; + long processedRows = 0; while (numProcessed < cardinality) { final int numToProcess; int maxNumToProcess = Math.min(params.getNumValuesPerPass(), cardinality - numProcessed); @@ -108,7 +115,7 @@ private void runWithCardinalityKnown( DimValAggregateStore aggregatesStore = makeDimValAggregateStore(params); - scanAndAggregate(params, theDimValSelector, aggregatesStore, numProcessed); + processedRows = scanAndAggregate(params, theDimValSelector, aggregatesStore, numProcessed); updateResults(params, theDimValSelector, aggregatesStore, resultBuilder); @@ -117,6 +124,10 @@ private void runWithCardinalityKnown( numProcessed += numToProcess; params.getCursor().reset(); } + if (queryMetrics != null) { + queryMetrics.addProcessedRows(processedRows); + queryMetrics.stopRecordingScanTime(); + } } /** @@ -128,14 +139,22 @@ private void runWithCardinalityKnown( */ private void runWithCardinalityUnknown( Parameters params, - TopNResultBuilder resultBuilder + TopNResultBuilder resultBuilder, + @Nullable TopNQueryMetrics queryMetrics ) { DimValAggregateStore aggregatesStore = makeDimValAggregateStore(params); - scanAndAggregate(params, null, aggregatesStore, 0); + if (queryMetrics != null) { + queryMetrics.startRecordingScanTime(); + } + long processedRows = scanAndAggregate(params, null, aggregatesStore, 0); updateResults(params, null, aggregatesStore, resultBuilder); closeAggregators(aggregatesStore); params.getCursor().reset(); + if (queryMetrics != null) { + queryMetrics.addProcessedRows(processedRows); + queryMetrics.stopRecordingScanTime(); + } } protected abstract DimValSelector makeDimValSelector(Parameters params, int numProcessed, int numToProcess); @@ -162,7 +181,10 @@ protected abstract DimValSelector updateDimValSelector( protected abstract DimValAggregateStore makeDimValAggregateStore(Parameters params); - protected abstract void scanAndAggregate( + /** + * Returns the number of processed rows (i. e. after postFilters are applied inside the cursor being processed). + */ + protected abstract long scanAndAggregate( Parameters params, DimValSelector dimValSelector, DimValAggregateStore dimValAggregateStore, diff --git a/processing/src/main/java/io/druid/query/topn/DefaultTopNQueryMetrics.java b/processing/src/main/java/io/druid/query/topn/DefaultTopNQueryMetrics.java index 9dadbbcfd480..0ad35e3c0731 100644 --- a/processing/src/main/java/io/druid/query/topn/DefaultTopNQueryMetrics.java +++ b/processing/src/main/java/io/druid/query/topn/DefaultTopNQueryMetrics.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.druid.query.DefaultQueryMetrics; import io.druid.query.DruidMetrics; +import io.druid.segment.ColumnValueSelector; +import io.druid.segment.Cursor; public class DefaultTopNQueryMetrics extends DefaultQueryMetrics implements TopNQueryMetrics { @@ -65,4 +67,54 @@ public void numComplexMetrics(TopNQuery query) int numComplexAggs = DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs()); setDimension("numComplexMetrics", String.valueOf(numComplexAggs)); } + + @Override + public void dimensionCardinality(int cardinality) + { + // Don't emit by default. + } + + @Override + public void algorithm(TopNAlgorithm algorithm) + { + // Emit nothing by default. + } + + @Override + public void cursor(Cursor cursor) + { + // Emit nothing by default. + } + + @Override + public void columnValueSelector(ColumnValueSelector columnValueSelector) + { + // Emit nothing by default. + } + + @Override + public void numValuesPerPass(TopNParams params) + { + // Don't emit by default. + } + + @Override + public TopNQueryMetrics addProcessedRows(long numRows) + { + // Emit nothing by default. + return this; + } + + @Override + public void startRecordingScanTime() + { + // Don't record scan time by default. + } + + @Override + public TopNQueryMetrics stopRecordingScanTime() + { + // Emit nothing by default. + return this; + } } diff --git a/processing/src/main/java/io/druid/query/topn/DimExtractionTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/DimExtractionTopNAlgorithm.java index 807e4c54cc5a..404f363fdc6d 100644 --- a/processing/src/main/java/io/druid/query/topn/DimExtractionTopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/DimExtractionTopNAlgorithm.java @@ -82,7 +82,7 @@ protected Map makeDimValAggregateStore(TopNParams para } @Override - public void scanAndAggregate( + public long scanAndAggregate( TopNParams params, Aggregator[][] rowSelector, Map aggregatesStore, @@ -92,7 +92,7 @@ public void scanAndAggregate( final Cursor cursor = params.getCursor(); final ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); - selectorPlus.getColumnSelectorStrategy().dimExtractionScanAndAggregate( + return selectorPlus.getColumnSelectorStrategy().dimExtractionScanAndAggregate( query, selectorPlus.getSelector(), cursor, diff --git a/processing/src/main/java/io/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java b/processing/src/main/java/io/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java index e5269fd1706f..d98d1fbd2ee0 100644 --- a/processing/src/main/java/io/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java +++ b/processing/src/main/java/io/druid/query/topn/Generic1AggPooledTopNScannerPrototype.java @@ -38,7 +38,7 @@ public long scanAndAggregate( ByteBuffer resultsBuffer ) { - long scannedRows = 0; + long processedRows = 0; int positionToAllocate = 0; while (!cursor.isDoneOrInterrupted()) { final IndexedInts dimValues = dimensionSelector.getRow(); @@ -56,9 +56,9 @@ public long scanAndAggregate( positionToAllocate += aggregatorSize; } } - scannedRows++; + processedRows++; cursor.advanceUninterruptibly(); } - return scannedRows; + return processedRows; } } diff --git a/processing/src/main/java/io/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java b/processing/src/main/java/io/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java index d6a039e0f540..491cb71f7669 100644 --- a/processing/src/main/java/io/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java +++ b/processing/src/main/java/io/druid/query/topn/Generic2AggPooledTopNScannerPrototype.java @@ -41,7 +41,7 @@ public long scanAndAggregate( ) { int totalAggregatorsSize = aggregator1Size + aggregator2Size; - long scannedRows = 0; + long processedRows = 0; int positionToAllocate = 0; while (!cursor.isDoneOrInterrupted()) { final IndexedInts dimValues = dimensionSelector.getRow(); @@ -63,9 +63,9 @@ public long scanAndAggregate( positionToAllocate += totalAggregatorsSize; } } - scannedRows++; + processedRows++; cursor.advanceUninterruptibly(); } - return scannedRows; + return processedRows; } } diff --git a/processing/src/main/java/io/druid/query/topn/Historical1SimpleDoubleAggPooledTopNScannerPrototype.java b/processing/src/main/java/io/druid/query/topn/Historical1SimpleDoubleAggPooledTopNScannerPrototype.java index e7d8aaf52a67..d6f4fb84fcc5 100644 --- a/processing/src/main/java/io/druid/query/topn/Historical1SimpleDoubleAggPooledTopNScannerPrototype.java +++ b/processing/src/main/java/io/druid/query/topn/Historical1SimpleDoubleAggPooledTopNScannerPrototype.java @@ -48,7 +48,7 @@ public long scanAndAggregate( { // See TopNUtils.copyOffset() for explanation Offset offset = (Offset) TopNUtils.copyOffset(cursor); - long scannedRows = 0; + long processedRows = 0; int positionToAllocate = 0; while (offset.withinBounds() && !Thread.currentThread().isInterrupted()) { int rowNum = offset.getOffset(); @@ -66,9 +66,9 @@ public long scanAndAggregate( positionToAllocate += aggregatorSize; } } - scannedRows++; + processedRows++; offset.increment(); } - return scannedRows; + return processedRows; } } diff --git a/processing/src/main/java/io/druid/query/topn/HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype.java b/processing/src/main/java/io/druid/query/topn/HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype.java index 91f77c4a8091..b35c9400c792 100644 --- a/processing/src/main/java/io/druid/query/topn/HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype.java +++ b/processing/src/main/java/io/druid/query/topn/HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype.java @@ -47,7 +47,7 @@ public long scanAndAggregate( { // See TopNUtils.copyOffset() for explanation Offset offset = (Offset) TopNUtils.copyOffset(cursor); - long scannedRows = 0; + long processedRows = 0; int positionToAllocate = 0; while (offset.withinBounds() && !Thread.currentThread().isInterrupted()) { int rowNum = offset.getOffset(); @@ -60,9 +60,9 @@ public long scanAndAggregate( aggregator.putFirst(resultsBuffer, positionToAllocate, metricSelector.get(rowNum)); positionToAllocate += aggregatorSize; } - scannedRows++; + processedRows++; offset.increment(); } - return scannedRows; + return processedRows; } } diff --git a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java index 4d5974e2e597..27fea230d68c 100644 --- a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java @@ -105,10 +105,10 @@ static void setSpecializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledT private interface ScanAndAggregate { /** - * If this implementation of ScanAndAggregate is executable with the given parameters, run it and return true. - * Otherwise return false (scanning and aggregation is not done). + * If this implementation of ScanAndAggregate is executable with the given parameters, run it and return the number + * of processed rows. Otherwise return -1 (scanning and aggregation is not done). */ - boolean scanAndAggregate( + long scanAndAggregate( final PooledTopNParams params, final int[] positions, final BufferAggregator[] theAggregators @@ -132,18 +132,17 @@ private static void computeSpecializedScanAndAggregateImplementations() if (cursor instanceof HistoricalCursor && aggregator instanceof SimpleDoubleBufferAggregator) { if (params.getDimSelector() instanceof SingleValueHistoricalDimensionSelector && ((SimpleDoubleBufferAggregator) aggregator).getSelector() instanceof HistoricalFloatColumnSelector) { - scanAndAggregateHistorical1SimpleDoubleAgg( + return scanAndAggregateHistorical1SimpleDoubleAgg( params, positions, (SimpleDoubleBufferAggregator) aggregator, (HistoricalCursor) cursor, defaultHistoricalSingleValueDimSelector1SimpleDoubleAggScanner ); - return true; } } } - return false; + return -1; }); } if (specializeHistorical1SimpleDoubleAggPooledTopN) { @@ -154,36 +153,33 @@ private static void computeSpecializedScanAndAggregateImplementations() if (cursor instanceof HistoricalCursor && aggregator instanceof SimpleDoubleBufferAggregator) { if (params.getDimSelector() instanceof HistoricalDimensionSelector && ((SimpleDoubleBufferAggregator) aggregator).getSelector() instanceof HistoricalFloatColumnSelector) { - scanAndAggregateHistorical1SimpleDoubleAgg( + return scanAndAggregateHistorical1SimpleDoubleAgg( params, positions, (SimpleDoubleBufferAggregator) aggregator, (HistoricalCursor) cursor, defaultHistorical1SimpleDoubleAggScanner ); - return true; } } } - return false; + return -1; }); } if (specializeGeneric1AggPooledTopN) { specializedScanAndAggregateImplementations.add((params, positions, theAggregators) -> { if (theAggregators.length == 1) { - scanAndAggregateGeneric1Agg(params, positions, theAggregators[0], params.getCursor()); - return true; + return scanAndAggregateGeneric1Agg(params, positions, theAggregators[0], params.getCursor()); } - return false; + return -1; }); } if (specializeGeneric2AggPooledTopN) { specializedScanAndAggregateImplementations.add((params, positions, theAggregators) -> { if (theAggregators.length == 2) { - scanAndAggregateGeneric2Agg(params, positions, theAggregators, params.getCursor()); - return true; + return scanAndAggregateGeneric2Agg(params, positions, theAggregators, params.getCursor()); } - return false; + return -1; }); } } @@ -313,7 +309,7 @@ protected BufferAggregator[] makeDimValAggregateStore(PooledTopNParams params) } @Override - protected void scanAndAggregate( + protected long scanAndAggregate( final PooledTopNParams params, final int[] positions, final BufferAggregator[] theAggregators, @@ -321,16 +317,18 @@ protected void scanAndAggregate( ) { for (ScanAndAggregate specializedScanAndAggregate : specializedScanAndAggregateImplementations) { - if (specializedScanAndAggregate.scanAndAggregate(params, positions, theAggregators)) { + long processedRows = specializedScanAndAggregate.scanAndAggregate(params, positions, theAggregators); + if (processedRows >= 0) { BaseQuery.checkInterrupted(); - return; + return processedRows; } } - scanAndAggregateDefault(params, positions, theAggregators); + long processedRows = scanAndAggregateDefault(params, positions, theAggregators); BaseQuery.checkInterrupted(); + return processedRows; } - private static void scanAndAggregateHistorical1SimpleDoubleAgg( + private static long scanAndAggregateHistorical1SimpleDoubleAgg( PooledTopNParams params, int[] positions, SimpleDoubleBufferAggregator aggregator, @@ -347,7 +345,7 @@ private static void scanAndAggregateHistorical1SimpleDoubleAgg( ); Historical1AggPooledTopNScanner scanner = specializationState.getSpecializedOrDefault(prototypeScanner); - long scannedRows = scanner.scanAndAggregate( + long processedRows = scanner.scanAndAggregate( (HistoricalDimensionSelector) params.getDimSelector(), aggregator.getSelector(), aggregator, @@ -356,10 +354,11 @@ private static void scanAndAggregateHistorical1SimpleDoubleAgg( positions, params.getResultsBuf() ); - specializationState.accountLoopIterations(scannedRows); + specializationState.accountLoopIterations(processedRows); + return processedRows; } - private static void scanAndAggregateGeneric1Agg( + private static long scanAndAggregateGeneric1Agg( PooledTopNParams params, int[] positions, BufferAggregator aggregator, @@ -371,7 +370,7 @@ private static void scanAndAggregateGeneric1Agg( SpecializationState specializationState = SpecializationService .getSpecializationState(prototypeClass, runtimeShape); Generic1AggPooledTopNScanner scanner = specializationState.getSpecializedOrDefault(defaultGeneric1AggScanner); - long scannedRows = scanner.scanAndAggregate( + long processedRows = scanner.scanAndAggregate( params.getDimSelector(), aggregator, params.getAggregatorSizes()[0], @@ -379,10 +378,11 @@ private static void scanAndAggregateGeneric1Agg( positions, params.getResultsBuf() ); - specializationState.accountLoopIterations(scannedRows); + specializationState.accountLoopIterations(processedRows); + return processedRows; } - private static void scanAndAggregateGeneric2Agg( + private static long scanAndAggregateGeneric2Agg( PooledTopNParams params, int[] positions, BufferAggregator[] theAggregators, @@ -395,7 +395,7 @@ private static void scanAndAggregateGeneric2Agg( .getSpecializationState(prototypeClass, runtimeShape); Generic2AggPooledTopNScanner scanner = specializationState.getSpecializedOrDefault(defaultGeneric2AggScanner); int[] aggregatorSizes = params.getAggregatorSizes(); - long scannedRows = scanner.scanAndAggregate( + long processedRows = scanner.scanAndAggregate( params.getDimSelector(), theAggregators[0], aggregatorSizes[0], @@ -405,7 +405,8 @@ private static void scanAndAggregateGeneric2Agg( positions, params.getResultsBuf() ); - specializationState.accountLoopIterations(scannedRows); + specializationState.accountLoopIterations(processedRows); + return processedRows; } /** @@ -426,7 +427,7 @@ private static void scanAndAggregateGeneric2Agg( * still optimizes the high quantity of aggregate queries which benefit greatly from any speed improvements * (they simply take longer to start with). */ - private static void scanAndAggregateDefault( + private static long scanAndAggregateDefault( final PooledTopNParams params, final int[] positions, final BufferAggregator[] theAggregators @@ -451,6 +452,7 @@ private static void scanAndAggregateDefault( final int aggSize = theAggregators.length; final int aggExtra = aggSize % AGG_UNROLL_COUNT; int currentPosition = 0; + long processedRows = 0; while (!cursor.isDoneOrInterrupted()) { final IndexedInts dimValues = dimSelector.getRow(); @@ -639,7 +641,9 @@ private static void scanAndAggregateDefault( ); } cursor.advanceUninterruptibly(); + processedRows++; } + return processedRows; } /** diff --git a/processing/src/main/java/io/druid/query/topn/TimeExtractionTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/TimeExtractionTopNAlgorithm.java index 4bad1c3e6ec8..9f07cf3a97a3 100644 --- a/processing/src/main/java/io/druid/query/topn/TimeExtractionTopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/TimeExtractionTopNAlgorithm.java @@ -69,7 +69,7 @@ protected Map makeDimValAggregateStore(TopNParams params) } @Override - protected void scanAndAggregate( + protected long scanAndAggregate( TopNParams params, int[] dimValSelector, Map aggregatesStore, int numProcessed ) { @@ -80,6 +80,7 @@ protected void scanAndAggregate( final Cursor cursor = params.getCursor(); final DimensionSelector dimSelector = params.getDimSelector(); + long processedRows = 0; while (!cursor.isDone()) { final String key = dimSelector.lookupName(dimSelector.getRow().get(0)); @@ -94,7 +95,9 @@ protected void scanAndAggregate( } cursor.advance(); + processedRows++; } + return processedRows; } @Override diff --git a/processing/src/main/java/io/druid/query/topn/TopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/TopNAlgorithm.java index 03473091e1ae..4f0792a539a2 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/TopNAlgorithm.java @@ -24,6 +24,8 @@ import io.druid.query.topn.types.TopNColumnSelectorStrategy; import io.druid.segment.Cursor; +import javax.annotation.Nullable; + /** */ public interface TopNAlgorithm @@ -37,7 +39,8 @@ public interface TopNAlgorithm public void run( Parameters params, TopNResultBuilder resultBuilder, - DimValSelector dimValSelector + DimValSelector dimValSelector, + @Nullable TopNQueryMetrics queryMetrics ); public void cleanup(Parameters params); diff --git a/processing/src/main/java/io/druid/query/topn/TopNMapFn.java b/processing/src/main/java/io/druid/query/topn/TopNMapFn.java index e49dec0a045b..b9516dd0c3b8 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNMapFn.java +++ b/processing/src/main/java/io/druid/query/topn/TopNMapFn.java @@ -28,9 +28,10 @@ import io.druid.segment.DimensionHandlerUtils; import io.druid.segment.column.ValueType; +import javax.annotation.Nullable; import java.util.Objects; -public class TopNMapFn implements Function> +public class TopNMapFn { public static Function getValueTransformer(ValueType outputType) { @@ -89,9 +90,8 @@ public TopNMapFn( this.topNAlgorithm = topNAlgorithm; } - @Override @SuppressWarnings("unchecked") - public Result apply(Cursor cursor) + public Result apply(final Cursor cursor, final @Nullable TopNQueryMetrics queryMetrics) { final ColumnSelectorPlus selectorPlus = DimensionHandlerUtils.createColumnSelectorPlus( STRATEGY_FACTORY, @@ -106,10 +106,14 @@ public Result apply(Cursor cursor) TopNParams params = null; try { params = topNAlgorithm.makeInitParams(selectorPlus, cursor); + if (queryMetrics != null) { + queryMetrics.columnValueSelector(selectorPlus.getSelector()); + queryMetrics.numValuesPerPass(params); + } TopNResultBuilder resultBuilder = BaseTopNAlgorithm.makeResultBuilder(params, query); - topNAlgorithm.run(params, resultBuilder, null); + topNAlgorithm.run(params, resultBuilder, null, queryMetrics); return resultBuilder.build(); } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java index 0d4eff411cf0..a9640ec82be9 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java @@ -41,6 +41,7 @@ import io.druid.segment.filter.Filters; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.List; @@ -57,7 +58,11 @@ public TopNQueryEngine(StupidPool bufferPool) this.bufferPool = bufferPool; } - public Sequence> query(final TopNQuery query, final StorageAdapter adapter) + public Sequence> query( + final TopNQuery query, + final StorageAdapter adapter, + final @Nullable TopNQueryMetrics queryMetrics + ) { if (adapter == null) { throw new SegmentMissingException( @@ -68,7 +73,7 @@ public Sequence> query(final TopNQuery query, final Stor final List queryIntervals = query.getQuerySegmentSpec().getIntervals(); final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getDimensionsFilter())); final Granularity granularity = query.getGranularity(); - final Function> mapFn = getMapFn(query, adapter); + final TopNMapFn mapFn = getMapFn(query, adapter, queryMetrics); Preconditions.checkArgument( queryIntervals.size() == 1, "Can only handle a single interval, got[%s]", queryIntervals @@ -81,7 +86,8 @@ public Sequence> query(final TopNQuery query, final Stor queryIntervals.get(0), query.getVirtualColumns(), granularity, - query.isDescending() + query.isDescending(), + queryMetrics ), new Function>() { @@ -89,7 +95,10 @@ public Sequence> query(final TopNQuery query, final Stor public Result apply(Cursor input) { log.debug("Running over cursor[%s]", adapter.getInterval(), input.getTime()); - return mapFn.apply(input); + if (queryMetrics != null) { + queryMetrics.cursor(input); + } + return mapFn.apply(input, queryMetrics); } } ), @@ -97,11 +106,18 @@ public Result apply(Cursor input) ); } - private Function> getMapFn(TopNQuery query, final StorageAdapter adapter) + private TopNMapFn getMapFn( + final TopNQuery query, + final StorageAdapter adapter, + final @Nullable TopNQueryMetrics queryMetrics + ) { final Capabilities capabilities = adapter.getCapabilities(); final String dimension = query.getDimensionSpec().getDimension(); final int cardinality = adapter.getDimensionCardinality(dimension); + if (queryMetrics != null) { + queryMetrics.dimensionCardinality(cardinality); + } int numBytesPerRecord = 0; for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) { @@ -137,6 +153,9 @@ private Function> getMapFn(TopNQuery query, fina } else { topNAlgorithm = new PooledTopNAlgorithm(capabilities, query, bufferPool); } + if (queryMetrics != null) { + queryMetrics.algorithm(topNAlgorithm); + } return new TopNMapFn(query, topNAlgorithm); } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryMetrics.java b/processing/src/main/java/io/druid/query/topn/TopNQueryMetrics.java index a3492f717429..aedb47d6eebd 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryMetrics.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryMetrics.java @@ -20,6 +20,10 @@ package io.druid.query.topn; import io.druid.query.QueryMetrics; +import io.druid.segment.ColumnValueSelector; +import io.druid.segment.Cursor; + +import java.util.List; /** * Specialization of {@link QueryMetrics} for {@link TopNQuery}. @@ -47,4 +51,42 @@ public interface TopNQueryMetrics extends QueryMetrics * metric is a metric of not long or double type, but it could be redefined in the implementation of this method. */ void numComplexMetrics(TopNQuery query); + + void dimensionCardinality(int cardinality); + + void algorithm(TopNAlgorithm algorithm); + + /** + * This method is called exactly once with each cursor, processed for the query. + */ + void cursor(Cursor cursor); + + /** + * This method is called exactly once with the columnValueSelector object of each cursor, processed for the query. + */ + void columnValueSelector(ColumnValueSelector columnValueSelector); + + /** + * This method may set {@link TopNParams#getNumValuesPerPass()} of the query as dimension. + */ + void numValuesPerPass(TopNParams params); + + /** + * Called with the number of rows, processed via each cursor, processed for the query within the segment. The total + * number of processed rows, reported via this method for a TopNQueryMetrics instance, is smaller or equal to + * {@link #reportPreFilteredRows(long)}, because {@link #postFilters(List)} are additionally applied. If there + * are no postFilters, preFilteredRows and processedRows are equal. + */ + TopNQueryMetrics addProcessedRows(long numRows); + + /** + * Calls to this method and {@link #stopRecordingScanTime()} wrap scanning of each cursor, processed for the + * query. + */ + void startRecordingScanTime(); + + /** + * Calls of {@link #startRecordingScanTime()} and this method wrap scanning of each cursor, processed for the query. + */ + TopNQueryMetrics stopRecordingScanTime(); } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java index 2e392f0b75e9..f28077bb6c29 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java @@ -73,7 +73,8 @@ public Sequence> run( throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TopNQuery.class); } - return queryEngine.query((TopNQuery) input.getQuery(), segment.asStorageAdapter()); + TopNQuery query = (TopNQuery) input.getQuery(); + return queryEngine.query(query, segment.asStorageAdapter(), (TopNQueryMetrics) input.getQueryMetrics()); } }; diff --git a/processing/src/main/java/io/druid/query/topn/types/FloatTopNColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/topn/types/FloatTopNColumnSelectorStrategy.java index e6aa27a6440d..cf972a251aa1 100644 --- a/processing/src/main/java/io/druid/query/topn/types/FloatTopNColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/topn/types/FloatTopNColumnSelectorStrategy.java @@ -62,7 +62,7 @@ public Int2ObjectMap makeDimExtractionAggregateStore() } @Override - public void dimExtractionScanAndAggregate( + public long dimExtractionScanAndAggregate( TopNQuery query, FloatColumnSelector selector, Cursor cursor, @@ -70,6 +70,7 @@ public void dimExtractionScanAndAggregate( Int2ObjectMap aggregatesStore ) { + long processedRows = 0; while (!cursor.isDone()) { int key = Float.floatToIntBits(selector.get()); Aggregator[] theAggregators = aggregatesStore.get(key); @@ -81,7 +82,9 @@ public void dimExtractionScanAndAggregate( aggregator.aggregate(); } cursor.advance(); + processedRows++; } + return processedRows; } @Override diff --git a/processing/src/main/java/io/druid/query/topn/types/LongTopNColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/topn/types/LongTopNColumnSelectorStrategy.java index e54fa2b5f5f3..ee9c99fc9b38 100644 --- a/processing/src/main/java/io/druid/query/topn/types/LongTopNColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/topn/types/LongTopNColumnSelectorStrategy.java @@ -62,7 +62,7 @@ public Long2ObjectMap makeDimExtractionAggregateStore() } @Override - public void dimExtractionScanAndAggregate( + public long dimExtractionScanAndAggregate( TopNQuery query, LongColumnSelector selector, Cursor cursor, @@ -70,6 +70,7 @@ public void dimExtractionScanAndAggregate( Long2ObjectMap aggregatesStore ) { + long processedRows = 0; while (!cursor.isDone()) { long key = selector.get(); Aggregator[] theAggregators = aggregatesStore.get(key); @@ -81,7 +82,9 @@ public void dimExtractionScanAndAggregate( aggregator.aggregate(); } cursor.advance(); + processedRows++; } + return processedRows; } @Override diff --git a/processing/src/main/java/io/druid/query/topn/types/StringTopNColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/topn/types/StringTopNColumnSelectorStrategy.java index a0f7864889a0..26bcfc05dfdf 100644 --- a/processing/src/main/java/io/druid/query/topn/types/StringTopNColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/topn/types/StringTopNColumnSelectorStrategy.java @@ -77,7 +77,7 @@ public Map makeDimExtractionAggregateStore() } @Override - public void dimExtractionScanAndAggregate( + public long dimExtractionScanAndAggregate( TopNQuery query, DimensionSelector selector, Cursor cursor, @@ -86,9 +86,9 @@ public void dimExtractionScanAndAggregate( ) { if (selector.getValueCardinality() != DimensionSelector.CARDINALITY_UNKNOWN) { - dimExtractionScanAndAggregateWithCardinalityKnown(query, cursor, selector, rowSelector, aggregatesStore); + return dimExtractionScanAndAggregateWithCardinalityKnown(query, cursor, selector, rowSelector, aggregatesStore); } else { - dimExtractionScanAndAggregateWithCardinalityUnknown(query, cursor, selector, aggregatesStore); + return dimExtractionScanAndAggregateWithCardinalityUnknown(query, cursor, selector, aggregatesStore); } } @@ -121,7 +121,7 @@ public void updateDimExtractionResults( } } - private void dimExtractionScanAndAggregateWithCardinalityKnown( + private long dimExtractionScanAndAggregateWithCardinalityKnown( TopNQuery query, Cursor cursor, DimensionSelector selector, @@ -129,6 +129,7 @@ private void dimExtractionScanAndAggregateWithCardinalityKnown( Map aggregatesStore ) { + long processedRows = 0; while (!cursor.isDone()) { final IndexedInts dimValues = selector.getRow(); for (int i = 0; i < dimValues.size(); ++i) { @@ -149,16 +150,19 @@ private void dimExtractionScanAndAggregateWithCardinalityKnown( } } cursor.advance(); + processedRows++; } + return processedRows; } - private void dimExtractionScanAndAggregateWithCardinalityUnknown( + private long dimExtractionScanAndAggregateWithCardinalityUnknown( TopNQuery query, Cursor cursor, DimensionSelector selector, Map aggregatesStore ) { + long processedRows = 0; while (!cursor.isDone()) { final IndexedInts dimValues = selector.getRow(); for (int i = 0; i < dimValues.size(); ++i) { @@ -175,6 +179,8 @@ private void dimExtractionScanAndAggregateWithCardinalityUnknown( } } cursor.advance(); + processedRows++; } + return processedRows; } } diff --git a/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategy.java index 1c022665becc..2ac610bd5613 100644 --- a/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategy.java @@ -90,8 +90,9 @@ public interface TopNColumnSelectorStrategy * @param cursor Cursor for the segment being queried * @param rowSelector Integer lookup containing aggregators * @param aggregatesStore Map containing aggregators + * @return the number of processed rows (after postFilters are applied inside the cursor being processed) */ - void dimExtractionScanAndAggregate( + long dimExtractionScanAndAggregate( final TopNQuery query, ValueSelectorType selector, Cursor cursor, diff --git a/processing/src/main/java/io/druid/segment/CursorFactory.java b/processing/src/main/java/io/druid/segment/CursorFactory.java index 3394d34ad475..4ffb899b2853 100644 --- a/processing/src/main/java/io/druid/segment/CursorFactory.java +++ b/processing/src/main/java/io/druid/segment/CursorFactory.java @@ -21,9 +21,12 @@ import io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.guava.Sequence; +import io.druid.query.QueryMetrics; import io.druid.query.filter.Filter; import org.joda.time.Interval; +import javax.annotation.Nullable; + /** */ public interface CursorFactory @@ -33,6 +36,7 @@ public Sequence makeCursors( Interval interval, VirtualColumns virtualColumns, Granularity gran, - boolean descending + boolean descending, + @Nullable QueryMetrics queryMetrics ); } diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java index f94aa788c654..fac3004b6e0c 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java @@ -24,11 +24,15 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import io.druid.collections.bitmap.ImmutableBitmap; import io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.io.Closer; import io.druid.query.BaseQuery; +import io.druid.query.BitmapResultFactory; +import io.druid.query.DefaultBitmapResultFactory; +import io.druid.query.QueryMetrics; import io.druid.query.dimension.DimensionSpec; import io.druid.query.extraction.ExtractionFn; import io.druid.query.filter.Filter; @@ -49,8 +53,10 @@ import org.joda.time.DateTime; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.io.Closeable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -193,7 +199,8 @@ public Sequence makeCursors( Interval interval, VirtualColumns virtualColumns, Granularity gran, - boolean descending + boolean descending, + @Nullable QueryMetrics queryMetrics ) { Interval actualInterval = interval; @@ -222,8 +229,9 @@ public Sequence makeCursors( index ); + final int totalRows = index.getNumRows(); - /** + /* * Filters can be applied in two stages: * pre-filtering: Use bitmap indexes to prune the set of rows to be scanned. * post-filtering: Iterate through rows and apply the filter to the row values @@ -238,11 +246,14 @@ public Sequence makeCursors( * Any subfilters that cannot be processed entirely with bitmap indexes will be moved to the post-filtering stage. */ final Offset offset; + final List preFilters; final List postFilters = new ArrayList<>(); + int preFilteredRows = totalRows; if (filter == null) { - offset = new NoFilterOffset(0, index.getNumRows(), descending); + preFilters = Collections.emptyList(); + offset = new NoFilterOffset(0, totalRows, descending); } else { - final List preFilters = new ArrayList<>(); + preFilters = new ArrayList<>(); if (filter instanceof AndFilter) { // If we get an AndFilter, we can split the subfilters across both filtering stages @@ -265,12 +276,23 @@ public Sequence makeCursors( if (preFilters.size() == 0) { offset = new NoFilterOffset(0, index.getNumRows(), descending); } else { - // Use AndFilter.getBitmapIndex to intersect the preFilters to get its short-circuiting behavior. - offset = BitmapOffset.of( - AndFilter.getBitmapIndex(selector, preFilters), - descending, - (long) getNumRows() - ); + if (queryMetrics != null) { + BitmapResultFactory bitmapResultFactory = + queryMetrics.makeBitmapResultFactory(selector.getBitmapFactory()); + long bitmapConstructionStartNs = System.nanoTime(); + // Use AndFilter.getBitmapResult to intersect the preFilters to get its short-circuiting behavior. + ImmutableBitmap bitmapIndex = AndFilter.getBitmapIndex(selector, bitmapResultFactory, preFilters); + preFilteredRows = bitmapIndex.size(); + offset = BitmapOffset.of(bitmapIndex, descending, totalRows); + queryMetrics.reportBitmapConstructionTime(System.nanoTime() - bitmapConstructionStartNs); + } else { + BitmapResultFactory bitmapResultFactory = new DefaultBitmapResultFactory(selector.getBitmapFactory()); + offset = BitmapOffset.of( + AndFilter.getBitmapIndex(selector, bitmapResultFactory, preFilters), + descending, + totalRows + ); + } } } @@ -283,6 +305,13 @@ public Sequence makeCursors( postFilter = new AndFilter(postFilters); } + if (queryMetrics != null) { + queryMetrics.preFilters(preFilters); + queryMetrics.postFilters(postFilters); + queryMetrics.reportSegmentRows(totalRows); + queryMetrics.reportPreFilteredRows(preFilteredRows); + } + return Sequences.filter( new CursorSequenceBuilder( this, diff --git a/processing/src/main/java/io/druid/segment/filter/AndFilter.java b/processing/src/main/java/io/druid/segment/filter/AndFilter.java index 5b993b263e6b..0c1405b7f3c1 100644 --- a/processing/src/main/java/io/druid/segment/filter/AndFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/AndFilter.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import io.druid.collections.bitmap.ImmutableBitmap; +import io.druid.query.BitmapResultFactory; import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.BooleanFilter; import io.druid.query.filter.Filter; @@ -50,32 +51,45 @@ public AndFilter(List filters) this.filters = filters; } - public static ImmutableBitmap getBitmapIndex(BitmapIndexSelector selector, List filters) + public static ImmutableBitmap getBitmapIndex( + BitmapIndexSelector selector, + BitmapResultFactory bitmapResultFactory, + List filters + ) + { + return bitmapResultFactory.toImmutableBitmap(getBitmapResult(selector, bitmapResultFactory, filters)); + } + + private static T getBitmapResult( + BitmapIndexSelector selector, + BitmapResultFactory bitmapResultFactory, + List filters + ) { if (filters.size() == 1) { - return filters.get(0).getBitmapIndex(selector); + return filters.get(0).getBitmapResult(selector, bitmapResultFactory); } - final List bitmaps = Lists.newArrayListWithCapacity(filters.size()); + final List bitmapResults = Lists.newArrayListWithCapacity(filters.size()); for (final Filter filter : filters) { Preconditions.checkArgument(filter.supportsBitmapIndex(selector), "Filter[%s] does not support bitmap index", filter ); - final ImmutableBitmap bitmapIndex = filter.getBitmapIndex(selector); - if (bitmapIndex.isEmpty()) { + final T bitmapResult = filter.getBitmapResult(selector, bitmapResultFactory); + if (bitmapResultFactory.isEmpty(bitmapResult)) { // Short-circuit. - return Filters.allFalse(selector); + return bitmapResultFactory.wrapAllFalse(Filters.allFalse(selector)); } - bitmaps.add(bitmapIndex); + bitmapResults.add(bitmapResult); } - return selector.getBitmapFactory().intersection(bitmaps); + return bitmapResultFactory.intersection(bitmapResults); } @Override - public ImmutableBitmap getBitmapIndex(BitmapIndexSelector selector) + public T getBitmapResult(BitmapIndexSelector selector, BitmapResultFactory bitmapResultFactory) { - return getBitmapIndex(selector, filters); + return getBitmapResult(selector, bitmapResultFactory, filters); } @Override diff --git a/processing/src/main/java/io/druid/segment/filter/BoundFilter.java b/processing/src/main/java/io/druid/segment/filter/BoundFilter.java index b0c04419a495..d238e063b627 100644 --- a/processing/src/main/java/io/druid/segment/filter/BoundFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/BoundFilter.java @@ -23,6 +23,7 @@ import com.google.common.base.Supplier; import io.druid.collections.bitmap.ImmutableBitmap; import io.druid.java.util.common.Pair; +import io.druid.query.BitmapResultFactory; import io.druid.query.extraction.ExtractionFn; import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.BoundDimFilter; @@ -59,20 +60,25 @@ public BoundFilter(final BoundDimFilter boundDimFilter) } @Override - public ImmutableBitmap getBitmapIndex(final BitmapIndexSelector selector) + public T getBitmapResult(BitmapIndexSelector selector, BitmapResultFactory bitmapResultFactory) { if (supportShortCircuit()) { final BitmapIndex bitmapIndex = selector.getBitmapIndex(boundDimFilter.getDimension()); if (bitmapIndex == null || bitmapIndex.getCardinality() == 0) { - return doesMatch(null) ? Filters.allTrue(selector) : Filters.allFalse(selector); + if (doesMatch(null)) { + return bitmapResultFactory.wrapAllTrue(Filters.allTrue(selector)); + } else { + return bitmapResultFactory.wrapAllFalse(Filters.allFalse(selector)); + } } - return selector.getBitmapFactory().union(getBitmapIterator(boundDimFilter, bitmapIndex)); + return bitmapResultFactory.unionDimensionValueBitmaps(getBitmapIterator(boundDimFilter, bitmapIndex)); } else { return Filters.matchPredicate( boundDimFilter.getDimension(), selector, + bitmapResultFactory, getPredicateFactory().makeStringPredicate() ); } diff --git a/processing/src/main/java/io/druid/segment/filter/ColumnComparisonFilter.java b/processing/src/main/java/io/druid/segment/filter/ColumnComparisonFilter.java index f69fca8b9eab..13a39fe65c82 100644 --- a/processing/src/main/java/io/druid/segment/filter/ColumnComparisonFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/ColumnComparisonFilter.java @@ -20,7 +20,7 @@ package io.druid.segment.filter; import com.google.common.base.Preconditions; -import io.druid.collections.bitmap.ImmutableBitmap; +import io.druid.query.BitmapResultFactory; import io.druid.query.ColumnSelectorPlus; import io.druid.query.dimension.DimensionSpec; import io.druid.query.filter.BitmapIndexSelector; @@ -51,7 +51,7 @@ public ColumnComparisonFilter( } @Override - public ImmutableBitmap getBitmapIndex(final BitmapIndexSelector selector) + public T getBitmapResult(BitmapIndexSelector selector, BitmapResultFactory bitmapResultFactory) { throw new UnsupportedOperationException(); } diff --git a/processing/src/main/java/io/druid/segment/filter/DimensionPredicateFilter.java b/processing/src/main/java/io/druid/segment/filter/DimensionPredicateFilter.java index 81f5fc8d6e56..cde3d416a024 100644 --- a/processing/src/main/java/io/druid/segment/filter/DimensionPredicateFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/DimensionPredicateFilter.java @@ -21,7 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Predicate; -import io.druid.collections.bitmap.ImmutableBitmap; +import io.druid.query.BitmapResultFactory; import io.druid.query.extraction.ExtractionFn; import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.DruidFloatPredicate; @@ -102,9 +102,9 @@ public boolean applyFloat(float input) } @Override - public ImmutableBitmap getBitmapIndex(final BitmapIndexSelector selector) + public T getBitmapResult(BitmapIndexSelector selector, BitmapResultFactory bitmapResultFactory) { - return Filters.matchPredicate(dimension, selector, predicateFactory.makeStringPredicate()); + return Filters.matchPredicate(dimension, selector, bitmapResultFactory, predicateFactory.makeStringPredicate()); } @Override diff --git a/processing/src/main/java/io/druid/segment/filter/Filters.java b/processing/src/main/java/io/druid/segment/filter/Filters.java index a8e8a34a74a1..ec83e9b3f516 100644 --- a/processing/src/main/java/io/druid/segment/filter/Filters.java +++ b/processing/src/main/java/io/druid/segment/filter/Filters.java @@ -26,6 +26,7 @@ import com.google.common.collect.Lists; import io.druid.collections.bitmap.ImmutableBitmap; import io.druid.java.util.common.guava.FunctionalIterable; +import io.druid.query.BitmapResultFactory; import io.druid.query.ColumnSelectorPlus; import io.druid.query.Query; import io.druid.query.dimension.DefaultDimensionSpec; @@ -233,25 +234,26 @@ public void remove() * * @param dimension dimension to look at * @param selector bitmap selector + * @param bitmapResultFactory * @param predicate predicate to use - * * @return bitmap of matching rows * * @see #estimateSelectivity(String, BitmapIndexSelector, Predicate) */ - public static ImmutableBitmap matchPredicate( + public static T matchPredicate( final String dimension, final BitmapIndexSelector selector, + BitmapResultFactory bitmapResultFactory, final Predicate predicate ) { - return selector.getBitmapFactory().union(matchPredicateNoUnion(dimension, selector, predicate)); + return bitmapResultFactory.unionDimensionValueBitmaps(matchPredicateNoUnion(dimension, selector, predicate)); } /** * Return an iterable of bitmaps for all values matching a particular predicate. Unioning these bitmaps - * yields the same result that {@link #matchPredicate(String, BitmapIndexSelector, Predicate)} would have - * returned. + * yields the same result that {@link #matchPredicate(String, BitmapIndexSelector, BitmapResultFactory, Predicate)} + * would have returned. * * @param dimension dimension to look at * @param selector bitmap selector @@ -289,7 +291,7 @@ public static Iterable matchPredicateNoUnion( * * @return estimated selectivity * - * @see #matchPredicate(String, BitmapIndexSelector, Predicate) + * @see #matchPredicate(String, BitmapIndexSelector, BitmapResultFactory, Predicate) */ public static double estimateSelectivity( final String dimension, diff --git a/processing/src/main/java/io/druid/segment/filter/InFilter.java b/processing/src/main/java/io/druid/segment/filter/InFilter.java index 6a4019e2ce2d..e77e5583d58e 100644 --- a/processing/src/main/java/io/druid/segment/filter/InFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/InFilter.java @@ -23,6 +23,7 @@ import com.google.common.base.Strings; import com.google.common.base.Supplier; import io.druid.collections.bitmap.ImmutableBitmap; +import io.druid.query.BitmapResultFactory; import io.druid.query.extraction.ExtractionFn; import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.DruidFloatPredicate; @@ -67,15 +68,16 @@ public InFilter( } @Override - public ImmutableBitmap getBitmapIndex(final BitmapIndexSelector selector) + public T getBitmapResult(BitmapIndexSelector selector, BitmapResultFactory bitmapResultFactory) { if (extractionFn == null) { final BitmapIndex bitmapIndex = selector.getBitmapIndex(dimension); - return selector.getBitmapFactory().union(getBitmapIterable(bitmapIndex)); + return bitmapResultFactory.unionDimensionValueBitmaps(getBitmapIterable(bitmapIndex)); } else { return Filters.matchPredicate( dimension, selector, + bitmapResultFactory, getPredicateFactory().makeStringPredicate() ); } diff --git a/processing/src/main/java/io/druid/segment/filter/JavaScriptFilter.java b/processing/src/main/java/io/druid/segment/filter/JavaScriptFilter.java index dbe56672e897..a2104f56515c 100644 --- a/processing/src/main/java/io/druid/segment/filter/JavaScriptFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/JavaScriptFilter.java @@ -20,7 +20,7 @@ package io.druid.segment.filter; import com.google.common.base.Predicate; -import io.druid.collections.bitmap.ImmutableBitmap; +import io.druid.query.BitmapResultFactory; import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.Filter; import io.druid.query.filter.JavaScriptDimFilter; @@ -44,11 +44,11 @@ public JavaScriptFilter( } @Override - public ImmutableBitmap getBitmapIndex(final BitmapIndexSelector selector) + public T getBitmapResult(BitmapIndexSelector selector, BitmapResultFactory bitmapResultFactory) { final Context cx = Context.enter(); try { - return Filters.matchPredicate(dimension, selector, makeStringPredicate(cx)); + return Filters.matchPredicate(dimension, selector, bitmapResultFactory, makeStringPredicate(cx)); } finally { Context.exit(); diff --git a/processing/src/main/java/io/druid/segment/filter/LikeFilter.java b/processing/src/main/java/io/druid/segment/filter/LikeFilter.java index 8b47566e4294..8965de29638a 100644 --- a/processing/src/main/java/io/druid/segment/filter/LikeFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/LikeFilter.java @@ -22,6 +22,7 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import io.druid.collections.bitmap.ImmutableBitmap; +import io.druid.query.BitmapResultFactory; import io.druid.query.extraction.ExtractionFn; import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.Filter; @@ -55,9 +56,9 @@ public LikeFilter( } @Override - public ImmutableBitmap getBitmapIndex(BitmapIndexSelector selector) + public T getBitmapResult(BitmapIndexSelector selector, BitmapResultFactory bitmapResultFactory) { - return selector.getBitmapFactory().union(getBitmapIterable(selector)); + return bitmapResultFactory.unionDimensionValueBitmaps(getBitmapIterable(selector)); } @Override diff --git a/processing/src/main/java/io/druid/segment/filter/NotFilter.java b/processing/src/main/java/io/druid/segment/filter/NotFilter.java index 35a365bc63ef..648258c9af1b 100644 --- a/processing/src/main/java/io/druid/segment/filter/NotFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/NotFilter.java @@ -19,7 +19,7 @@ package io.druid.segment.filter; -import io.druid.collections.bitmap.ImmutableBitmap; +import io.druid.query.BitmapResultFactory; import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; @@ -41,10 +41,10 @@ public NotFilter( } @Override - public ImmutableBitmap getBitmapIndex(BitmapIndexSelector selector) + public T getBitmapResult(BitmapIndexSelector selector, BitmapResultFactory bitmapResultFactory) { - return selector.getBitmapFactory().complement( - baseFilter.getBitmapIndex(selector), + return bitmapResultFactory.complement( + baseFilter.getBitmapResult(selector, bitmapResultFactory), selector.getNumRows() ); } diff --git a/processing/src/main/java/io/druid/segment/filter/OrFilter.java b/processing/src/main/java/io/druid/segment/filter/OrFilter.java index 58a99f5331e0..216c9d0c6668 100644 --- a/processing/src/main/java/io/druid/segment/filter/OrFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/OrFilter.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import io.druid.collections.bitmap.ImmutableBitmap; +import io.druid.query.BitmapResultFactory; import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.BooleanFilter; import io.druid.query.filter.Filter; @@ -51,18 +52,18 @@ public OrFilter(List filters) } @Override - public ImmutableBitmap getBitmapIndex(BitmapIndexSelector selector) + public T getBitmapResult(BitmapIndexSelector selector, BitmapResultFactory bitmapResultFactory) { if (filters.size() == 1) { - return filters.get(0).getBitmapIndex(selector); + return filters.get(0).getBitmapResult(selector, bitmapResultFactory); } - List bitmaps = Lists.newArrayList(); - for (int i = 0; i < filters.size(); i++) { - bitmaps.add(filters.get(i).getBitmapIndex(selector)); + List bitmapResults = Lists.newArrayList(); + for (Filter filter : filters) { + bitmapResults.add(filter.getBitmapResult(selector, bitmapResultFactory)); } - return selector.getBitmapFactory().union(bitmaps); + return bitmapResultFactory.union(bitmapResults); } @Override diff --git a/processing/src/main/java/io/druid/segment/filter/SelectorFilter.java b/processing/src/main/java/io/druid/segment/filter/SelectorFilter.java index 987921f7ff0b..eef0148f061a 100644 --- a/processing/src/main/java/io/druid/segment/filter/SelectorFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/SelectorFilter.java @@ -19,7 +19,7 @@ package io.druid.segment.filter; -import io.druid.collections.bitmap.ImmutableBitmap; +import io.druid.query.BitmapResultFactory; import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; @@ -43,9 +43,9 @@ public SelectorFilter( } @Override - public ImmutableBitmap getBitmapIndex(BitmapIndexSelector selector) + public T getBitmapResult(BitmapIndexSelector selector, BitmapResultFactory bitmapResultFactory) { - return selector.getBitmapIndex(dimension, value); + return bitmapResultFactory.wrapDimensionValue(selector.getBitmapIndex(dimension, value)); } @Override diff --git a/processing/src/main/java/io/druid/segment/filter/SpatialFilter.java b/processing/src/main/java/io/druid/segment/filter/SpatialFilter.java index ba11a264be54..b07a20f39b66 100644 --- a/processing/src/main/java/io/druid/segment/filter/SpatialFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/SpatialFilter.java @@ -22,6 +22,7 @@ import com.google.common.base.Predicate; import io.druid.collections.bitmap.ImmutableBitmap; import io.druid.collections.spatial.search.Bound; +import io.druid.query.BitmapResultFactory; import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.DruidFloatPredicate; import io.druid.query.filter.DruidLongPredicate; @@ -49,10 +50,10 @@ public SpatialFilter( } @Override - public ImmutableBitmap getBitmapIndex(final BitmapIndexSelector selector) + public T getBitmapResult(BitmapIndexSelector selector, BitmapResultFactory bitmapResultFactory) { Iterable search = selector.getSpatialIndex(dimension).search(bound); - return selector.getBitmapFactory().union(search); + return bitmapResultFactory.unionDimensionValueBitmaps(search); } @Override diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index aa194d8c00eb..3a19b4797bf1 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -27,6 +27,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.BaseQuery; +import io.druid.query.QueryMetrics; import io.druid.query.dimension.DimensionSpec; import io.druid.query.extraction.ExtractionFn; import io.druid.query.filter.Filter; @@ -194,7 +195,8 @@ public Sequence makeCursors( final Interval interval, final VirtualColumns virtualColumns, final Granularity gran, - final boolean descending + final boolean descending, + @Nullable QueryMetrics queryMetrics ) { if (index.isEmpty()) { diff --git a/processing/src/test/java/io/druid/segment/filter/BaseFilterTest.java b/processing/src/test/java/io/druid/segment/filter/BaseFilterTest.java index 061cc9604332..2c9b707e896f 100644 --- a/processing/src/test/java/io/druid/segment/filter/BaseFilterTest.java +++ b/processing/src/test/java/io/druid/segment/filter/BaseFilterTest.java @@ -26,7 +26,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import io.druid.collections.bitmap.ImmutableBitmap; import io.druid.common.guava.SettableSupplier; import io.druid.common.utils.JodaUtils; import io.druid.data.input.InputRow; @@ -34,6 +33,7 @@ import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; +import io.druid.query.BitmapResultFactory; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.FilteredAggregatorFactory; @@ -304,7 +304,8 @@ private Sequence makeCursorSequence(final Filter filter) new Interval(JodaUtils.MIN_INSTANT, JodaUtils.MAX_INSTANT), VIRTUAL_COLUMNS, Granularities.ALL, - false + false, + null ); } @@ -376,7 +377,7 @@ private List selectColumnValuesMatchingFilterUsingPostFiltering( final Filter postFilteringFilter = new Filter() { @Override - public ImmutableBitmap getBitmapIndex(BitmapIndexSelector selector) + public T getBitmapResult(BitmapIndexSelector selector, BitmapResultFactory bitmapResultFactory) { throw new UnsupportedOperationException(); } diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java index 7bec8797bd9a..902ec89be498 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java @@ -268,7 +268,8 @@ public void testResetSanity() throws IOException interval, VirtualColumns.EMPTY, Granularities.NONE, - descending + descending, + null ); Cursor cursor = Sequences.toList(Sequences.limit(cursorSequence, 1), Lists.newArrayList()).get(0); @@ -337,7 +338,8 @@ public ByteBuffer get() ) ) .build(), - new IncrementalIndexStorageAdapter(index) + new IncrementalIndexStorageAdapter(index), + null ), Lists.>newLinkedList() ); @@ -407,7 +409,12 @@ public void testCursoringAndIndexUpdationInterleaving() throws Exception final StorageAdapter sa = new IncrementalIndexStorageAdapter(index); Sequence cursors = sa.makeCursors( - null, new Interval(timestamp - 60_000, timestamp + 60_000), VirtualColumns.EMPTY, Granularities.ALL, false + null, + new Interval(timestamp - 60_000, timestamp + 60_000), + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null ); final AtomicInteger assertCursorsNotEmpty = new AtomicInteger(0); @@ -485,7 +492,12 @@ public void testCursoringAndSnapshot() throws Exception final StorageAdapter sa = new IncrementalIndexStorageAdapter(index); Sequence cursors = sa.makeCursors( - null, new Interval(timestamp - 60_000, timestamp + 60_000), VirtualColumns.EMPTY, Granularities.ALL, false + null, + new Interval(timestamp - 60_000, timestamp + 60_000), + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null ); final AtomicInteger assertCursorsNotEmpty = new AtomicInteger(0); diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java index fe975c5ed779..8f3f2bc8c6d6 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java @@ -77,7 +77,8 @@ public Sequence apply(WindowedStorageAdapter adapter) adapter.getInterval(), VirtualColumns.EMPTY, Granularities.ALL, - false + false, + null ), new Function>() { @Nullable diff --git a/services/src/main/java/io/druid/cli/DumpSegment.java b/services/src/main/java/io/druid/cli/DumpSegment.java index 8901a4401909..58d7abf957db 100644 --- a/services/src/main/java/io/druid/cli/DumpSegment.java +++ b/services/src/main/java/io/druid/cli/DumpSegment.java @@ -254,7 +254,8 @@ private void runDump(final Injector injector, final QueryableIndex index) throws index.getDataInterval().withChronology(ISOChronology.getInstanceUTC()), VirtualColumns.EMPTY, Granularities.ALL, - false + false, + null ); withOutputStream(