From 8b64ce6ec9e083632bce7bf9fcc365b1d857a41b Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 30 Mar 2017 18:57:31 +0900 Subject: [PATCH 1/9] Make timeout behavior consistent to document --- .../io/druid/query/scan/ScanQueryEngine.java | 14 +- .../query/scan/ScanQueryRunnerFactory.java | 6 +- .../java/io/druid/query/AsyncQueryRunner.java | 8 +- .../main/java/io/druid/query/BaseQuery.java | 63 +-------- .../io/druid/query/BySegmentQueryRunner.java | 2 +- .../query/BySegmentSkippingQueryRunner.java | 2 +- .../query/ChainedExecutionQueryRunner.java | 8 +- .../query/FinalizeResultsQueryRunner.java | 4 +- .../druid/query/GroupByMergedQueryRunner.java | 10 +- .../query/IntervalChunkingQueryRunner.java | 2 +- .../java/io/druid/query/QueryContextKeys.java | 27 ---- .../java/io/druid/query/QueryContexts.java | 126 ++++++++++++++++++ .../groupby/GroupByQueryQueryToolChest.java | 6 +- .../GroupByMergingQueryRunnerV2.java | 33 ++--- .../groupby/strategy/GroupByStrategyV2.java | 7 +- .../SegmentMetadataQueryRunnerFactory.java | 11 +- .../search/SearchQueryQueryToolChest.java | 4 +- .../query/topn/TopNQueryQueryToolChest.java | 4 +- .../io/druid/query/AsyncQueryRunnerTest.java | 2 +- .../ChainedExecutionQueryRunnerTest.java | 2 +- .../DataSourceMetadataQueryTest.java | 3 +- .../groupby/GroupByQueryMergeBufferTest.java | 10 +- .../GroupByQueryRunnerFailureTest.java | 8 +- .../query/groupby/GroupByQueryRunnerTest.java | 5 +- .../timeboundary/TimeBoundaryQueryTest.java | 5 +- .../main/java/io/druid/client/CacheUtil.java | 6 +- .../druid/client/CachingClusteredClient.java | 10 +- .../io/druid/client/DirectDruidClient.java | 4 +- .../java/io/druid/server/QueryResource.java | 16 +-- .../PriorityTieredBrokerSelectorStrategy.java | 4 +- 30 files changed, 227 insertions(+), 185 deletions(-) delete mode 100644 processing/src/main/java/io/druid/query/QueryContextKeys.java create mode 100644 processing/src/main/java/io/druid/query/QueryContexts.java diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java index 139cc6d0ad0e..57385f50c116 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java @@ -28,6 +28,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.ColumnSelectorPlus; +import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; @@ -65,6 +66,7 @@ public Sequence process( return Sequences.empty(); } } + final boolean hasTimeout = QueryContexts.hasTimeout(query); final Long timeoutAt = (long) responseContext.get(ScanQueryRunnerFactory.CTX_TIMEOUT_AT); final long start = System.currentTimeMillis(); final StorageAdapter adapter = segment.asStorageAdapter(); @@ -156,7 +158,7 @@ public boolean hasNext() @Override public ScanResultValue next() { - if (System.currentTimeMillis() >= timeoutAt) { + if (hasTimeout && System.currentTimeMillis() >= timeoutAt) { throw new QueryInterruptedException(new TimeoutException()); } long lastOffset = offset; @@ -173,10 +175,12 @@ public ScanResultValue next() ScanQueryRunnerFactory.CTX_COUNT, (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) + (offset - lastOffset) ); - responseContext.put( - ScanQueryRunnerFactory.CTX_TIMEOUT_AT, - timeoutAt - (System.currentTimeMillis() - start) - ); + if (hasTimeout) { + responseContext.put( + ScanQueryRunnerFactory.CTX_TIMEOUT_AT, + timeoutAt - (System.currentTimeMillis() - start) + ); + } return new ScanResultValue(segmentId, allColumns, events); } diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java index 712249eac5b7..6f57403c1de5 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java @@ -25,7 +25,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.Query; -import io.druid.query.QueryContextKeys; +import io.druid.query.QueryContexts; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -71,9 +71,7 @@ public Sequence run( final Query query, final Map responseContext ) { - final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null); - final long timeoutAt = (queryTimeout == null || queryTimeout.longValue() == 0L) - ? JodaUtils.MAX_INSTANT : System.currentTimeMillis() + queryTimeout.longValue(); + final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(query); responseContext.put(CTX_TIMEOUT_AT, timeoutAt); return Sequences.concat( Sequences.map( diff --git a/processing/src/main/java/io/druid/query/AsyncQueryRunner.java b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java index 2f6e84c8bc69..f36461f096cd 100644 --- a/processing/src/main/java/io/druid/query/AsyncQueryRunner.java +++ b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java @@ -49,7 +49,7 @@ public AsyncQueryRunner(QueryRunner baseRunner, ExecutorService executor, Que @Override public Sequence run(final Query query, final Map responseContext) { - final int priority = BaseQuery.getContextPriority(query, 0); + final int priority = QueryContexts.getPriority(query, 0); final ListenableFuture> future = executor.submit(new AbstractPrioritizedCallable>(priority) { @Override @@ -68,11 +68,11 @@ public Sequence call() throws Exception public Sequence get() { try { - Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT); - if (timeout == null) { + final long timeout = QueryContexts.getTimeout(query); + if (timeout == 0) { return future.get(); } else { - return future.get(timeout.longValue(), TimeUnit.MILLISECONDS); + return future.get(timeout, TimeUnit.MILLISECONDS); } } catch (ExecutionException | InterruptedException | TimeoutException ex) { throw Throwables.propagate(ex); diff --git a/processing/src/main/java/io/druid/query/BaseQuery.java b/processing/src/main/java/io/druid/query/BaseQuery.java index 22d0fb1a4ba5..0186e00075ab 100644 --- a/processing/src/main/java/io/druid/query/BaseQuery.java +++ b/processing/src/main/java/io/druid/query/BaseQuery.java @@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; -import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; import io.druid.query.spec.QuerySegmentSpec; import org.joda.time.Duration; @@ -37,66 +36,6 @@ */ public abstract class BaseQuery> implements Query { - public static int getContextPriority(Query query, int defaultValue) - { - return parseInt(query, "priority", defaultValue); - } - - public static boolean getContextBySegment(Query query, boolean defaultValue) - { - return parseBoolean(query, "bySegment", defaultValue); - } - - public static boolean getContextPopulateCache(Query query, boolean defaultValue) - { - return parseBoolean(query, "populateCache", defaultValue); - } - - public static boolean getContextUseCache(Query query, boolean defaultValue) - { - return parseBoolean(query, "useCache", defaultValue); - } - - public static boolean getContextFinalize(Query query, boolean defaultValue) - { - return parseBoolean(query, "finalize", defaultValue); - } - - public static int getContextUncoveredIntervalsLimit(Query query, int defaultValue) - { - return parseInt(query, "uncoveredIntervalsLimit", defaultValue); - } - - private static int parseInt(Query query, String key, int defaultValue) - { - Object val = query.getContextValue(key); - if (val == null) { - return defaultValue; - } - if (val instanceof String) { - return Integer.parseInt((String) val); - } else if (val instanceof Integer) { - return (int) val; - } else { - throw new ISE("Unknown type [%s]", val.getClass()); - } - } - - private static boolean parseBoolean(Query query, String key, boolean defaultValue) - { - Object val = query.getContextValue(key); - if (val == null) { - return defaultValue; - } - if (val instanceof String) { - return Boolean.parseBoolean((String) val); - } else if (val instanceof Boolean) { - return (boolean) val; - } else { - throw new ISE("Unknown type [%s]. Cannot parse!", val.getClass()); - } - } - public static void checkInterrupted() { if (Thread.interrupted()) { @@ -203,7 +142,7 @@ public ContextType getContextValue(String key, ContextType default @Override public boolean getContextBoolean(String key, boolean defaultValue) { - return parseBoolean(this, key, defaultValue); + return QueryContexts.parseBoolean(this, key, defaultValue); } protected Map computeOverridenContext(Map overrides) diff --git a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java index a2a8a960b4f8..1ac46cf4d0c0 100644 --- a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java @@ -51,7 +51,7 @@ public BySegmentQueryRunner( @SuppressWarnings("unchecked") public Sequence run(final Query query, Map responseContext) { - if (BaseQuery.getContextBySegment(query, false)) { + if (QueryContexts.isBySegment(query, false)) { final Sequence baseSequence = base.run(query, responseContext); final List results = Sequences.toList(baseSequence, Lists.newArrayList()); return Sequences.simple( diff --git a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java index c49ced5aa46c..8c5e987ed89f 100644 --- a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java @@ -40,7 +40,7 @@ public BySegmentSkippingQueryRunner( @Override public Sequence run(Query query, Map responseContext) { - if (BaseQuery.getContextBySegment(query, false)) { + if (QueryContexts.isBySegment(query, false)) { return baseRunner.run(query, responseContext); } diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 45363cd3a5c7..0ec9120a99d2 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -91,7 +91,7 @@ public ChainedExecutionQueryRunner( @Override public Sequence run(final Query query, final Map responseContext) { - final int priority = BaseQuery.getContextPriority(query, 0); + final int priority = QueryContexts.getPriority(query, 0); final Ordering ordering = query.getResultOrdering(); return new BaseSequence>( @@ -152,12 +152,12 @@ public Iterable call() throws Exception queryWatcher.registerQuery(query, futures); try { - final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, (Number) null); + final long timeout = QueryContexts.getTimeout(query); return new MergeIterable<>( ordering.nullsFirst(), - timeout == null ? + timeout == 0 ? futures.get() : - futures.get(timeout.longValue(), TimeUnit.MILLISECONDS) + futures.get(timeout, TimeUnit.MILLISECONDS) ).iterator(); } catch (InterruptedException e) { diff --git a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java index 1035a4ed751c..f76b5e6434a5 100644 --- a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java +++ b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java @@ -49,8 +49,8 @@ public FinalizeResultsQueryRunner( @Override public Sequence run(final Query query, Map responseContext) { - final boolean isBySegment = BaseQuery.getContextBySegment(query, false); - final boolean shouldFinalize = BaseQuery.getContextFinalize(query, true); + final boolean isBySegment = QueryContexts.isBySegment(query, false); + final boolean shouldFinalize = QueryContexts.isFinalize(query, true); final Query queryToRun; final Function finalizerFn; diff --git a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java index d416589f199a..4b8cf9794f41 100644 --- a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java @@ -90,8 +90,8 @@ public Sequence run(final Query queryParam, final Map resp true ); final Pair> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); - final boolean bySegment = BaseQuery.getContextBySegment(query, false); - final int priority = BaseQuery.getContextPriority(query, 0); + final boolean bySegment = QueryContexts.isBySegment(query, false); + final int priority = QueryContexts.getPriority(query, 0); ListenableFuture> futures = Futures.allAsList( Lists.newArrayList( @@ -178,11 +178,11 @@ private void waitForFutureCompletion( { try { queryWatcher.registerQuery(query, future); - final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, (Number) null); - if (timeout == null) { + final long timeout = QueryContexts.getTimeout(query); + if (timeout == 0) { future.get(); } else { - future.get(timeout.longValue(), TimeUnit.MILLISECONDS); + future.get(timeout, TimeUnit.MILLISECONDS); } } catch (InterruptedException e) { diff --git a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java index a42ce69fde0e..5e8b529b1535 100644 --- a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java @@ -148,7 +148,7 @@ private Iterable splitInterval(Interval interval, Period period) private Period getChunkPeriod(Query query) { - String p = query.getContextValue(QueryContextKeys.CHUNK_PERIOD, "P0D"); + final String p = QueryContexts.getChunkPeriod(query); return Period.parse(p); } } diff --git a/processing/src/main/java/io/druid/query/QueryContextKeys.java b/processing/src/main/java/io/druid/query/QueryContextKeys.java deleted file mode 100644 index 480dcd551f4f..000000000000 --- a/processing/src/main/java/io/druid/query/QueryContextKeys.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.query; - -public class QueryContextKeys -{ - public static final String PRIORITY = "priority"; - public static final String TIMEOUT = "timeout"; - public static final String CHUNK_PERIOD = "chunkPeriod"; -} diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java new file mode 100644 index 000000000000..e18a160c1a6b --- /dev/null +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -0,0 +1,126 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import io.druid.java.util.common.ISE; + +public class QueryContexts +{ + public static final String PRIORITY = "priority"; + public static final String TIMEOUT = "timeout"; + public static final String CHUNK_PERIOD = "chunkPeriod"; + + public static final long DEFAULT_TIMEOUT = 0; + + public static boolean isBySegment(Query query, boolean defaultValue) + { + return parseBoolean(query, "bySegment", defaultValue); + } + + public static boolean isPopulateCache(Query query, boolean defaultValue) + { + return parseBoolean(query, "populateCache", defaultValue); + } + + public static boolean isUseCache(Query query, boolean defaultValue) + { + return parseBoolean(query, "useCache", defaultValue); + } + + public static boolean isFinalize(Query query, boolean defaultValue) + { + return parseBoolean(query, "finalize", defaultValue); + } + + public static int getUncoveredIntervalsLimit(Query query, int defaultValue) + { + return parseInt(query, "uncoveredIntervalsLimit", defaultValue); + } + + public static int getPriority(Query query, int defaultValue) + { + return parseInt(query, PRIORITY, defaultValue); + } + + public static String getChunkPeriod(Query query) + { + return query.getContextValue(CHUNK_PERIOD, "P0D"); + } + + public static boolean hasTimeout(Query query) + { + return getTimeout(query) != 0; + } + + public static long getTimeout(Query query) + { + return getTimeout(query, DEFAULT_TIMEOUT); + } + + public static long getTimeout(Query query, long defaultValue) + { + return parseLong(query, TIMEOUT, defaultValue); + } + + static long parseLong(Query query, String key, long defaultValue) + { + Object val = query.getContextValue(key); + if (val == null) { + return defaultValue; + } + if (val instanceof String) { + return Long.parseLong((String) val); + } else if (val instanceof Number) { + return ((Number) val).longValue(); + } else { + throw new ISE("Unknown type [%s]", val.getClass()); + } + } + + static int parseInt(Query query, String key, int defaultValue) + { + Object val = query.getContextValue(key); + if (val == null) { + return defaultValue; + } + if (val instanceof String) { + return Integer.parseInt((String) val); + } else if (val instanceof Number) { + return ((Number) val).intValue(); + } else { + throw new ISE("Unknown type [%s]", val.getClass()); + } + } + + static boolean parseBoolean(Query query, String key, boolean defaultValue) + { + Object val = query.getContextValue(key); + if (val == null) { + return defaultValue; + } + if (val instanceof String) { + return Boolean.parseBoolean((String) val); + } else if (val instanceof Boolean) { + return (boolean) val; + } else { + throw new ISE("Unknown type [%s]. Cannot parse!", val.getClass()); + } + } +} diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 85f4cf559adf..b2fab6ff5c04 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -38,11 +38,11 @@ import io.druid.java.util.common.guava.MappedSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; -import io.druid.query.BaseQuery; import io.druid.query.CacheStrategy; import io.druid.query.DataSource; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; @@ -115,7 +115,7 @@ public QueryRunner mergeResults(final QueryRunner runner) @Override public Sequence run(Query query, Map responseContext) { - if (BaseQuery.getContextBySegment(query, false)) { + if (QueryContexts.isBySegment(query, false)) { return runner.run(query, responseContext); } @@ -204,7 +204,7 @@ private Sequence mergeGroupByResults( ); final Sequence finalizingResults; - if (GroupByQuery.getContextFinalize(subquery, false)) { + if (QueryContexts.isFinalize(subquery, false)) { finalizingResults = new MappedSequence<>( subqueryResult, makePreComputeManipulatorFn( diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 2bdb9d354430..94e0f9d9004b 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -35,7 +35,6 @@ import io.druid.collections.BlockingPool; import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.collections.Releaser; -import io.druid.common.utils.JodaUtils; import io.druid.data.input.Row; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; @@ -45,10 +44,9 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.logger.Logger; import io.druid.query.AbstractPrioritizedCallable; -import io.druid.query.BaseQuery; import io.druid.query.ChainedExecutionQueryRunner; import io.druid.query.Query; -import io.druid.query.QueryContextKeys; +import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; @@ -125,7 +123,7 @@ public Sequence run(final Query queryParam, final Map responseContext) ImmutableMap.of(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, true) ); - if (BaseQuery.getContextBySegment(query, false) || forceChainedExecution) { + if (QueryContexts.isBySegment(query, false) || forceChainedExecution) { return new ChainedExecutionQueryRunner(exec, queryWatcher, queryables).run(query, responseContext); } @@ -141,14 +139,13 @@ public Sequence run(final Query queryParam, final Map responseContext) String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId()) ); - final int priority = BaseQuery.getContextPriority(query, 0); + final int priority = QueryContexts.getPriority(query, 0); // Figure out timeoutAt time now, so we can apply the timeout to both the mergeBufferPool.take and the actual // query processing together. - final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null); - final long timeoutAt = queryTimeout == null - ? JodaUtils.MAX_INSTANT - : System.currentTimeMillis() + queryTimeout.longValue(); + final long queryTimeout = QueryContexts.getTimeout(query); + final boolean hasTimeout = QueryContexts.hasTimeout(query); + final long timeoutAt = System.currentTimeMillis() + queryTimeout; return new BaseSequence<>( new BaseSequence.IteratorMaker>() @@ -170,9 +167,13 @@ public CloseableGrouperIterator make() final ReferenceCountingResourceHolder mergeBufferHolder; try { // This will potentially block if there are no merge buffers left in the pool. - final long timeout = timeoutAt - System.currentTimeMillis(); - if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) { - throw new TimeoutException(); + if (hasTimeout) { + final long timeout = timeoutAt - System.currentTimeMillis(); + if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) { + throw new TimeoutException(); + } + } else { + mergeBufferHolder = mergeBufferPool.take(-1); } resources.add(mergeBufferHolder); } @@ -245,6 +246,7 @@ public Boolean call() throws Exception waitForFutureCompletion( query, Futures.allAsList(ImmutableList.of(future)), + hasTimeout, timeoutAt - System.currentTimeMillis() ); } @@ -257,7 +259,7 @@ public Boolean call() throws Exception ); if (!isSingleThreaded) { - waitForFutureCompletion(query, futures, timeoutAt - System.currentTimeMillis()); + waitForFutureCompletion(query, futures, hasTimeout, timeoutAt - System.currentTimeMillis()); } return RowBasedGrouperHelper.makeGrouperIterator( @@ -296,6 +298,7 @@ public void cleanup(CloseableGrouperIterator iterFromMake) private void waitForFutureCompletion( GroupByQuery query, ListenableFuture> future, + boolean hasTimeout, long timeout ) { @@ -304,11 +307,11 @@ private void waitForFutureCompletion( queryWatcher.registerQuery(query, future); } - if (timeout <= 0) { + if (hasTimeout && timeout <= 0) { throw new TimeoutException(); } - final List results = future.get(timeout, TimeUnit.MILLISECONDS); + final List results = hasTimeout ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get(); for (Boolean result : results) { if (!result) { diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index 45dbd954d1c7..0c28b95aef9d 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -31,7 +31,6 @@ import io.druid.collections.BlockingPool; import io.druid.collections.ResourceHolder; import io.druid.collections.StupidPool; -import io.druid.common.utils.JodaUtils; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.guice.annotations.Global; @@ -47,7 +46,7 @@ import io.druid.query.InsufficientResourcesException; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; -import io.druid.query.QueryContextKeys; +import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; @@ -141,9 +140,9 @@ public GroupByQueryResource prepareResource(GroupByQuery query, boolean willMerg } else if (requiredMergeBufferNum == 0) { return new GroupByQueryResource(); } else { - final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, JodaUtils.MAX_INSTANT); + final long timeout = QueryContexts.getTimeout(query); final ResourceHolder> mergeBufferHolders = mergeBufferPool.takeBatch( - requiredMergeBufferNum, timeout.longValue() + requiredMergeBufferNum, timeout ); if (mergeBufferHolders == null) { throw new InsufficientResourcesException("Cannot acquire enough merge buffers"); diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index 5620219f8b0a..1097e65c5495 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -32,10 +32,9 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.logger.Logger; import io.druid.query.AbstractPrioritizedCallable; -import io.druid.query.BaseQuery; import io.druid.query.ConcatQueryRunner; import io.druid.query.Query; -import io.druid.query.QueryContextKeys; +import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; @@ -202,7 +201,7 @@ public Sequence run( final Map responseContext ) { - final int priority = BaseQuery.getContextPriority(query, 0); + final int priority = QueryContexts.getPriority(query, 0); ListenableFuture> future = queryExecutor.submit( new AbstractPrioritizedCallable>(priority) { @@ -210,15 +209,15 @@ public Sequence run( public Sequence call() throws Exception { return Sequences.simple( - Sequences.toList(input.run(query, responseContext), new ArrayList()) + Sequences.toList(input.run(query, responseContext), new ArrayList<>()) ); } } ); try { queryWatcher.registerQuery(query, future); - final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, (Number) null); - return timeout == null ? future.get() : future.get(timeout.longValue(), TimeUnit.MILLISECONDS); + final long timeout = QueryContexts.getTimeout(query); + return timeout == 0 ? future.get() : future.get(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index f5ab114f6873..6f74a0572e09 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -35,12 +35,12 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.guava.nary.BinaryFn; -import io.druid.query.BaseQuery; import io.druid.query.CacheStrategy; import io.druid.query.DefaultGenericQueryMetricsFactory; import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryMetrics; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; @@ -391,7 +391,7 @@ public Sequence> run( return runner.run(query, responseContext); } - final boolean isBySegment = BaseQuery.getContextBySegment(query, false); + final boolean isBySegment = QueryContexts.isBySegment(query, false); return Sequences.map( runner.run(query.withLimit(config.getMaxSearchLimit()), responseContext), diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index a7826dbdb9d9..9b51cf13c1d5 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -32,11 +32,11 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.guava.nary.BinaryFn; -import io.druid.query.BaseQuery; import io.druid.query.BySegmentResultValue; import io.druid.query.CacheStrategy; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; @@ -535,7 +535,7 @@ public Sequence> run( return runner.run(query, responseContext); } - final boolean isBySegment = BaseQuery.getContextBySegment(query, false); + final boolean isBySegment = QueryContexts.isBySegment(query, false); return Sequences.map( runner.run(query.withThreshold(minTopNThreshold), responseContext), diff --git a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java index 1e3d931b5caf..1bb1ceb8993a 100644 --- a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java @@ -100,7 +100,7 @@ public Sequence run(Query query, Map responseContext) QueryRunnerTestHelper.NOOP_QUERYWATCHER); Sequence lazy = asyncRunner.run( - query.withOverriddenContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, 1)), + query.withOverriddenContext(ImmutableMap.of(QueryContexts.TIMEOUT, 1)), Collections.EMPTY_MAP); try { diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java index 67918aef4819..5c72e937601d 100644 --- a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -251,7 +251,7 @@ public Void answer() throws Throwable .dataSource("test") .intervals("2014/2015") .aggregators(Lists.newArrayList(new CountAggregatorFactory("count"))) - .context(ImmutableMap.of(QueryContextKeys.TIMEOUT, 100, "queryId", "test")) + .context(ImmutableMap.of(QueryContexts.TIMEOUT, 100, "queryId", "test")) .build(), context ); diff --git a/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java b/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java index e05caa4336d2..84ef7bd1e8f1 100644 --- a/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java +++ b/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java @@ -33,6 +33,7 @@ import io.druid.query.Druids; import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; @@ -101,7 +102,7 @@ public void testContextSerde() throws Exception ), Query.class ); - Assert.assertEquals(1, serdeQuery.getContextValue("priority")); + Assert.assertEquals(1, serdeQuery.getContextValue(QueryContexts.PRIORITY)); Assert.assertEquals(true, serdeQuery.getContextValue("useCache")); Assert.assertEquals("true", serdeQuery.getContextValue("populateCache")); Assert.assertEquals(true, serdeQuery.getContextValue("finalize")); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java index 460c9d8c8519..066e22d73e01 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java @@ -32,7 +32,7 @@ import io.druid.data.input.Row; import io.druid.java.util.common.granularity.Granularities; import io.druid.query.DruidProcessingConfig; -import io.druid.query.QueryContextKeys; +import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; @@ -230,7 +230,7 @@ public void testSimpleGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT, Integers.valueOf(500))) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -258,7 +258,7 @@ public void testNestedGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT, Integers.valueOf(500))) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -297,7 +297,7 @@ public void testDoubleNestedGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT, Integers.valueOf(500))) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -349,7 +349,7 @@ public void testTripleNestedGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT, Integers.valueOf(500))) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java index a2f6f6b8d770..029d640d7637 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java @@ -33,7 +33,7 @@ import io.druid.java.util.common.granularity.Granularities; import io.druid.query.DruidProcessingConfig; import io.druid.query.InsufficientResourcesException; -import io.druid.query.QueryContextKeys; +import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; @@ -201,7 +201,7 @@ public void testNotEnoughMergeBuffersOnQueryable() throws IOException .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT, Integers.valueOf(500))) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -239,7 +239,7 @@ public void testResourceLimitExceededOnBroker() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT, Integers.valueOf(500))) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -265,7 +265,7 @@ public void testInsufficientResourcesOnBroker() throws IOException .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT, Integers.valueOf(500))) .build(); try { diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 07a4b876c67b..a8320c522de8 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -52,6 +52,7 @@ import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; @@ -1213,7 +1214,7 @@ public void testGroupByTimeoutContextOverride() ) ) .setGranularity(QueryRunnerTestHelper.dayGran) - .setContext(ImmutableMap.of("timeout", Integer.valueOf(60000))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT, Integer.valueOf(60000))) .build(); List expectedResults = Arrays.asList( @@ -5316,7 +5317,7 @@ public void testSubqueryWithContextTimeout() .setDimensions(Lists.newArrayList()) .setAggregatorSpecs(ImmutableList.of(new CountAggregatorFactory("count"))) .setGranularity(QueryRunnerTestHelper.allGran) - .setContext(ImmutableMap.of("timeout", 10000)) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT, 10000)) .build(); List expectedResults = Arrays.asList( diff --git a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java index 1c332fb23dc6..0e8eef0dc401 100644 --- a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java +++ b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java @@ -24,6 +24,7 @@ import io.druid.jackson.DefaultObjectMapper; import io.druid.query.Druids; import io.druid.query.Query; +import io.druid.query.QueryContexts; import org.junit.Assert; import org.junit.Test; @@ -78,7 +79,7 @@ public void testContextSerde() throws Exception ); - Assert.assertEquals(new Integer(1), serdeQuery.getContextValue("priority")); + Assert.assertEquals(new Integer(1), serdeQuery.getContextValue(QueryContexts.PRIORITY)); Assert.assertEquals(true, serdeQuery.getContextValue("useCache")); Assert.assertEquals(true, serdeQuery.getContextValue("populateCache")); Assert.assertEquals(true, serdeQuery.getContextValue("finalize")); @@ -116,7 +117,7 @@ public void testContextSerde2() throws Exception ); - Assert.assertEquals("1", serdeQuery.getContextValue("priority")); + Assert.assertEquals("1", serdeQuery.getContextValue(QueryContexts.PRIORITY)); Assert.assertEquals("true", serdeQuery.getContextValue("useCache")); Assert.assertEquals("true", serdeQuery.getContextValue("populateCache")); Assert.assertEquals("true", serdeQuery.getContextValue("finalize")); diff --git a/server/src/main/java/io/druid/client/CacheUtil.java b/server/src/main/java/io/druid/client/CacheUtil.java index 6210f2c5ee37..636b39e06408 100644 --- a/server/src/main/java/io/druid/client/CacheUtil.java +++ b/server/src/main/java/io/druid/client/CacheUtil.java @@ -24,9 +24,9 @@ import com.google.common.collect.Lists; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; -import io.druid.query.BaseQuery; import io.druid.query.CacheStrategy; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.SegmentDescriptor; import org.joda.time.Interval; @@ -123,7 +123,7 @@ private static boolean useCache( CacheConfig cacheConfig ) { - return BaseQuery.getContextUseCache(query, true) + return QueryContexts.isUseCache(query, true) && strategy != null && cacheConfig.isUseCache() && cacheConfig.isQueryCacheable(query); @@ -135,7 +135,7 @@ private static boolean populateCache( CacheConfig cacheConfig ) { - return BaseQuery.getContextPopulateCache(query, true) + return QueryContexts.isPopulateCache(query, true) && strategy != null && cacheConfig.isPopulateCache() && cacheConfig.isQueryCacheable(query); diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index b15e912fd3f3..8acce31287ef 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -55,10 +55,10 @@ import io.druid.java.util.common.guava.MergeSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; -import io.druid.query.BaseQuery; import io.druid.query.BySegmentResultValueClass; import io.druid.query.CacheStrategy; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; @@ -151,12 +151,12 @@ public Sequence run(final Query query, final Map responseC final boolean useCache = CacheUtil.useCacheOnBrokers(query, strategy, cacheConfig); final boolean populateCache = CacheUtil.populateCacheOnBrokers(query, strategy, cacheConfig); - final boolean isBySegment = BaseQuery.getContextBySegment(query, false); + final boolean isBySegment = QueryContexts.isBySegment(query, false); final ImmutableMap.Builder contextBuilder = new ImmutableMap.Builder<>(); - final int priority = BaseQuery.getContextPriority(query, 0); - contextBuilder.put("priority", priority); + final int priority = QueryContexts.getPriority(query, 0); + contextBuilder.put(QueryContexts.PRIORITY, priority); if (populateCache) { // prevent down-stream nodes from caching results as well if we are populating the cache @@ -177,7 +177,7 @@ public Sequence run(final Query query, final Map responseC // Note that enabling this leads to putting uncovered intervals information in the response headers // and might blow up in some cases https://github.com/druid-io/druid/issues/2108 - int uncoveredIntervalsLimit = BaseQuery.getContextUncoveredIntervalsLimit(query, 0); + int uncoveredIntervalsLimit = QueryContexts.getUncoveredIntervalsLimit(query, 0); if (uncoveredIntervalsLimit > 0) { List uncoveredIntervals = Lists.newArrayListWithCapacity(uncoveredIntervalsLimit); diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index cd6401462a0c..9f847e50410a 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -50,9 +50,9 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.logger.Logger; -import io.druid.query.BaseQuery; import io.druid.query.BySegmentResultValueClass; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryMetrics; import io.druid.query.QueryRunner; @@ -134,7 +134,7 @@ public int getNumOpenConnections() public Sequence run(final Query query, final Map context) { QueryToolChest> toolChest = warehouse.getToolChest(query); - boolean isBySegment = BaseQuery.getContextBySegment(query, false); + boolean isBySegment = QueryContexts.isBySegment(query, false); Pair types = typesMap.get(query.getClass()); if (types == null) { diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 2bd7d192cbb3..8d3149128e8b 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -32,6 +32,7 @@ import com.metamx.emitter.service.ServiceEmitter; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; +import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; @@ -40,7 +41,7 @@ import io.druid.query.DruidMetrics; import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; -import io.druid.query.QueryContextKeys; +import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryMetrics; import io.druid.query.QuerySegmentWalker; @@ -177,7 +178,7 @@ public Response doPost( ) throws IOException { final long startNs = System.nanoTime(); - Query query = null; + Query query = null; QueryToolChest toolChest = null; String queryId = null; @@ -191,14 +192,11 @@ public Response doPost( queryId = UUID.randomUUID().toString(); query = query.withId(queryId); } - if (query.getContextValue(QueryContextKeys.TIMEOUT) == null) { - query = query.withOverriddenContext( - ImmutableMap.of( - QueryContextKeys.TIMEOUT, - config.getMaxIdleTime().toStandardDuration().getMillis() - ) - ); + + if (QueryContexts.getTimeout(query) < 0) { + throw new IAE("Timeout must be a non negative value, but was [%d]", QueryContexts.getTimeout(query)); } + toolChest = warehouse.getToolChest(query); Thread.currentThread() diff --git a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java index 66164f91c706..4d0ad861dd70 100644 --- a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java +++ b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java @@ -23,8 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import com.google.common.collect.Iterables; -import io.druid.query.BaseQuery; import io.druid.query.Query; +import io.druid.query.QueryContexts; /** */ @@ -46,7 +46,7 @@ public PriorityTieredBrokerSelectorStrategy( @Override public Optional getBrokerServiceName(TieredBrokerConfig tierConfig, Query query) { - final int priority = BaseQuery.getContextPriority(query, 0); + final int priority = QueryContexts.getPriority(query, 0); if (priority < minPriority) { return Optional.of( From 880d848caacef1b2a8c90836eb9464525e586c4f Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 1 Apr 2017 12:02:46 +0900 Subject: [PATCH 2/9] Refactoring BlockingPool and add more methods to QueryContexts --- .../io/druid/collections/BlockingPool.java | 122 +++++++++++------- .../query/scan/ScanQueryRunnerFactory.java | 2 +- .../java/io/druid/query/AsyncQueryRunner.java | 4 +- .../io/druid/query/BySegmentQueryRunner.java | 2 +- .../query/BySegmentSkippingQueryRunner.java | 2 +- .../query/ChainedExecutionQueryRunner.java | 4 +- .../query/FinalizeResultsQueryRunner.java | 2 +- .../druid/query/GroupByMergedQueryRunner.java | 6 +- .../java/io/druid/query/QueryContexts.java | 43 +++++- .../groupby/GroupByQueryQueryToolChest.java | 2 +- .../GroupByMergingQueryRunnerV2.java | 6 +- .../groupby/strategy/GroupByStrategyV2.java | 9 +- .../SegmentMetadataQueryRunnerFactory.java | 8 +- .../search/SearchQueryQueryToolChest.java | 2 +- .../query/topn/TopNQueryQueryToolChest.java | 2 +- .../main/java/io/druid/client/CacheUtil.java | 4 +- .../druid/client/CachingClusteredClient.java | 6 +- .../io/druid/client/DirectDruidClient.java | 2 +- .../java/io/druid/server/QueryResource.java | 4 - .../PriorityTieredBrokerSelectorStrategy.java | 2 +- 20 files changed, 155 insertions(+), 79 deletions(-) diff --git a/common/src/main/java/io/druid/collections/BlockingPool.java b/common/src/main/java/io/druid/collections/BlockingPool.java index 9a883da35cfb..54ae2c9a047a 100644 --- a/common/src/main/java/io/druid/collections/BlockingPool.java +++ b/common/src/main/java/io/druid/collections/BlockingPool.java @@ -74,40 +74,56 @@ public int getPoolSize() } /** - * Take a resource from the pool. + * Take a resource from the pool, waiting up to the + * specified wait time if necessary for an element to become available. * - * @param timeout maximum time to wait for a resource, in milliseconds. Negative means do not use a timeout. + * @param timeout maximum time to wait for a resource, in milliseconds. * * @return a resource, or null if the timeout was reached */ public ReferenceCountingResourceHolder take(final long timeout) { checkInitialized(); - final T theObject; try { - if (timeout > -1) { - theObject = timeout > 0 ? poll(timeout) : poll(); - } else { - theObject = take(); - } - return theObject == null ? null : new ReferenceCountingResourceHolder<>( - theObject, - new Closeable() - { - @Override - public void close() throws IOException - { - offer(theObject); - } - } - ); + return wrapObject(timeout > 0 ? pollObject(timeout) : pollObject()); } catch (InterruptedException e) { throw Throwables.propagate(e); } } - private T poll() + /** + * Take a resource from the pool, waiting if necessary until an element becomes available. + * + * @return a resource + */ + public ReferenceCountingResourceHolder take() + { + checkInitialized(); + try { + return wrapObject(takeObject()); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } + + private ReferenceCountingResourceHolder wrapObject(T theObject) + { + return theObject == null ? null : new ReferenceCountingResourceHolder<>( + theObject, + new Closeable() + { + @Override + public void close() throws IOException + { + offer(theObject); + } + } + ); + } + + private T pollObject() { final ReentrantLock lock = this.lock; lock.lock(); @@ -118,7 +134,7 @@ private T poll() } } - private T poll(long timeout) throws InterruptedException + private T pollObject(long timeout) throws InterruptedException { long nanos = TIME_UNIT.toNanos(timeout); final ReentrantLock lock = this.lock; @@ -136,7 +152,7 @@ private T poll(long timeout) throws InterruptedException } } - private T take() throws InterruptedException + private T takeObject() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); @@ -151,41 +167,59 @@ private T take() throws InterruptedException } /** - * Take a resource from the pool. + * Take resources from the pool, waiting up to the + * specified wait time if necessary for elements of the given number to become available. * * @param elementNum number of resources to take - * @param timeout maximum time to wait for resources, in milliseconds. Negative means do not use a timeout. + * @param timeout maximum time to wait for resources, in milliseconds. * * @return a resource, or null if the timeout was reached */ public ReferenceCountingResourceHolder> takeBatch(final int elementNum, final long timeout) { checkInitialized(); - final List objects; try { - if (timeout > -1) { - objects = timeout > 0 ? pollBatch(elementNum, timeout) : pollBatch(elementNum); - } else { - objects = takeBatch(elementNum); - } - return objects == null ? null : new ReferenceCountingResourceHolder<>( - objects, - new Closeable() - { - @Override - public void close() throws IOException - { - offerBatch(objects); - } - } - ); + return wrapObjects(timeout > 0 ? pollObjects(elementNum, timeout) : pollObjects(elementNum)); } catch (InterruptedException e) { throw Throwables.propagate(e); } } - private List pollBatch(int elementNum) throws InterruptedException + /** + * Take resources from the pool, waiting if necessary until the elements of the given number become available. + * + * @param elementNum number of resources to take + * + * @return a resource + */ + public ReferenceCountingResourceHolder> takeBatch(final int elementNum) + { + checkInitialized(); + try { + return wrapObjects(takeObjects(elementNum)); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } + + private ReferenceCountingResourceHolder> wrapObjects(List theObjects) + { + return theObjects == null ? null : new ReferenceCountingResourceHolder<>( + theObjects, + new Closeable() + { + @Override + public void close() throws IOException + { + offerBatch(theObjects); + } + } + ); + } + + private List pollObjects(int elementNum) throws InterruptedException { final List list = Lists.newArrayListWithCapacity(elementNum); final ReentrantLock lock = this.lock; @@ -204,7 +238,7 @@ private List pollBatch(int elementNum) throws InterruptedException } } - private List pollBatch(int elementNum, long timeout) throws InterruptedException + private List pollObjects(int elementNum, long timeout) throws InterruptedException { long nanos = TIME_UNIT.toNanos(timeout); final List list = Lists.newArrayListWithCapacity(elementNum); @@ -226,7 +260,7 @@ private List pollBatch(int elementNum, long timeout) throws InterruptedExcept } } - private List takeBatch(int elementNum) throws InterruptedException + private List takeObjects(int elementNum) throws InterruptedException { final List list = Lists.newArrayListWithCapacity(elementNum); final ReentrantLock lock = this.lock; diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java index 6f57403c1de5..2565b06e721b 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java @@ -120,7 +120,7 @@ public Sequence run( final Number timeoutAt = (Number) responseContext.get(CTX_TIMEOUT_AT); if (timeoutAt == null || timeoutAt.longValue() == 0L) { responseContext.put(CTX_TIMEOUT_AT, JodaUtils.MAX_INSTANT); - }; + } return engine.process((ScanQuery) query, segment, responseContext); } } diff --git a/processing/src/main/java/io/druid/query/AsyncQueryRunner.java b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java index f36461f096cd..13daf0f4e007 100644 --- a/processing/src/main/java/io/druid/query/AsyncQueryRunner.java +++ b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java @@ -49,7 +49,7 @@ public AsyncQueryRunner(QueryRunner baseRunner, ExecutorService executor, Que @Override public Sequence run(final Query query, final Map responseContext) { - final int priority = QueryContexts.getPriority(query, 0); + final int priority = QueryContexts.getPriority(query); final ListenableFuture> future = executor.submit(new AbstractPrioritizedCallable>(priority) { @Override @@ -69,7 +69,7 @@ public Sequence get() { try { final long timeout = QueryContexts.getTimeout(query); - if (timeout == 0) { + if (QueryContexts.isNoTimeout(timeout)) { return future.get(); } else { return future.get(timeout, TimeUnit.MILLISECONDS); diff --git a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java index 1ac46cf4d0c0..e57b9471ce3b 100644 --- a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java @@ -51,7 +51,7 @@ public BySegmentQueryRunner( @SuppressWarnings("unchecked") public Sequence run(final Query query, Map responseContext) { - if (QueryContexts.isBySegment(query, false)) { + if (QueryContexts.isBySegment(query)) { final Sequence baseSequence = base.run(query, responseContext); final List results = Sequences.toList(baseSequence, Lists.newArrayList()); return Sequences.simple( diff --git a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java index 8c5e987ed89f..373c2e3b1192 100644 --- a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java @@ -40,7 +40,7 @@ public BySegmentSkippingQueryRunner( @Override public Sequence run(Query query, Map responseContext) { - if (QueryContexts.isBySegment(query, false)) { + if (QueryContexts.isBySegment(query)) { return baseRunner.run(query, responseContext); } diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 0ec9120a99d2..ae5641d26af7 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -91,7 +91,7 @@ public ChainedExecutionQueryRunner( @Override public Sequence run(final Query query, final Map responseContext) { - final int priority = QueryContexts.getPriority(query, 0); + final int priority = QueryContexts.getPriority(query); final Ordering ordering = query.getResultOrdering(); return new BaseSequence>( @@ -155,7 +155,7 @@ public Iterable call() throws Exception final long timeout = QueryContexts.getTimeout(query); return new MergeIterable<>( ordering.nullsFirst(), - timeout == 0 ? + QueryContexts.isNoTimeout(timeout) ? futures.get() : futures.get(timeout, TimeUnit.MILLISECONDS) ).iterator(); diff --git a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java index f76b5e6434a5..7fe58ee06b90 100644 --- a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java +++ b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java @@ -49,7 +49,7 @@ public FinalizeResultsQueryRunner( @Override public Sequence run(final Query query, Map responseContext) { - final boolean isBySegment = QueryContexts.isBySegment(query, false); + final boolean isBySegment = QueryContexts.isBySegment(query); final boolean shouldFinalize = QueryContexts.isFinalize(query, true); final Query queryToRun; diff --git a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java index 4b8cf9794f41..c10f89ac01f6 100644 --- a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java @@ -90,8 +90,8 @@ public Sequence run(final Query queryParam, final Map resp true ); final Pair> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); - final boolean bySegment = QueryContexts.isBySegment(query, false); - final int priority = QueryContexts.getPriority(query, 0); + final boolean bySegment = QueryContexts.isBySegment(query); + final int priority = QueryContexts.getPriority(query); ListenableFuture> futures = Futures.allAsList( Lists.newArrayList( @@ -179,7 +179,7 @@ private void waitForFutureCompletion( try { queryWatcher.registerQuery(query, future); final long timeout = QueryContexts.getTimeout(query); - if (timeout == 0) { + if (QueryContexts.isNoTimeout(timeout)) { future.get(); } else { future.get(timeout, TimeUnit.MILLISECONDS); diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java index e18a160c1a6b..277ac25f3df7 100644 --- a/processing/src/main/java/io/druid/query/QueryContexts.java +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -19,6 +19,7 @@ package io.druid.query; +import com.google.common.base.Preconditions; import io.druid.java.util.common.ISE; public class QueryContexts @@ -27,18 +28,39 @@ public class QueryContexts public static final String TIMEOUT = "timeout"; public static final String CHUNK_PERIOD = "chunkPeriod"; + public static final boolean DEFAULT_BY_SEGMENT = false; + public static final boolean DEFAULT_POPULATE_CACHE = true; + public static final boolean DEFAULT_USE_CACHE = true; + public static final int DEFAULT_PRIORITY = 0; + public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0; public static final long DEFAULT_TIMEOUT = 0; + public static final long NO_TIMEOUT = 0; + + public static boolean isBySegment(Query query) + { + return isBySegment(query, DEFAULT_BY_SEGMENT); + } public static boolean isBySegment(Query query, boolean defaultValue) { return parseBoolean(query, "bySegment", defaultValue); } + public static boolean isPopulateCache(Query query) + { + return isPopulateCache(query, DEFAULT_POPULATE_CACHE); + } + public static boolean isPopulateCache(Query query, boolean defaultValue) { return parseBoolean(query, "populateCache", defaultValue); } + public static boolean isUseCache(Query query) + { + return isUseCache(query, DEFAULT_USE_CACHE); + } + public static boolean isUseCache(Query query, boolean defaultValue) { return parseBoolean(query, "useCache", defaultValue); @@ -49,11 +71,21 @@ public static boolean isFinalize(Query query, boolean defaultValue) return parseBoolean(query, "finalize", defaultValue); } + public static int getUncoveredIntervalsLimit(Query query) + { + return getUncoveredIntervalsLimit(query, DEFAULT_UNCOVERED_INTERVALS_LIMIT); + } + public static int getUncoveredIntervalsLimit(Query query, int defaultValue) { return parseInt(query, "uncoveredIntervalsLimit", defaultValue); } + public static int getPriority(Query query) + { + return getPriority(query, DEFAULT_PRIORITY); + } + public static int getPriority(Query query, int defaultValue) { return parseInt(query, PRIORITY, defaultValue); @@ -66,7 +98,12 @@ public static String getChunkPeriod(Query query) public static boolean hasTimeout(Query query) { - return getTimeout(query) != 0; + return getTimeout(query) != NO_TIMEOUT; + } + + public static boolean isNoTimeout(long timeout) + { + return timeout == NO_TIMEOUT; } public static long getTimeout(Query query) @@ -76,7 +113,9 @@ public static long getTimeout(Query query) public static long getTimeout(Query query, long defaultValue) { - return parseLong(query, TIMEOUT, defaultValue); + final long timeout = parseLong(query, TIMEOUT, defaultValue); + Preconditions.checkState(timeout >= 0, "Timeout must be a non negative value, but was [%d]", timeout); + return timeout; } static long parseLong(Query query, String key, long defaultValue) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index b2fab6ff5c04..ddf9c4277c3f 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -115,7 +115,7 @@ public QueryRunner mergeResults(final QueryRunner runner) @Override public Sequence run(Query query, Map responseContext) { - if (QueryContexts.isBySegment(query, false)) { + if (QueryContexts.isBySegment(query)) { return runner.run(query, responseContext); } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 94e0f9d9004b..5850a72426ad 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -123,7 +123,7 @@ public Sequence run(final Query queryParam, final Map responseContext) ImmutableMap.of(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, true) ); - if (QueryContexts.isBySegment(query, false) || forceChainedExecution) { + if (QueryContexts.isBySegment(query) || forceChainedExecution) { return new ChainedExecutionQueryRunner(exec, queryWatcher, queryables).run(query, responseContext); } @@ -139,7 +139,7 @@ public Sequence run(final Query queryParam, final Map responseContext) String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId()) ); - final int priority = QueryContexts.getPriority(query, 0); + final int priority = QueryContexts.getPriority(query); // Figure out timeoutAt time now, so we can apply the timeout to both the mergeBufferPool.take and the actual // query processing together. @@ -173,7 +173,7 @@ public CloseableGrouperIterator make() throw new TimeoutException(); } } else { - mergeBufferHolder = mergeBufferPool.take(-1); + mergeBufferHolder = mergeBufferPool.take(); } resources.add(mergeBufferHolder); } diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index 0c28b95aef9d..b190f2136970 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -140,10 +140,13 @@ public GroupByQueryResource prepareResource(GroupByQuery query, boolean willMerg } else if (requiredMergeBufferNum == 0) { return new GroupByQueryResource(); } else { + final ResourceHolder> mergeBufferHolders; final long timeout = QueryContexts.getTimeout(query); - final ResourceHolder> mergeBufferHolders = mergeBufferPool.takeBatch( - requiredMergeBufferNum, timeout - ); + if (QueryContexts.isNoTimeout(timeout)) { + mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum); + } else { + mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum, timeout); + } if (mergeBufferHolders == null) { throw new InsufficientResourcesException("Cannot acquire enough merge buffers"); } else { diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index 1097e65c5495..de78b0f68905 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -201,7 +201,7 @@ public Sequence run( final Map responseContext ) { - final int priority = QueryContexts.getPriority(query, 0); + final int priority = QueryContexts.getPriority(query); ListenableFuture> future = queryExecutor.submit( new AbstractPrioritizedCallable>(priority) { @@ -217,7 +217,11 @@ public Sequence call() throws Exception try { queryWatcher.registerQuery(query, future); final long timeout = QueryContexts.getTimeout(query); - return timeout == 0 ? future.get() : future.get(timeout, TimeUnit.MILLISECONDS); + if (QueryContexts.isNoTimeout(timeout)) { + return future.get(); + } else { + return future.get(timeout, TimeUnit.MILLISECONDS); + } } catch (InterruptedException e) { log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index 6f74a0572e09..d84a7396e3e6 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -391,7 +391,7 @@ public Sequence> run( return runner.run(query, responseContext); } - final boolean isBySegment = QueryContexts.isBySegment(query, false); + final boolean isBySegment = QueryContexts.isBySegment(query); return Sequences.map( runner.run(query.withLimit(config.getMaxSearchLimit()), responseContext), diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index 9b51cf13c1d5..7d9be82c6fb4 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -535,7 +535,7 @@ public Sequence> run( return runner.run(query, responseContext); } - final boolean isBySegment = QueryContexts.isBySegment(query, false); + final boolean isBySegment = QueryContexts.isBySegment(query); return Sequences.map( runner.run(query.withThreshold(minTopNThreshold), responseContext), diff --git a/server/src/main/java/io/druid/client/CacheUtil.java b/server/src/main/java/io/druid/client/CacheUtil.java index 636b39e06408..a5bce97feadd 100644 --- a/server/src/main/java/io/druid/client/CacheUtil.java +++ b/server/src/main/java/io/druid/client/CacheUtil.java @@ -123,7 +123,7 @@ private static boolean useCache( CacheConfig cacheConfig ) { - return QueryContexts.isUseCache(query, true) + return QueryContexts.isUseCache(query) && strategy != null && cacheConfig.isUseCache() && cacheConfig.isQueryCacheable(query); @@ -135,7 +135,7 @@ private static boolean populateCache( CacheConfig cacheConfig ) { - return QueryContexts.isPopulateCache(query, true) + return QueryContexts.isPopulateCache(query) && strategy != null && cacheConfig.isPopulateCache() && cacheConfig.isQueryCacheable(query); diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 8acce31287ef..fd8e090405f4 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -151,11 +151,11 @@ public Sequence run(final Query query, final Map responseC final boolean useCache = CacheUtil.useCacheOnBrokers(query, strategy, cacheConfig); final boolean populateCache = CacheUtil.populateCacheOnBrokers(query, strategy, cacheConfig); - final boolean isBySegment = QueryContexts.isBySegment(query, false); + final boolean isBySegment = QueryContexts.isBySegment(query); final ImmutableMap.Builder contextBuilder = new ImmutableMap.Builder<>(); - final int priority = QueryContexts.getPriority(query, 0); + final int priority = QueryContexts.getPriority(query); contextBuilder.put(QueryContexts.PRIORITY, priority); if (populateCache) { @@ -177,7 +177,7 @@ public Sequence run(final Query query, final Map responseC // Note that enabling this leads to putting uncovered intervals information in the response headers // and might blow up in some cases https://github.com/druid-io/druid/issues/2108 - int uncoveredIntervalsLimit = QueryContexts.getUncoveredIntervalsLimit(query, 0); + int uncoveredIntervalsLimit = QueryContexts.getUncoveredIntervalsLimit(query); if (uncoveredIntervalsLimit > 0) { List uncoveredIntervals = Lists.newArrayListWithCapacity(uncoveredIntervalsLimit); diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 9f847e50410a..d8dc7882a74b 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -134,7 +134,7 @@ public int getNumOpenConnections() public Sequence run(final Query query, final Map context) { QueryToolChest> toolChest = warehouse.getToolChest(query); - boolean isBySegment = QueryContexts.isBySegment(query, false); + boolean isBySegment = QueryContexts.isBySegment(query); Pair types = typesMap.get(query.getClass()); if (types == null) { diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 8d3149128e8b..ce723850f931 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -193,10 +193,6 @@ public Response doPost( query = query.withId(queryId); } - if (QueryContexts.getTimeout(query) < 0) { - throw new IAE("Timeout must be a non negative value, but was [%d]", QueryContexts.getTimeout(query)); - } - toolChest = warehouse.getToolChest(query); Thread.currentThread() diff --git a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java index 4d0ad861dd70..c4af26e0b10d 100644 --- a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java +++ b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java @@ -46,7 +46,7 @@ public PriorityTieredBrokerSelectorStrategy( @Override public Optional getBrokerServiceName(TieredBrokerConfig tierConfig, Query query) { - final int priority = QueryContexts.getPriority(query, 0); + final int priority = QueryContexts.getPriority(query); if (priority < minPriority) { return Optional.of( From 7196bec1e2309ada64cbe08512f338eea93e5409 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 1 Apr 2017 12:09:16 +0900 Subject: [PATCH 3/9] remove unused imports --- server/src/main/java/io/druid/server/QueryResource.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index ce723850f931..df992db1389e 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -32,7 +32,6 @@ import com.metamx.emitter.service.ServiceEmitter; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; -import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; @@ -41,7 +40,6 @@ import io.druid.query.DruidMetrics; import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; -import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryMetrics; import io.druid.query.QuerySegmentWalker; From a899554fdcb2e0a72baf25b7111819bb4c5f19e9 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 4 Apr 2017 09:46:43 +0900 Subject: [PATCH 4/9] Addressed comments --- .../io/druid/collections/BlockingPool.java | 22 ++++++++++--------- .../query/scan/ScanQueryRunnerFactory.java | 4 ++++ 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/common/src/main/java/io/druid/collections/BlockingPool.java b/common/src/main/java/io/druid/collections/BlockingPool.java index 54ae2c9a047a..af48f53cf098 100644 --- a/common/src/main/java/io/druid/collections/BlockingPool.java +++ b/common/src/main/java/io/druid/collections/BlockingPool.java @@ -77,15 +77,16 @@ public int getPoolSize() * Take a resource from the pool, waiting up to the * specified wait time if necessary for an element to become available. * - * @param timeout maximum time to wait for a resource, in milliseconds. + * @param timeoutMs maximum time to wait for a resource, in milliseconds. * * @return a resource, or null if the timeout was reached */ - public ReferenceCountingResourceHolder take(final long timeout) + public ReferenceCountingResourceHolder take(final long timeoutMs) { + Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs); checkInitialized(); try { - return wrapObject(timeout > 0 ? pollObject(timeout) : pollObject()); + return wrapObject(timeoutMs > 0 ? pollObject(timeoutMs) : pollObject()); } catch (InterruptedException e) { throw Throwables.propagate(e); @@ -134,9 +135,9 @@ private T pollObject() } } - private T pollObject(long timeout) throws InterruptedException + private T pollObject(long timeoutMs) throws InterruptedException { - long nanos = TIME_UNIT.toNanos(timeout); + long nanos = TIME_UNIT.toNanos(timeoutMs); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { @@ -171,15 +172,16 @@ private T takeObject() throws InterruptedException * specified wait time if necessary for elements of the given number to become available. * * @param elementNum number of resources to take - * @param timeout maximum time to wait for resources, in milliseconds. + * @param timeoutMs maximum time to wait for resources, in milliseconds. * * @return a resource, or null if the timeout was reached */ - public ReferenceCountingResourceHolder> takeBatch(final int elementNum, final long timeout) + public ReferenceCountingResourceHolder> takeBatch(final int elementNum, final long timeoutMs) { + Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs); checkInitialized(); try { - return wrapObjects(timeout > 0 ? pollObjects(elementNum, timeout) : pollObjects(elementNum)); + return wrapObjects(timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum)); } catch (InterruptedException e) { throw Throwables.propagate(e); @@ -238,9 +240,9 @@ private List pollObjects(int elementNum) throws InterruptedException } } - private List pollObjects(int elementNum, long timeout) throws InterruptedException + private List pollObjects(int elementNum, long timeoutMs) throws InterruptedException { - long nanos = TIME_UNIT.toNanos(timeout); + long nanos = TIME_UNIT.toNanos(timeoutMs); final List list = Lists.newArrayListWithCapacity(elementNum); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java index 2565b06e721b..fcfe11b43664 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java @@ -36,6 +36,8 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory { + // This variable indicates when a running query should be expired, + // and is effective only when 'timeout' of queryContext has a positive value. public static final String CTX_TIMEOUT_AT = "timeoutAt"; public static final String CTX_COUNT = "count"; private final ScanQueryQueryToolChest toolChest; @@ -71,6 +73,8 @@ public Sequence run( final Query query, final Map responseContext ) { + // Note: this variable is effective only when queryContext has a timeout. + // See the comment of CTX_TIMEOUT_AT. final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(query); responseContext.put(CTX_TIMEOUT_AT, timeoutAt); return Sequences.concat( From 8c2d10fdb0cb5f310cf2da13352b9ec07b18352b Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 6 Apr 2017 11:50:46 +0900 Subject: [PATCH 5/9] Address comments --- .../src/main/java/io/druid/query/AsyncQueryRunner.java | 7 +++---- .../java/io/druid/query/ChainedExecutionQueryRunner.java | 7 +++---- .../main/java/io/druid/query/GroupByMergedQueryRunner.java | 7 +++---- .../io/druid/query/groupby/strategy/GroupByStrategyV2.java | 7 +++---- .../query/metadata/SegmentMetadataQueryRunnerFactory.java | 7 +++---- 5 files changed, 15 insertions(+), 20 deletions(-) diff --git a/processing/src/main/java/io/druid/query/AsyncQueryRunner.java b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java index 13daf0f4e007..ce5f1fc64b1b 100644 --- a/processing/src/main/java/io/druid/query/AsyncQueryRunner.java +++ b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java @@ -68,11 +68,10 @@ public Sequence call() throws Exception public Sequence get() { try { - final long timeout = QueryContexts.getTimeout(query); - if (QueryContexts.isNoTimeout(timeout)) { - return future.get(); + if (QueryContexts.hasTimeout(query)) { + return future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS); } else { - return future.get(timeout, TimeUnit.MILLISECONDS); + return future.get(); } } catch (ExecutionException | InterruptedException | TimeoutException ex) { throw Throwables.propagate(ex); diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index ae5641d26af7..117d6436082b 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -152,12 +152,11 @@ public Iterable call() throws Exception queryWatcher.registerQuery(query, futures); try { - final long timeout = QueryContexts.getTimeout(query); return new MergeIterable<>( ordering.nullsFirst(), - QueryContexts.isNoTimeout(timeout) ? - futures.get() : - futures.get(timeout, TimeUnit.MILLISECONDS) + QueryContexts.hasTimeout(query) ? + futures.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS) : + futures.get() ).iterator(); } catch (InterruptedException e) { diff --git a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java index c10f89ac01f6..77775f295c6f 100644 --- a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java @@ -178,11 +178,10 @@ private void waitForFutureCompletion( { try { queryWatcher.registerQuery(query, future); - final long timeout = QueryContexts.getTimeout(query); - if (QueryContexts.isNoTimeout(timeout)) { - future.get(); + if (QueryContexts.hasTimeout(query)) { + future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS); } else { - future.get(timeout, TimeUnit.MILLISECONDS); + future.get(); } } catch (InterruptedException e) { diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index b190f2136970..e0e7273f572b 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -141,11 +141,10 @@ public GroupByQueryResource prepareResource(GroupByQuery query, boolean willMerg return new GroupByQueryResource(); } else { final ResourceHolder> mergeBufferHolders; - final long timeout = QueryContexts.getTimeout(query); - if (QueryContexts.isNoTimeout(timeout)) { - mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum); + if (QueryContexts.hasTimeout(query)) { + mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum, QueryContexts.getTimeout(query)); } else { - mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum, timeout); + mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum); } if (mergeBufferHolders == null) { throw new InsufficientResourcesException("Cannot acquire enough merge buffers"); diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index de78b0f68905..475d98453df9 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -216,11 +216,10 @@ public Sequence call() throws Exception ); try { queryWatcher.registerQuery(query, future); - final long timeout = QueryContexts.getTimeout(query); - if (QueryContexts.isNoTimeout(timeout)) { - return future.get(); + if (QueryContexts.hasTimeout(query)) { + return future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS); } else { - return future.get(timeout, TimeUnit.MILLISECONDS); + return future.get(); } } catch (InterruptedException e) { From 8b3c009700db5a727db7fd3c9410ff793eb70c62 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 6 Apr 2017 16:12:32 +0900 Subject: [PATCH 6/9] remove unused method --- processing/src/main/java/io/druid/query/QueryContexts.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java index 277ac25f3df7..f254bf44fbb1 100644 --- a/processing/src/main/java/io/druid/query/QueryContexts.java +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -101,11 +101,6 @@ public static boolean hasTimeout(Query query) return getTimeout(query) != NO_TIMEOUT; } - public static boolean isNoTimeout(long timeout) - { - return timeout == NO_TIMEOUT; - } - public static long getTimeout(Query query) { return getTimeout(query, DEFAULT_TIMEOUT); From c9ce123a94fa392579b83eef8f37c5fc365b8056 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 18 Apr 2017 21:25:51 +0900 Subject: [PATCH 7/9] Make default query timeout configurable --- docs/content/configuration/broker.md | 1 + docs/content/configuration/historical.md | 1 + docs/content/querying/query-context.md | 20 +-- .../main/java/io/druid/query/BaseQuery.java | 7 + .../src/main/java/io/druid/query/Query.java | 3 + .../java/io/druid/query/QueryContexts.java | 29 ++-- .../io/druid/query/AsyncQueryRunnerTest.java | 2 +- .../ChainedExecutionQueryRunnerTest.java | 2 +- .../io/druid/query/QueryContextsTest.java | 128 ++++++++++++++++++ .../DataSourceMetadataQueryTest.java | 2 +- .../groupby/GroupByQueryMergeBufferTest.java | 8 +- .../GroupByQueryRunnerFailureTest.java | 6 +- .../query/groupby/GroupByQueryRunnerTest.java | 4 +- .../timeboundary/TimeBoundaryQueryTest.java | 4 +- .../druid/client/CachingClusteredClient.java | 2 +- .../java/io/druid/server/QueryResource.java | 1 + .../server/initialization/ServerConfig.java | 16 ++- 17 files changed, 196 insertions(+), 40 deletions(-) create mode 100644 processing/src/test/java/io/druid/query/QueryContextsTest.java diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index 676d7ac31a08..4cb37a3a95a2 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -39,6 +39,7 @@ Druid uses Jetty to serve HTTP requests. |`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20| |`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip| |`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M| +|`druid.server.http.defaultQueryTimeout`|Query timeout, beyond which unfinished queries will be cancelled|PT5M| #### Retry Policy diff --git a/docs/content/configuration/historical.md b/docs/content/configuration/historical.md index 7ad71ae9ea08..f862571a069b 100644 --- a/docs/content/configuration/historical.md +++ b/docs/content/configuration/historical.md @@ -48,6 +48,7 @@ Druid uses Jetty to serve HTTP requests. |--------|-----------|-------| |`druid.server.http.numThreads`|Number of threads for HTTP requests.|10| |`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m| +|`druid.server.http.defaultQueryTimeout`|Query timeout, beyond which unfinished queries will be cancelled|PT5M| #### Processing diff --git a/docs/content/querying/query-context.md b/docs/content/querying/query-context.md index 0c475217a72b..bee0c4a3c87f 100644 --- a/docs/content/querying/query-context.md +++ b/docs/content/querying/query-context.md @@ -7,16 +7,16 @@ Query Context The query context is used for various query configuration parameters. The following parameters apply to all queries. -|property |default | description | -|-----------------|---------------------|----------------------| -|timeout | `0` (no timeout) | Query timeout in milliseconds, beyond which unfinished queries will be cancelled. | -|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.| -|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | -|useCache | `true` | Flag indicating whether to leverage the query cache for this query. This may be overridden in the broker or historical node configuration | -|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overriden in the broker or historical node configuration | -|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from | -|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | -|chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. | +|property |default | description | +|-----------------|----------------------------------------|----------------------| +|timeout | `druid.server.http.defaultQueryTimeout`| Query timeout, beyond which unfinished queries will be cancelled. 0 timeout means `no timeout`. To set the default timeout, see [broker configuration](broker.html) | +|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.| +|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | +|useCache | `true` | Flag indicating whether to leverage the query cache for this query. This may be overridden in the broker or historical node configuration | +|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overriden in the broker or historical node configuration | +|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from | +|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | +|chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. | In addition, some query types offer context parameters specific to that query type. diff --git a/processing/src/main/java/io/druid/query/BaseQuery.java b/processing/src/main/java/io/druid/query/BaseQuery.java index 0186e00075ab..93f0a738a5fb 100644 --- a/processing/src/main/java/io/druid/query/BaseQuery.java +++ b/processing/src/main/java/io/druid/query/BaseQuery.java @@ -28,6 +28,7 @@ import io.druid.query.spec.QuerySegmentSpec; import org.joda.time.Duration; import org.joda.time.Interval; +import org.joda.time.Period; import java.util.List; import java.util.Map; @@ -176,6 +177,12 @@ public Query withId(String id) return withOverriddenContext(ImmutableMap.of(QUERYID, id)); } + @Override + public Query withDefaultTimeout(Period defaultTimeout) + { + return withOverriddenContext(ImmutableMap.of(QueryContexts.DEFAULT_TIMEOUT_KEY, defaultTimeout)); + } + @Override public boolean equals(Object o) { diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java index 9ad178161ead..8dea9d73815e 100644 --- a/processing/src/main/java/io/druid/query/Query.java +++ b/processing/src/main/java/io/druid/query/Query.java @@ -35,6 +35,7 @@ import io.druid.query.topn.TopNQuery; import org.joda.time.Duration; import org.joda.time.Interval; +import org.joda.time.Period; import java.util.List; import java.util.Map; @@ -99,4 +100,6 @@ public interface Query String getId(); Query withDataSource(DataSource dataSource); + + Query withDefaultTimeout(Period defaultTimeout); } diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java index f254bf44fbb1..9f1e68d9c070 100644 --- a/processing/src/main/java/io/druid/query/QueryContexts.java +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -19,21 +19,22 @@ package io.druid.query; -import com.google.common.base.Preconditions; import io.druid.java.util.common.ISE; +import org.joda.time.Period; public class QueryContexts { - public static final String PRIORITY = "priority"; - public static final String TIMEOUT = "timeout"; - public static final String CHUNK_PERIOD = "chunkPeriod"; + public static final String PRIORITY_KEY = "priority"; + public static final String TIMEOUT_KEY = "timeout"; + public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout"; + public static final String CHUNK_PERIOD_KEY = "chunkPeriod"; public static final boolean DEFAULT_BY_SEGMENT = false; public static final boolean DEFAULT_POPULATE_CACHE = true; public static final boolean DEFAULT_USE_CACHE = true; public static final int DEFAULT_PRIORITY = 0; public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0; - public static final long DEFAULT_TIMEOUT = 0; + public static final Period DEFAULT_TIMEOUT = new Period("PT5M"); public static final long NO_TIMEOUT = 0; public static boolean isBySegment(Query query) @@ -88,12 +89,12 @@ public static int getPriority(Query query) public static int getPriority(Query query, int defaultValue) { - return parseInt(query, PRIORITY, defaultValue); + return parseInt(query, PRIORITY_KEY, defaultValue); } public static String getChunkPeriod(Query query) { - return query.getContextValue(CHUNK_PERIOD, "P0D"); + return query.getContextValue(CHUNK_PERIOD_KEY, "P0D"); } public static boolean hasTimeout(Query query) @@ -103,14 +104,18 @@ public static boolean hasTimeout(Query query) public static long getTimeout(Query query) { - return getTimeout(query, DEFAULT_TIMEOUT); + return getTimeout(query, getDefaultTimeout(query)); } - public static long getTimeout(Query query, long defaultValue) + public static long getTimeout(Query query, Period defaultTimeout) { - final long timeout = parseLong(query, TIMEOUT, defaultValue); - Preconditions.checkState(timeout >= 0, "Timeout must be a non negative value, but was [%d]", timeout); - return timeout; + final Period timeout = query.getContextValue(TIMEOUT_KEY, defaultTimeout); + return timeout.toStandardDuration().getMillis(); + } + + static Period getDefaultTimeout(Query query) + { + return query.getContextValue(DEFAULT_TIMEOUT_KEY, DEFAULT_TIMEOUT); } static long parseLong(Query query, String key, long defaultValue) diff --git a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java index 1bb1ceb8993a..b6c074e7eeb6 100644 --- a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java @@ -100,7 +100,7 @@ public Sequence run(Query query, Map responseContext) QueryRunnerTestHelper.NOOP_QUERYWATCHER); Sequence lazy = asyncRunner.run( - query.withOverriddenContext(ImmutableMap.of(QueryContexts.TIMEOUT, 1)), + query.withOverriddenContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1)), Collections.EMPTY_MAP); try { diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java index 5c72e937601d..311aaaa798f9 100644 --- a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -251,7 +251,7 @@ public Void answer() throws Throwable .dataSource("test") .intervals("2014/2015") .aggregators(Lists.newArrayList(new CountAggregatorFactory("count"))) - .context(ImmutableMap.of(QueryContexts.TIMEOUT, 100, "queryId", "test")) + .context(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 100, "queryId", "test")) .build(), context ); diff --git a/processing/src/test/java/io/druid/query/QueryContextsTest.java b/processing/src/test/java/io/druid/query/QueryContextsTest.java new file mode 100644 index 000000000000..2bafa5f55bdb --- /dev/null +++ b/processing/src/test/java/io/druid/query/QueryContextsTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.query.filter.DimFilter; +import io.druid.query.spec.MultipleIntervalSegmentSpec; +import io.druid.query.spec.QuerySegmentSpec; +import org.joda.time.Interval; +import org.joda.time.Period; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class QueryContextsTest +{ + private static class TestQuery extends BaseQuery + { + + public TestQuery(DataSource dataSource, QuerySegmentSpec querySegmentSpec, boolean descending, Map context) + { + super(dataSource, querySegmentSpec, descending, context); + } + + @Override + public boolean hasFilters() + { + return false; + } + + @Override + public DimFilter getFilter() + { + return null; + } + + @Override + public String getType() + { + return null; + } + + @Override + public Query withQuerySegmentSpec(QuerySegmentSpec spec) + { + return null; + } + + @Override + public Query withDataSource(DataSource dataSource) + { + return null; + } + + @Override + public Query withOverriddenContext(Map contextOverride) + { + return new TestQuery( + getDataSource(), + getQuerySegmentSpec(), + isDescending(), + computeOverridenContext(contextOverride) + ); + } + } + + @Test + public void testDefaultQueryTimeout() + { + final Query query = new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(new Interval("0/100"))), + false, + new HashMap() + ); + Assert.assertEquals(new Period("PT5M"), QueryContexts.getDefaultTimeout(query)); + } + + @Test + public void testEmptyQueryTimeout() + { + Query query = new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(new Interval("0/100"))), + false, + new HashMap() + ); + Assert.assertEquals(300_000, QueryContexts.getTimeout(query)); + + query = query.withDefaultTimeout(new Period("PT1M")); + Assert.assertEquals(60_000, QueryContexts.getTimeout(query)); + } + + @Test + public void testQueryTimeout() + { + Query query = new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(new Interval("0/100"))), + false, + ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period("PT1S")) + ); + Assert.assertEquals(1000, QueryContexts.getTimeout(query)); + + query = query.withDefaultTimeout(new Period("PT1H")); + Assert.assertEquals(1000, QueryContexts.getTimeout(query)); + } +} diff --git a/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java b/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java index 84ef7bd1e8f1..f66d577ec12d 100644 --- a/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java +++ b/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java @@ -102,7 +102,7 @@ public void testContextSerde() throws Exception ), Query.class ); - Assert.assertEquals(1, serdeQuery.getContextValue(QueryContexts.PRIORITY)); + Assert.assertEquals(1, serdeQuery.getContextValue(QueryContexts.PRIORITY_KEY)); Assert.assertEquals(true, serdeQuery.getContextValue("useCache")); Assert.assertEquals("true", serdeQuery.getContextValue("populateCache")); Assert.assertEquals(true, serdeQuery.getContextValue("finalize")); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java index 066e22d73e01..5169a6f76ecf 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java @@ -230,7 +230,7 @@ public void testSimpleGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, Integers.valueOf(500))) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -258,7 +258,7 @@ public void testNestedGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, Integers.valueOf(500))) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -297,7 +297,7 @@ public void testDoubleNestedGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, Integers.valueOf(500))) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -349,7 +349,7 @@ public void testTripleNestedGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, Integers.valueOf(500))) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java index 029d640d7637..6bb40a15daa8 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java @@ -201,7 +201,7 @@ public void testNotEnoughMergeBuffersOnQueryable() throws IOException .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, Integers.valueOf(500))) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -239,7 +239,7 @@ public void testResourceLimitExceededOnBroker() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, Integers.valueOf(500))) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -265,7 +265,7 @@ public void testInsufficientResourcesOnBroker() throws IOException .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, Integers.valueOf(500))) .build(); try { diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 4a8eeb2f88c3..c22677ce3596 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -1214,7 +1214,7 @@ public void testGroupByTimeoutContextOverride() ) ) .setGranularity(QueryRunnerTestHelper.dayGran) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT, Integer.valueOf(60000))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, Integer.valueOf(60000))) .build(); List expectedResults = Arrays.asList( @@ -5422,7 +5422,7 @@ public void testSubqueryWithContextTimeout() .setDimensions(Lists.newArrayList()) .setAggregatorSpecs(ImmutableList.of(new CountAggregatorFactory("count"))) .setGranularity(QueryRunnerTestHelper.allGran) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT, 10000)) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 10000)) .build(); List expectedResults = Arrays.asList( diff --git a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java index 0e8eef0dc401..cad7fe53119a 100644 --- a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java +++ b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java @@ -79,7 +79,7 @@ public void testContextSerde() throws Exception ); - Assert.assertEquals(new Integer(1), serdeQuery.getContextValue(QueryContexts.PRIORITY)); + Assert.assertEquals(new Integer(1), serdeQuery.getContextValue(QueryContexts.PRIORITY_KEY)); Assert.assertEquals(true, serdeQuery.getContextValue("useCache")); Assert.assertEquals(true, serdeQuery.getContextValue("populateCache")); Assert.assertEquals(true, serdeQuery.getContextValue("finalize")); @@ -117,7 +117,7 @@ public void testContextSerde2() throws Exception ); - Assert.assertEquals("1", serdeQuery.getContextValue(QueryContexts.PRIORITY)); + Assert.assertEquals("1", serdeQuery.getContextValue(QueryContexts.PRIORITY_KEY)); Assert.assertEquals("true", serdeQuery.getContextValue("useCache")); Assert.assertEquals("true", serdeQuery.getContextValue("populateCache")); Assert.assertEquals("true", serdeQuery.getContextValue("finalize")); diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index fd8e090405f4..c86123b0ec85 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -156,7 +156,7 @@ public Sequence run(final Query query, final Map responseC final ImmutableMap.Builder contextBuilder = new ImmutableMap.Builder<>(); final int priority = QueryContexts.getPriority(query); - contextBuilder.put(QueryContexts.PRIORITY, priority); + contextBuilder.put(QueryContexts.PRIORITY_KEY, priority); if (populateCache) { // prevent down-stream nodes from caching results as well if we are populating the cache diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index f3700fb38607..554cc34de83c 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -190,6 +190,7 @@ public Response doPost( queryId = UUID.randomUUID().toString(); query = query.withId(queryId); } + query = query.withDefaultTimeout(config.getDefaultQueryTimeout()); toolChest = warehouse.getToolChest(query); diff --git a/server/src/main/java/io/druid/server/initialization/ServerConfig.java b/server/src/main/java/io/druid/server/initialization/ServerConfig.java index cad5ace2c96c..5772496ad7c2 100644 --- a/server/src/main/java/io/druid/server/initialization/ServerConfig.java +++ b/server/src/main/java/io/druid/server/initialization/ServerConfig.java @@ -37,6 +37,10 @@ public class ServerConfig @NotNull private Period maxIdleTime = new Period("PT5m"); + @JsonProperty + @NotNull + private Period defaultQueryTimeout = new Period("PT5M"); + public int getNumThreads() { return numThreads; @@ -47,12 +51,18 @@ public Period getMaxIdleTime() return maxIdleTime; } + public Period getDefaultQueryTimeout() + { + return defaultQueryTimeout; + } + @Override public String toString() { return "ServerConfig{" + - "numThreads=" + numThreads + - ", maxIdleTime=" + maxIdleTime + - '}'; + "numThreads=" + numThreads + + ", maxIdleTime=" + maxIdleTime + + ", defaultQueryTimeout=" + defaultQueryTimeout + + '}'; } } From a510acf20753f3109a242da350846811b56d7547 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 18 Apr 2017 23:13:04 +0900 Subject: [PATCH 8/9] Fix test failure --- .../test/java/io/druid/query/AsyncQueryRunnerTest.java | 3 ++- .../druid/query/ChainedExecutionQueryRunnerTest.java | 3 ++- .../query/groupby/GroupByQueryMergeBufferTest.java | 10 +++++----- .../query/groupby/GroupByQueryRunnerFailureTest.java | 8 ++++---- .../io/druid/query/groupby/GroupByQueryRunnerTest.java | 4 ++-- 5 files changed, 15 insertions(+), 13 deletions(-) diff --git a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java index b6c074e7eeb6..2b4bbbf507e2 100644 --- a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java @@ -28,6 +28,7 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import org.easymock.EasyMock; +import org.joda.time.Period; import org.junit.Assert; import org.junit.Test; @@ -100,7 +101,7 @@ public Sequence run(Query query, Map responseContext) QueryRunnerTestHelper.NOOP_QUERYWATCHER); Sequence lazy = asyncRunner.run( - query.withOverriddenContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1)), + query.withOverriddenContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(1))), Collections.EMPTY_MAP); try { diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java index 311aaaa798f9..ee251c0e17df 100644 --- a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -31,6 +31,7 @@ import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.IAnswer; +import org.joda.time.Period; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -251,7 +252,7 @@ public Void answer() throws Throwable .dataSource("test") .intervals("2014/2015") .aggregators(Lists.newArrayList(new CountAggregatorFactory("count"))) - .context(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 100, "queryId", "test")) + .context(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(100), "queryId", "test")) .build(), context ); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java index 5169a6f76ecf..c5108397124c 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java @@ -43,7 +43,7 @@ import io.druid.query.groupby.strategy.GroupByStrategySelector; import io.druid.query.groupby.strategy.GroupByStrategyV1; import io.druid.query.groupby.strategy.GroupByStrategyV2; -import org.bouncycastle.util.Integers; +import org.joda.time.Period; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -230,7 +230,7 @@ public void testSimpleGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(500))) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -258,7 +258,7 @@ public void testNestedGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(500))) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -297,7 +297,7 @@ public void testDoubleNestedGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(500))) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -349,7 +349,7 @@ public void testTripleNestedGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(500))) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java index 6bb40a15daa8..d9ac3ea18ff5 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java @@ -46,8 +46,8 @@ import io.druid.query.groupby.strategy.GroupByStrategySelector; import io.druid.query.groupby.strategy.GroupByStrategyV1; import io.druid.query.groupby.strategy.GroupByStrategyV2; -import org.bouncycastle.util.Integers; import org.hamcrest.CoreMatchers; +import org.joda.time.Period; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -201,7 +201,7 @@ public void testNotEnoughMergeBuffersOnQueryable() throws IOException .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(500))) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -239,7 +239,7 @@ public void testResourceLimitExceededOnBroker() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(500))) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -265,7 +265,7 @@ public void testInsufficientResourcesOnBroker() throws IOException .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(500))) .build(); try { diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index c22677ce3596..bd846fe7ba22 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -1214,7 +1214,7 @@ public void testGroupByTimeoutContextOverride() ) ) .setGranularity(QueryRunnerTestHelper.dayGran) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, Integer.valueOf(60000))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(60000))) .build(); List expectedResults = Arrays.asList( @@ -5422,7 +5422,7 @@ public void testSubqueryWithContextTimeout() .setDimensions(Lists.newArrayList()) .setAggregatorSpecs(ImmutableList.of(new CountAggregatorFactory("count"))) .setGranularity(QueryRunnerTestHelper.allGran) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 10000)) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(10000))) .build(); List expectedResults = Arrays.asList( From 526f7083f3793a5ddce1601855f4971294bb9517 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 19 Apr 2017 08:53:03 +0900 Subject: [PATCH 9/9] Change timeout from period to millis --- docs/content/configuration/broker.md | 2 +- docs/content/configuration/historical.md | 2 +- docs/content/querying/query-context.md | 2 +- .../src/main/java/io/druid/query/BaseQuery.java | 3 +-- .../src/main/java/io/druid/query/Query.java | 3 +-- .../main/java/io/druid/query/QueryContexts.java | 17 ++++++++++------- .../io/druid/query/AsyncQueryRunnerTest.java | 3 +-- .../query/ChainedExecutionQueryRunnerTest.java | 3 +-- .../java/io/druid/query/QueryContextsTest.java | 9 ++++----- .../groupby/GroupByQueryMergeBufferTest.java | 9 ++++----- .../groupby/GroupByQueryRunnerFailureTest.java | 7 +++---- .../query/groupby/GroupByQueryRunnerTest.java | 4 ++-- .../server/initialization/ServerConfig.java | 6 +++--- 13 files changed, 33 insertions(+), 37 deletions(-) diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index 4cb37a3a95a2..fa6a01b56476 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -36,10 +36,10 @@ Druid uses Jetty to serve HTTP requests. |--------|-----------|-------| |`druid.server.http.numThreads`|Number of threads for HTTP requests.|10| |`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m| +|`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000| |`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20| |`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip| |`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M| -|`druid.server.http.defaultQueryTimeout`|Query timeout, beyond which unfinished queries will be cancelled|PT5M| #### Retry Policy diff --git a/docs/content/configuration/historical.md b/docs/content/configuration/historical.md index f862571a069b..e8b8832a4786 100644 --- a/docs/content/configuration/historical.md +++ b/docs/content/configuration/historical.md @@ -48,7 +48,7 @@ Druid uses Jetty to serve HTTP requests. |--------|-----------|-------| |`druid.server.http.numThreads`|Number of threads for HTTP requests.|10| |`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m| -|`druid.server.http.defaultQueryTimeout`|Query timeout, beyond which unfinished queries will be cancelled|PT5M| +|`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000| #### Processing diff --git a/docs/content/querying/query-context.md b/docs/content/querying/query-context.md index bee0c4a3c87f..b93ef4dd3258 100644 --- a/docs/content/querying/query-context.md +++ b/docs/content/querying/query-context.md @@ -9,7 +9,7 @@ The query context is used for various query configuration parameters. The follow |property |default | description | |-----------------|----------------------------------------|----------------------| -|timeout | `druid.server.http.defaultQueryTimeout`| Query timeout, beyond which unfinished queries will be cancelled. 0 timeout means `no timeout`. To set the default timeout, see [broker configuration](broker.html) | +|timeout | `druid.server.http.defaultQueryTimeout`| Query timeout in millis, beyond which unfinished queries will be cancelled. 0 timeout means `no timeout`. To set the default timeout, see [broker configuration](broker.html) | |priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.| |queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | |useCache | `true` | Flag indicating whether to leverage the query cache for this query. This may be overridden in the broker or historical node configuration | diff --git a/processing/src/main/java/io/druid/query/BaseQuery.java b/processing/src/main/java/io/druid/query/BaseQuery.java index 93f0a738a5fb..edb1ca5bf32e 100644 --- a/processing/src/main/java/io/druid/query/BaseQuery.java +++ b/processing/src/main/java/io/druid/query/BaseQuery.java @@ -28,7 +28,6 @@ import io.druid.query.spec.QuerySegmentSpec; import org.joda.time.Duration; import org.joda.time.Interval; -import org.joda.time.Period; import java.util.List; import java.util.Map; @@ -178,7 +177,7 @@ public Query withId(String id) } @Override - public Query withDefaultTimeout(Period defaultTimeout) + public Query withDefaultTimeout(long defaultTimeout) { return withOverriddenContext(ImmutableMap.of(QueryContexts.DEFAULT_TIMEOUT_KEY, defaultTimeout)); } diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java index 8dea9d73815e..cfbf6f2d3403 100644 --- a/processing/src/main/java/io/druid/query/Query.java +++ b/processing/src/main/java/io/druid/query/Query.java @@ -35,7 +35,6 @@ import io.druid.query.topn.TopNQuery; import org.joda.time.Duration; import org.joda.time.Interval; -import org.joda.time.Period; import java.util.List; import java.util.Map; @@ -101,5 +100,5 @@ public interface Query Query withDataSource(DataSource dataSource); - Query withDefaultTimeout(Period defaultTimeout); + Query withDefaultTimeout(long defaultTimeout); } diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java index 9f1e68d9c070..b59c6bc2a2b7 100644 --- a/processing/src/main/java/io/druid/query/QueryContexts.java +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -19,8 +19,8 @@ package io.druid.query; +import com.google.common.base.Preconditions; import io.druid.java.util.common.ISE; -import org.joda.time.Period; public class QueryContexts { @@ -34,7 +34,7 @@ public class QueryContexts public static final boolean DEFAULT_USE_CACHE = true; public static final int DEFAULT_PRIORITY = 0; public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0; - public static final Period DEFAULT_TIMEOUT = new Period("PT5M"); + public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes public static final long NO_TIMEOUT = 0; public static boolean isBySegment(Query query) @@ -107,15 +107,18 @@ public static long getTimeout(Query query) return getTimeout(query, getDefaultTimeout(query)); } - public static long getTimeout(Query query, Period defaultTimeout) + public static long getTimeout(Query query, long defaultTimeout) { - final Period timeout = query.getContextValue(TIMEOUT_KEY, defaultTimeout); - return timeout.toStandardDuration().getMillis(); + final long timeout = parseLong(query, TIMEOUT_KEY, defaultTimeout); + Preconditions.checkState(timeout >= 0, "Timeout must be a non negative value, but was [%d]", timeout); + return timeout; } - static Period getDefaultTimeout(Query query) + static long getDefaultTimeout(Query query) { - return query.getContextValue(DEFAULT_TIMEOUT_KEY, DEFAULT_TIMEOUT); + final long defaultTimeout = parseLong(query, DEFAULT_TIMEOUT_KEY, DEFAULT_TIMEOUT_MILLIS); + Preconditions.checkState(defaultTimeout >= 0, "Timeout must be a non negative value, but was [%d]", defaultTimeout); + return defaultTimeout; } static long parseLong(Query query, String key, long defaultValue) diff --git a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java index 2b4bbbf507e2..b6c074e7eeb6 100644 --- a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java @@ -28,7 +28,6 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import org.easymock.EasyMock; -import org.joda.time.Period; import org.junit.Assert; import org.junit.Test; @@ -101,7 +100,7 @@ public Sequence run(Query query, Map responseContext) QueryRunnerTestHelper.NOOP_QUERYWATCHER); Sequence lazy = asyncRunner.run( - query.withOverriddenContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(1))), + query.withOverriddenContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1)), Collections.EMPTY_MAP); try { diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java index ee251c0e17df..311aaaa798f9 100644 --- a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -31,7 +31,6 @@ import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.IAnswer; -import org.joda.time.Period; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -252,7 +251,7 @@ public Void answer() throws Throwable .dataSource("test") .intervals("2014/2015") .aggregators(Lists.newArrayList(new CountAggregatorFactory("count"))) - .context(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(100), "queryId", "test")) + .context(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 100, "queryId", "test")) .build(), context ); diff --git a/processing/src/test/java/io/druid/query/QueryContextsTest.java b/processing/src/test/java/io/druid/query/QueryContextsTest.java index 2bafa5f55bdb..c656f0771101 100644 --- a/processing/src/test/java/io/druid/query/QueryContextsTest.java +++ b/processing/src/test/java/io/druid/query/QueryContextsTest.java @@ -25,7 +25,6 @@ import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.query.spec.QuerySegmentSpec; import org.joda.time.Interval; -import org.joda.time.Period; import org.junit.Assert; import org.junit.Test; @@ -93,7 +92,7 @@ public void testDefaultQueryTimeout() false, new HashMap() ); - Assert.assertEquals(new Period("PT5M"), QueryContexts.getDefaultTimeout(query)); + Assert.assertEquals(300_000, QueryContexts.getDefaultTimeout(query)); } @Test @@ -107,7 +106,7 @@ public void testEmptyQueryTimeout() ); Assert.assertEquals(300_000, QueryContexts.getTimeout(query)); - query = query.withDefaultTimeout(new Period("PT1M")); + query = query.withDefaultTimeout(60_000); Assert.assertEquals(60_000, QueryContexts.getTimeout(query)); } @@ -118,11 +117,11 @@ public void testQueryTimeout() new TableDataSource("test"), new MultipleIntervalSegmentSpec(ImmutableList.of(new Interval("0/100"))), false, - ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period("PT1S")) + ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1000) ); Assert.assertEquals(1000, QueryContexts.getTimeout(query)); - query = query.withDefaultTimeout(new Period("PT1H")); + query = query.withDefaultTimeout(1_000_000); Assert.assertEquals(1000, QueryContexts.getTimeout(query)); } } diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java index c5108397124c..53e81977e298 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java @@ -43,7 +43,6 @@ import io.druid.query.groupby.strategy.GroupByStrategySelector; import io.druid.query.groupby.strategy.GroupByStrategyV1; import io.druid.query.groupby.strategy.GroupByStrategyV2; -import org.joda.time.Period; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -230,7 +229,7 @@ public void testSimpleGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -258,7 +257,7 @@ public void testNestedGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -297,7 +296,7 @@ public void testDoubleNestedGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -349,7 +348,7 @@ public void testTripleNestedGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java index d9ac3ea18ff5..060012bf0eb5 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java @@ -47,7 +47,6 @@ import io.druid.query.groupby.strategy.GroupByStrategyV1; import io.druid.query.groupby.strategy.GroupByStrategyV2; import org.hamcrest.CoreMatchers; -import org.joda.time.Period; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -201,7 +200,7 @@ public void testNotEnoughMergeBuffersOnQueryable() throws IOException .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -239,7 +238,7 @@ public void testResourceLimitExceededOnBroker() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -265,7 +264,7 @@ public void testInsufficientResourcesOnBroker() throws IOException .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); try { diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index bd846fe7ba22..dfcc6d2b0982 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -1214,7 +1214,7 @@ public void testGroupByTimeoutContextOverride() ) ) .setGranularity(QueryRunnerTestHelper.dayGran) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(60000))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 60000)) .build(); List expectedResults = Arrays.asList( @@ -5422,7 +5422,7 @@ public void testSubqueryWithContextTimeout() .setDimensions(Lists.newArrayList()) .setAggregatorSpecs(ImmutableList.of(new CountAggregatorFactory("count"))) .setGranularity(QueryRunnerTestHelper.allGran) - .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, new Period(10000))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 10000)) .build(); List expectedResults = Arrays.asList( diff --git a/server/src/main/java/io/druid/server/initialization/ServerConfig.java b/server/src/main/java/io/druid/server/initialization/ServerConfig.java index 5772496ad7c2..560975abe186 100644 --- a/server/src/main/java/io/druid/server/initialization/ServerConfig.java +++ b/server/src/main/java/io/druid/server/initialization/ServerConfig.java @@ -38,8 +38,8 @@ public class ServerConfig private Period maxIdleTime = new Period("PT5m"); @JsonProperty - @NotNull - private Period defaultQueryTimeout = new Period("PT5M"); + @Min(0) + private long defaultQueryTimeout = 300_000; // 5 minutes public int getNumThreads() { @@ -51,7 +51,7 @@ public Period getMaxIdleTime() return maxIdleTime; } - public Period getDefaultQueryTimeout() + public long getDefaultQueryTimeout() { return defaultQueryTimeout; }