Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
Expand Down Expand Up @@ -58,9 +59,9 @@ public <T> QueryRunner<T> decorate(final QueryRunner<T> delegate,
QueryToolChest<T, ? extends Query<T>> toolChest) {
return new QueryRunner<T>() {
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
return delegate.run(query, responseContext);
return delegate.run(queryPlus, responseContext);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.google.common.base.Function;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;

import java.util.Map;
Expand All @@ -47,12 +47,12 @@ public SerializingQueryRunner(

@Override
public Sequence<T> run(
final Query<T> query,
final QueryPlus<T> queryPlus,
final Map<String, Object> responseContext
)
{
return Sequences.map(
baseRunner.run(query, responseContext),
baseRunner.run(queryPlus, responseContext),
new Function<T, T>()
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.druid.java.util.common.guava.Yielder;
import io.druid.java.util.common.guava.YieldingAccumulator;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;

import java.io.IOException;
Expand All @@ -36,13 +37,15 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultVa
private long count = 0;

public ScanQueryLimitRowIterator(
QueryRunner<ScanResultValue> baseRunner, ScanQuery query,
QueryRunner<ScanResultValue> baseRunner,
QueryPlus<ScanResultValue> queryPlus,
Map<String, Object> responseContext
)
{
ScanQuery query = (ScanQuery) queryPlus.getQuery();
resultFormat = query.getResultFormat();
limit = query.getLimit();
Sequence<ScanResultValue> baseSequence = baseRunner.run(query, responseContext);
Sequence<ScanResultValue> baseSequence = baseRunner.run(queryPlus, responseContext);
yielder = baseSequence.toYielder(
null,
new YieldingAccumulator<ScanResultValue, ScanResultValue>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
import io.druid.java.util.common.guava.BaseSequence;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.aggregation.MetricManipulationFn;
Expand Down Expand Up @@ -55,20 +56,20 @@ public QueryRunner<ScanResultValue> mergeResults(final QueryRunner<ScanResultVal
{
@Override
public Sequence<ScanResultValue> run(
final Query<ScanResultValue> query, final Map<String, Object> responseContext
final QueryPlus<ScanResultValue> queryPlus, final Map<String, Object> responseContext
)
{
ScanQuery scanQuery = (ScanQuery) query;
ScanQuery scanQuery = (ScanQuery) queryPlus.getQuery();
if (scanQuery.getLimit() == Long.MAX_VALUE) {
return runner.run(query, responseContext);
return runner.run(queryPlus, responseContext);
}
return new BaseSequence<>(
new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator>()
{
@Override
public ScanQueryLimitRowIterator make()
{
return new ScanQueryLimitRowIterator(runner, (ScanQuery) query, responseContext);
return new ScanQueryLimitRowIterator(runner, queryPlus, responseContext);
}

@Override
Expand Down Expand Up @@ -109,14 +110,15 @@ public QueryRunner<ScanResultValue> preMergeQueryDecoration(final QueryRunner<Sc
{
@Override
public Sequence<ScanResultValue> run(
Query<ScanResultValue> query, Map<String, Object> responseContext
QueryPlus<ScanResultValue> queryPlus, Map<String, Object> responseContext
)
{
ScanQuery scanQuery = (ScanQuery) query;
ScanQuery scanQuery = (ScanQuery) queryPlus.getQuery();
if (scanQuery.getDimensionsFilter() != null) {
scanQuery = scanQuery.withDimFilter(scanQuery.getDimensionsFilter().optimize());
queryPlus = queryPlus.withQuery(scanQuery);
}
return runner.run(scanQuery, responseContext);
return runner.run(queryPlus, responseContext);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
Expand Down Expand Up @@ -70,12 +71,12 @@ public QueryRunner<ScanResultValue> mergeRunners(
{
@Override
public Sequence<ScanResultValue> run(
final Query<ScanResultValue> query, final Map<String, Object> responseContext
final QueryPlus<ScanResultValue> queryPlus, final Map<String, Object> responseContext
)
{
// Note: this variable is effective only when queryContext has a timeout.
// See the comment of CTX_TIMEOUT_AT.
final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(query);
final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
responseContext.put(CTX_TIMEOUT_AT, timeoutAt);
return Sequences.concat(
Sequences.map(
Expand All @@ -85,7 +86,7 @@ public Sequence<ScanResultValue> run(
@Override
public Sequence<ScanResultValue> apply(final QueryRunner<ScanResultValue> input)
{
return input.run(query, responseContext);
return input.run(queryPlus, responseContext);
}
}
)
Expand Down Expand Up @@ -113,9 +114,10 @@ public ScanQueryRunner(ScanQueryEngine engine, Segment segment)

@Override
public Sequence<ScanResultValue> run(
Query<ScanResultValue> query, Map<String, Object> responseContext
QueryPlus<ScanResultValue> queryPlus, Map<String, Object> responseContext
)
{
Query<ScanResultValue> query = queryPlus.getQuery();
if (!(query instanceof ScanQuery)) {
throw new ISE("Got a [%s] which isn't a %s", query.getClass(), ScanQuery.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
Expand Down Expand Up @@ -214,15 +214,15 @@ public void testMergeResultsWithLimit()
new QueryRunner<ScanResultValue>() {
@Override
public Sequence<ScanResultValue> run(
Query<ScanResultValue> query, Map<String, Object> responseContext
QueryPlus<ScanResultValue> queryPlus, Map<String, Object> responseContext
)
{
// simulate results back from 2 historicals
List<Sequence<ScanResultValue>> sequences = Lists.newArrayListWithExpectedSize(2);
sequences.add(factory.createRunner(segment0).run(query, new HashMap<String, Object>()));
sequences.add(factory.createRunner(segment1).run(query, new HashMap<String, Object>()));
sequences.add(factory.createRunner(segment0).run(queryPlus, new HashMap<String, Object>()));
sequences.add(factory.createRunner(segment1).run(queryPlus, new HashMap<String, Object>()));
return new MergeSequence<>(
query.getResultOrdering(),
queryPlus.getQuery().getResultOrdering(),
Sequences.simple(sequences)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import io.druid.query.DruidMetrics;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
Expand Down Expand Up @@ -626,9 +627,9 @@ public <T> QueryRunner<T> getQueryRunner(Query<T> query)
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
return query.run(appenderator, responseContext);
return queryPlus.run(appenderator, responseContext);
}
};
}
Expand Down
5 changes: 3 additions & 2 deletions processing/src/main/java/io/druid/query/AsyncQueryRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ public AsyncQueryRunner(QueryRunner<T> baseRunner, ExecutorService executor, Que
}

@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
final Query<T> query = queryPlus.getQuery();
final int priority = QueryContexts.getPriority(query);
final ListenableFuture<Sequence<T>> future = executor.submit(new AbstractPrioritizedCallable<Sequence<T>>(priority)
{
Expand All @@ -57,7 +58,7 @@ public Sequence<T> call() throws Exception
{
//Note: this is assumed that baseRunner does most of the work eagerly on call to the
//run() method and resulting sequence accumulate/yield is fast.
return baseRunner.run(query, responseContext);
return baseRunner.run(queryPlus, responseContext);
}
});
queryWatcher.registerQuery(query, future);
Expand Down
1 change: 1 addition & 0 deletions processing/src/main/java/io/druid/query/BaseQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public Sequence<T> run(QuerySegmentWalker walker, Map<String, Object> context)
return run(querySegmentSpec.lookup(this, walker), context);
}

@Override
public Sequence<T> run(QueryRunner<T> runner, Map<String, Object> context)
{
return runner.run(this, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ public BySegmentQueryRunner(

@Override
@SuppressWarnings("unchecked")
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
if (QueryContexts.isBySegment(query)) {
final Sequence<T> baseSequence = base.run(query, responseContext);
if (QueryContexts.isBySegment(queryPlus.getQuery())) {
final Sequence<T> baseSequence = base.run(queryPlus, responseContext);
final List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList());
return Sequences.simple(
Arrays.asList(
Expand All @@ -61,12 +61,12 @@ public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext
new BySegmentResultValueClass<T>(
results,
segmentIdentifier,
query.getIntervals().get(0)
queryPlus.getQuery().getIntervals().get(0)
)
)
)
);
}
return base.run(query, responseContext);
return base.run(queryPlus, responseContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ public BySegmentSkippingQueryRunner(
}

@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
if (QueryContexts.isBySegment(query)) {
return baseRunner.run(query, responseContext);
if (QueryContexts.isBySegment(queryPlus.getQuery())) {
return baseRunner.run(queryPlus, responseContext);
}

return doRun(baseRunner, query, responseContext);
return doRun(baseRunner, queryPlus, responseContext);
}

protected abstract Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query, Map<String, Object> context);
protected abstract Sequence<T> doRun(QueryRunner<T> baseRunner, QueryPlus<T> queryPlus, Map<String, Object> context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ private CPUTimeMetricQueryRunner(


@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
final Sequence<T> baseSequence = delegate.run(query, responseContext);
final QueryPlus<T> queryWithMetrics =
queryPlus.withQueryMetrics((QueryToolChest<T, ? extends Query<T>>) queryToolChest);
final Sequence<T> baseSequence = delegate.run(queryWithMetrics, responseContext);
return Sequences.wrap(
baseSequence,
new SequenceWrapper()
Expand All @@ -82,13 +84,14 @@ public void after(boolean isDone, Throwable thrown) throws Exception
if (report) {
final long cpuTimeNs = cpuTimeAccumulator.get();
if (cpuTimeNs > 0) {
queryToolChest.makeMetrics(query).reportCpuTime(cpuTimeNs).emit(emitter);
queryWithMetrics.getQueryMetrics().reportCpuTime(cpuTimeNs).emit(emitter);
}
}
}
}
);
}

public static <T> QueryRunner<T> safeBuild(
QueryRunner<T> delegate,
QueryToolChest<?, ? super Query<T>> queryToolChest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ public ChainedExecutionQueryRunner(
}

@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
Query<T> query = queryPlus.getQuery();
final int priority = QueryContexts.getPriority(query);
final Ordering ordering = query.getResultOrdering();

Expand Down Expand Up @@ -121,7 +122,7 @@ public ListenableFuture<Iterable<T>> apply(final QueryRunner<T> input)
public Iterable<T> call() throws Exception
{
try {
Sequence<T> result = input.run(query, responseContext);
Sequence<T> result = input.run(queryPlus, responseContext);
if (result == null) {
throw new ISE("Got a null result! Segments are missing!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public ConcatQueryRunner(
}

@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
return Sequences.concat(
Sequences.map(
Expand All @@ -48,7 +48,7 @@ public Sequence<T> run(final Query<T> query, final Map<String, Object> responseC
@Override
public Sequence<T> apply(final QueryRunner<T> input)
{
return input.run(query, responseContext);
return input.run(queryPlus, responseContext);
}
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ public FinalizeResultsQueryRunner(
}

@Override
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
public Sequence<T> run(final QueryPlus<T> queryPlus, Map<String, Object> responseContext)
{
final Query<T> query = queryPlus.getQuery();
final boolean isBySegment = QueryContexts.isBySegment(query);
final boolean shouldFinalize = QueryContexts.isFinalize(query, true);

Expand Down Expand Up @@ -100,7 +101,7 @@ public T apply(T input)


return Sequences.map(
baseRunner.run(queryToRun, responseContext),
baseRunner.run(queryPlus.withQuery(queryToRun), responseContext),
finalizerFn
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ public FluentQueryRunner(QueryRunner<T> runner)

@Override
public Sequence<T> run(
Query<T> query, Map<String, Object> responseContext
QueryPlus<T> queryPlus, Map<String, Object> responseContext
)
{
return baseRunner.run(query, responseContext);
return baseRunner.run(queryPlus, responseContext);
}

public FluentQueryRunner from(QueryRunner<T> runner) {
Expand Down
Loading