diff --git a/common/src/main/java/io/druid/collections/BlockingPool.java b/common/src/main/java/io/druid/collections/BlockingPool.java index 9a883da35cfb..af48f53cf098 100644 --- a/common/src/main/java/io/druid/collections/BlockingPool.java +++ b/common/src/main/java/io/druid/collections/BlockingPool.java @@ -74,40 +74,57 @@ public int getPoolSize() } /** - * Take a resource from the pool. + * Take a resource from the pool, waiting up to the + * specified wait time if necessary for an element to become available. * - * @param timeout maximum time to wait for a resource, in milliseconds. Negative means do not use a timeout. + * @param timeoutMs maximum time to wait for a resource, in milliseconds. * * @return a resource, or null if the timeout was reached */ - public ReferenceCountingResourceHolder take(final long timeout) + public ReferenceCountingResourceHolder take(final long timeoutMs) { + Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs); checkInitialized(); - final T theObject; try { - if (timeout > -1) { - theObject = timeout > 0 ? poll(timeout) : poll(); - } else { - theObject = take(); - } - return theObject == null ? null : new ReferenceCountingResourceHolder<>( - theObject, - new Closeable() - { - @Override - public void close() throws IOException - { - offer(theObject); - } - } - ); + return wrapObject(timeoutMs > 0 ? pollObject(timeoutMs) : pollObject()); } catch (InterruptedException e) { throw Throwables.propagate(e); } } - private T poll() + /** + * Take a resource from the pool, waiting if necessary until an element becomes available. + * + * @return a resource + */ + public ReferenceCountingResourceHolder take() + { + checkInitialized(); + try { + return wrapObject(takeObject()); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } + + private ReferenceCountingResourceHolder wrapObject(T theObject) + { + return theObject == null ? null : new ReferenceCountingResourceHolder<>( + theObject, + new Closeable() + { + @Override + public void close() throws IOException + { + offer(theObject); + } + } + ); + } + + private T pollObject() { final ReentrantLock lock = this.lock; lock.lock(); @@ -118,9 +135,9 @@ private T poll() } } - private T poll(long timeout) throws InterruptedException + private T pollObject(long timeoutMs) throws InterruptedException { - long nanos = TIME_UNIT.toNanos(timeout); + long nanos = TIME_UNIT.toNanos(timeoutMs); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { @@ -136,7 +153,7 @@ private T poll(long timeout) throws InterruptedException } } - private T take() throws InterruptedException + private T takeObject() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); @@ -151,41 +168,60 @@ private T take() throws InterruptedException } /** - * Take a resource from the pool. + * Take resources from the pool, waiting up to the + * specified wait time if necessary for elements of the given number to become available. * * @param elementNum number of resources to take - * @param timeout maximum time to wait for resources, in milliseconds. Negative means do not use a timeout. + * @param timeoutMs maximum time to wait for resources, in milliseconds. * * @return a resource, or null if the timeout was reached */ - public ReferenceCountingResourceHolder> takeBatch(final int elementNum, final long timeout) + public ReferenceCountingResourceHolder> takeBatch(final int elementNum, final long timeoutMs) { + Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs); checkInitialized(); - final List objects; try { - if (timeout > -1) { - objects = timeout > 0 ? pollBatch(elementNum, timeout) : pollBatch(elementNum); - } else { - objects = takeBatch(elementNum); - } - return objects == null ? null : new ReferenceCountingResourceHolder<>( - objects, - new Closeable() - { - @Override - public void close() throws IOException - { - offerBatch(objects); - } - } - ); + return wrapObjects(timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum)); } catch (InterruptedException e) { throw Throwables.propagate(e); } } - private List pollBatch(int elementNum) throws InterruptedException + /** + * Take resources from the pool, waiting if necessary until the elements of the given number become available. + * + * @param elementNum number of resources to take + * + * @return a resource + */ + public ReferenceCountingResourceHolder> takeBatch(final int elementNum) + { + checkInitialized(); + try { + return wrapObjects(takeObjects(elementNum)); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } + + private ReferenceCountingResourceHolder> wrapObjects(List theObjects) + { + return theObjects == null ? null : new ReferenceCountingResourceHolder<>( + theObjects, + new Closeable() + { + @Override + public void close() throws IOException + { + offerBatch(theObjects); + } + } + ); + } + + private List pollObjects(int elementNum) throws InterruptedException { final List list = Lists.newArrayListWithCapacity(elementNum); final ReentrantLock lock = this.lock; @@ -204,9 +240,9 @@ private List pollBatch(int elementNum) throws InterruptedException } } - private List pollBatch(int elementNum, long timeout) throws InterruptedException + private List pollObjects(int elementNum, long timeoutMs) throws InterruptedException { - long nanos = TIME_UNIT.toNanos(timeout); + long nanos = TIME_UNIT.toNanos(timeoutMs); final List list = Lists.newArrayListWithCapacity(elementNum); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); @@ -226,7 +262,7 @@ private List pollBatch(int elementNum, long timeout) throws InterruptedExcept } } - private List takeBatch(int elementNum) throws InterruptedException + private List takeObjects(int elementNum) throws InterruptedException { final List list = Lists.newArrayListWithCapacity(elementNum); final ReentrantLock lock = this.lock; diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index 676d7ac31a08..fa6a01b56476 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -36,6 +36,7 @@ Druid uses Jetty to serve HTTP requests. |--------|-----------|-------| |`druid.server.http.numThreads`|Number of threads for HTTP requests.|10| |`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m| +|`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000| |`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20| |`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip| |`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M| diff --git a/docs/content/configuration/historical.md b/docs/content/configuration/historical.md index 7ad71ae9ea08..e8b8832a4786 100644 --- a/docs/content/configuration/historical.md +++ b/docs/content/configuration/historical.md @@ -48,6 +48,7 @@ Druid uses Jetty to serve HTTP requests. |--------|-----------|-------| |`druid.server.http.numThreads`|Number of threads for HTTP requests.|10| |`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m| +|`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000| #### Processing diff --git a/docs/content/querying/query-context.md b/docs/content/querying/query-context.md index 0c475217a72b..b93ef4dd3258 100644 --- a/docs/content/querying/query-context.md +++ b/docs/content/querying/query-context.md @@ -7,16 +7,16 @@ Query Context The query context is used for various query configuration parameters. The following parameters apply to all queries. -|property |default | description | -|-----------------|---------------------|----------------------| -|timeout | `0` (no timeout) | Query timeout in milliseconds, beyond which unfinished queries will be cancelled. | -|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.| -|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | -|useCache | `true` | Flag indicating whether to leverage the query cache for this query. This may be overridden in the broker or historical node configuration | -|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overriden in the broker or historical node configuration | -|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from | -|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | -|chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. | +|property |default | description | +|-----------------|----------------------------------------|----------------------| +|timeout | `druid.server.http.defaultQueryTimeout`| Query timeout in millis, beyond which unfinished queries will be cancelled. 0 timeout means `no timeout`. To set the default timeout, see [broker configuration](broker.html) | +|priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.| +|queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | +|useCache | `true` | Flag indicating whether to leverage the query cache for this query. This may be overridden in the broker or historical node configuration | +|populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. This may be overriden in the broker or historical node configuration | +|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from | +|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | +|chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. | In addition, some query types offer context parameters specific to that query type. diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java index 139cc6d0ad0e..57385f50c116 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java @@ -28,6 +28,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.ColumnSelectorPlus; +import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; @@ -65,6 +66,7 @@ public Sequence process( return Sequences.empty(); } } + final boolean hasTimeout = QueryContexts.hasTimeout(query); final Long timeoutAt = (long) responseContext.get(ScanQueryRunnerFactory.CTX_TIMEOUT_AT); final long start = System.currentTimeMillis(); final StorageAdapter adapter = segment.asStorageAdapter(); @@ -156,7 +158,7 @@ public boolean hasNext() @Override public ScanResultValue next() { - if (System.currentTimeMillis() >= timeoutAt) { + if (hasTimeout && System.currentTimeMillis() >= timeoutAt) { throw new QueryInterruptedException(new TimeoutException()); } long lastOffset = offset; @@ -173,10 +175,12 @@ public ScanResultValue next() ScanQueryRunnerFactory.CTX_COUNT, (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) + (offset - lastOffset) ); - responseContext.put( - ScanQueryRunnerFactory.CTX_TIMEOUT_AT, - timeoutAt - (System.currentTimeMillis() - start) - ); + if (hasTimeout) { + responseContext.put( + ScanQueryRunnerFactory.CTX_TIMEOUT_AT, + timeoutAt - (System.currentTimeMillis() - start) + ); + } return new ScanResultValue(segmentId, allColumns, events); } diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java index 712249eac5b7..fcfe11b43664 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java @@ -25,7 +25,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.Query; -import io.druid.query.QueryContextKeys; +import io.druid.query.QueryContexts; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; @@ -36,6 +36,8 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory { + // This variable indicates when a running query should be expired, + // and is effective only when 'timeout' of queryContext has a positive value. public static final String CTX_TIMEOUT_AT = "timeoutAt"; public static final String CTX_COUNT = "count"; private final ScanQueryQueryToolChest toolChest; @@ -71,9 +73,9 @@ public Sequence run( final Query query, final Map responseContext ) { - final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null); - final long timeoutAt = (queryTimeout == null || queryTimeout.longValue() == 0L) - ? JodaUtils.MAX_INSTANT : System.currentTimeMillis() + queryTimeout.longValue(); + // Note: this variable is effective only when queryContext has a timeout. + // See the comment of CTX_TIMEOUT_AT. + final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(query); responseContext.put(CTX_TIMEOUT_AT, timeoutAt); return Sequences.concat( Sequences.map( @@ -122,7 +124,7 @@ public Sequence run( final Number timeoutAt = (Number) responseContext.get(CTX_TIMEOUT_AT); if (timeoutAt == null || timeoutAt.longValue() == 0L) { responseContext.put(CTX_TIMEOUT_AT, JodaUtils.MAX_INSTANT); - }; + } return engine.process((ScanQuery) query, segment, responseContext); } } diff --git a/processing/src/main/java/io/druid/query/AsyncQueryRunner.java b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java index 2f6e84c8bc69..ce5f1fc64b1b 100644 --- a/processing/src/main/java/io/druid/query/AsyncQueryRunner.java +++ b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java @@ -49,7 +49,7 @@ public AsyncQueryRunner(QueryRunner baseRunner, ExecutorService executor, Que @Override public Sequence run(final Query query, final Map responseContext) { - final int priority = BaseQuery.getContextPriority(query, 0); + final int priority = QueryContexts.getPriority(query); final ListenableFuture> future = executor.submit(new AbstractPrioritizedCallable>(priority) { @Override @@ -68,11 +68,10 @@ public Sequence call() throws Exception public Sequence get() { try { - Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT); - if (timeout == null) { - return future.get(); + if (QueryContexts.hasTimeout(query)) { + return future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS); } else { - return future.get(timeout.longValue(), TimeUnit.MILLISECONDS); + return future.get(); } } catch (ExecutionException | InterruptedException | TimeoutException ex) { throw Throwables.propagate(ex); diff --git a/processing/src/main/java/io/druid/query/BaseQuery.java b/processing/src/main/java/io/druid/query/BaseQuery.java index 22d0fb1a4ba5..edb1ca5bf32e 100644 --- a/processing/src/main/java/io/druid/query/BaseQuery.java +++ b/processing/src/main/java/io/druid/query/BaseQuery.java @@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; -import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; import io.druid.query.spec.QuerySegmentSpec; import org.joda.time.Duration; @@ -37,66 +36,6 @@ */ public abstract class BaseQuery> implements Query { - public static int getContextPriority(Query query, int defaultValue) - { - return parseInt(query, "priority", defaultValue); - } - - public static boolean getContextBySegment(Query query, boolean defaultValue) - { - return parseBoolean(query, "bySegment", defaultValue); - } - - public static boolean getContextPopulateCache(Query query, boolean defaultValue) - { - return parseBoolean(query, "populateCache", defaultValue); - } - - public static boolean getContextUseCache(Query query, boolean defaultValue) - { - return parseBoolean(query, "useCache", defaultValue); - } - - public static boolean getContextFinalize(Query query, boolean defaultValue) - { - return parseBoolean(query, "finalize", defaultValue); - } - - public static int getContextUncoveredIntervalsLimit(Query query, int defaultValue) - { - return parseInt(query, "uncoveredIntervalsLimit", defaultValue); - } - - private static int parseInt(Query query, String key, int defaultValue) - { - Object val = query.getContextValue(key); - if (val == null) { - return defaultValue; - } - if (val instanceof String) { - return Integer.parseInt((String) val); - } else if (val instanceof Integer) { - return (int) val; - } else { - throw new ISE("Unknown type [%s]", val.getClass()); - } - } - - private static boolean parseBoolean(Query query, String key, boolean defaultValue) - { - Object val = query.getContextValue(key); - if (val == null) { - return defaultValue; - } - if (val instanceof String) { - return Boolean.parseBoolean((String) val); - } else if (val instanceof Boolean) { - return (boolean) val; - } else { - throw new ISE("Unknown type [%s]. Cannot parse!", val.getClass()); - } - } - public static void checkInterrupted() { if (Thread.interrupted()) { @@ -203,7 +142,7 @@ public ContextType getContextValue(String key, ContextType default @Override public boolean getContextBoolean(String key, boolean defaultValue) { - return parseBoolean(this, key, defaultValue); + return QueryContexts.parseBoolean(this, key, defaultValue); } protected Map computeOverridenContext(Map overrides) @@ -237,6 +176,12 @@ public Query withId(String id) return withOverriddenContext(ImmutableMap.of(QUERYID, id)); } + @Override + public Query withDefaultTimeout(long defaultTimeout) + { + return withOverriddenContext(ImmutableMap.of(QueryContexts.DEFAULT_TIMEOUT_KEY, defaultTimeout)); + } + @Override public boolean equals(Object o) { diff --git a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java index a2a8a960b4f8..e57b9471ce3b 100644 --- a/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentQueryRunner.java @@ -51,7 +51,7 @@ public BySegmentQueryRunner( @SuppressWarnings("unchecked") public Sequence run(final Query query, Map responseContext) { - if (BaseQuery.getContextBySegment(query, false)) { + if (QueryContexts.isBySegment(query)) { final Sequence baseSequence = base.run(query, responseContext); final List results = Sequences.toList(baseSequence, Lists.newArrayList()); return Sequences.simple( diff --git a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java index c49ced5aa46c..373c2e3b1192 100644 --- a/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/BySegmentSkippingQueryRunner.java @@ -40,7 +40,7 @@ public BySegmentSkippingQueryRunner( @Override public Sequence run(Query query, Map responseContext) { - if (BaseQuery.getContextBySegment(query, false)) { + if (QueryContexts.isBySegment(query)) { return baseRunner.run(query, responseContext); } diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 45363cd3a5c7..117d6436082b 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -91,7 +91,7 @@ public ChainedExecutionQueryRunner( @Override public Sequence run(final Query query, final Map responseContext) { - final int priority = BaseQuery.getContextPriority(query, 0); + final int priority = QueryContexts.getPriority(query); final Ordering ordering = query.getResultOrdering(); return new BaseSequence>( @@ -152,12 +152,11 @@ public Iterable call() throws Exception queryWatcher.registerQuery(query, futures); try { - final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, (Number) null); return new MergeIterable<>( ordering.nullsFirst(), - timeout == null ? - futures.get() : - futures.get(timeout.longValue(), TimeUnit.MILLISECONDS) + QueryContexts.hasTimeout(query) ? + futures.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS) : + futures.get() ).iterator(); } catch (InterruptedException e) { diff --git a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java index 1035a4ed751c..7fe58ee06b90 100644 --- a/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java +++ b/processing/src/main/java/io/druid/query/FinalizeResultsQueryRunner.java @@ -49,8 +49,8 @@ public FinalizeResultsQueryRunner( @Override public Sequence run(final Query query, Map responseContext) { - final boolean isBySegment = BaseQuery.getContextBySegment(query, false); - final boolean shouldFinalize = BaseQuery.getContextFinalize(query, true); + final boolean isBySegment = QueryContexts.isBySegment(query); + final boolean shouldFinalize = QueryContexts.isFinalize(query, true); final Query queryToRun; final Function finalizerFn; diff --git a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java index d416589f199a..77775f295c6f 100644 --- a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java @@ -90,8 +90,8 @@ public Sequence run(final Query queryParam, final Map resp true ); final Pair> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); - final boolean bySegment = BaseQuery.getContextBySegment(query, false); - final int priority = BaseQuery.getContextPriority(query, 0); + final boolean bySegment = QueryContexts.isBySegment(query); + final int priority = QueryContexts.getPriority(query); ListenableFuture> futures = Futures.allAsList( Lists.newArrayList( @@ -178,11 +178,10 @@ private void waitForFutureCompletion( { try { queryWatcher.registerQuery(query, future); - final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, (Number) null); - if (timeout == null) { - future.get(); + if (QueryContexts.hasTimeout(query)) { + future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS); } else { - future.get(timeout.longValue(), TimeUnit.MILLISECONDS); + future.get(); } } catch (InterruptedException e) { diff --git a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java index a42ce69fde0e..5e8b529b1535 100644 --- a/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/IntervalChunkingQueryRunner.java @@ -148,7 +148,7 @@ private Iterable splitInterval(Interval interval, Period period) private Period getChunkPeriod(Query query) { - String p = query.getContextValue(QueryContextKeys.CHUNK_PERIOD, "P0D"); + final String p = QueryContexts.getChunkPeriod(query); return Period.parse(p); } } diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java index 9ad178161ead..cfbf6f2d3403 100644 --- a/processing/src/main/java/io/druid/query/Query.java +++ b/processing/src/main/java/io/druid/query/Query.java @@ -99,4 +99,6 @@ public interface Query String getId(); Query withDataSource(DataSource dataSource); + + Query withDefaultTimeout(long defaultTimeout); } diff --git a/processing/src/main/java/io/druid/query/QueryContextKeys.java b/processing/src/main/java/io/druid/query/QueryContextKeys.java deleted file mode 100644 index 480dcd551f4f..000000000000 --- a/processing/src/main/java/io/druid/query/QueryContextKeys.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.query; - -public class QueryContextKeys -{ - public static final String PRIORITY = "priority"; - public static final String TIMEOUT = "timeout"; - public static final String CHUNK_PERIOD = "chunkPeriod"; -} diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java new file mode 100644 index 000000000000..b59c6bc2a2b7 --- /dev/null +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -0,0 +1,168 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import com.google.common.base.Preconditions; +import io.druid.java.util.common.ISE; + +public class QueryContexts +{ + public static final String PRIORITY_KEY = "priority"; + public static final String TIMEOUT_KEY = "timeout"; + public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout"; + public static final String CHUNK_PERIOD_KEY = "chunkPeriod"; + + public static final boolean DEFAULT_BY_SEGMENT = false; + public static final boolean DEFAULT_POPULATE_CACHE = true; + public static final boolean DEFAULT_USE_CACHE = true; + public static final int DEFAULT_PRIORITY = 0; + public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0; + public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes + public static final long NO_TIMEOUT = 0; + + public static boolean isBySegment(Query query) + { + return isBySegment(query, DEFAULT_BY_SEGMENT); + } + + public static boolean isBySegment(Query query, boolean defaultValue) + { + return parseBoolean(query, "bySegment", defaultValue); + } + + public static boolean isPopulateCache(Query query) + { + return isPopulateCache(query, DEFAULT_POPULATE_CACHE); + } + + public static boolean isPopulateCache(Query query, boolean defaultValue) + { + return parseBoolean(query, "populateCache", defaultValue); + } + + public static boolean isUseCache(Query query) + { + return isUseCache(query, DEFAULT_USE_CACHE); + } + + public static boolean isUseCache(Query query, boolean defaultValue) + { + return parseBoolean(query, "useCache", defaultValue); + } + + public static boolean isFinalize(Query query, boolean defaultValue) + { + return parseBoolean(query, "finalize", defaultValue); + } + + public static int getUncoveredIntervalsLimit(Query query) + { + return getUncoveredIntervalsLimit(query, DEFAULT_UNCOVERED_INTERVALS_LIMIT); + } + + public static int getUncoveredIntervalsLimit(Query query, int defaultValue) + { + return parseInt(query, "uncoveredIntervalsLimit", defaultValue); + } + + public static int getPriority(Query query) + { + return getPriority(query, DEFAULT_PRIORITY); + } + + public static int getPriority(Query query, int defaultValue) + { + return parseInt(query, PRIORITY_KEY, defaultValue); + } + + public static String getChunkPeriod(Query query) + { + return query.getContextValue(CHUNK_PERIOD_KEY, "P0D"); + } + + public static boolean hasTimeout(Query query) + { + return getTimeout(query) != NO_TIMEOUT; + } + + public static long getTimeout(Query query) + { + return getTimeout(query, getDefaultTimeout(query)); + } + + public static long getTimeout(Query query, long defaultTimeout) + { + final long timeout = parseLong(query, TIMEOUT_KEY, defaultTimeout); + Preconditions.checkState(timeout >= 0, "Timeout must be a non negative value, but was [%d]", timeout); + return timeout; + } + + static long getDefaultTimeout(Query query) + { + final long defaultTimeout = parseLong(query, DEFAULT_TIMEOUT_KEY, DEFAULT_TIMEOUT_MILLIS); + Preconditions.checkState(defaultTimeout >= 0, "Timeout must be a non negative value, but was [%d]", defaultTimeout); + return defaultTimeout; + } + + static long parseLong(Query query, String key, long defaultValue) + { + Object val = query.getContextValue(key); + if (val == null) { + return defaultValue; + } + if (val instanceof String) { + return Long.parseLong((String) val); + } else if (val instanceof Number) { + return ((Number) val).longValue(); + } else { + throw new ISE("Unknown type [%s]", val.getClass()); + } + } + + static int parseInt(Query query, String key, int defaultValue) + { + Object val = query.getContextValue(key); + if (val == null) { + return defaultValue; + } + if (val instanceof String) { + return Integer.parseInt((String) val); + } else if (val instanceof Number) { + return ((Number) val).intValue(); + } else { + throw new ISE("Unknown type [%s]", val.getClass()); + } + } + + static boolean parseBoolean(Query query, String key, boolean defaultValue) + { + Object val = query.getContextValue(key); + if (val == null) { + return defaultValue; + } + if (val instanceof String) { + return Boolean.parseBoolean((String) val); + } else if (val instanceof Boolean) { + return (boolean) val; + } else { + throw new ISE("Unknown type [%s]. Cannot parse!", val.getClass()); + } + } +} diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 85f4cf559adf..ddf9c4277c3f 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -38,11 +38,11 @@ import io.druid.java.util.common.guava.MappedSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; -import io.druid.query.BaseQuery; import io.druid.query.CacheStrategy; import io.druid.query.DataSource; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; @@ -115,7 +115,7 @@ public QueryRunner mergeResults(final QueryRunner runner) @Override public Sequence run(Query query, Map responseContext) { - if (BaseQuery.getContextBySegment(query, false)) { + if (QueryContexts.isBySegment(query)) { return runner.run(query, responseContext); } @@ -204,7 +204,7 @@ private Sequence mergeGroupByResults( ); final Sequence finalizingResults; - if (GroupByQuery.getContextFinalize(subquery, false)) { + if (QueryContexts.isFinalize(subquery, false)) { finalizingResults = new MappedSequence<>( subqueryResult, makePreComputeManipulatorFn( diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index fb0fe1327b99..de6fee13b21a 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -35,7 +35,6 @@ import io.druid.collections.BlockingPool; import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.collections.Releaser; -import io.druid.common.utils.JodaUtils; import io.druid.data.input.Row; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; @@ -45,10 +44,9 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.logger.Logger; import io.druid.query.AbstractPrioritizedCallable; -import io.druid.query.BaseQuery; import io.druid.query.ChainedExecutionQueryRunner; import io.druid.query.Query; -import io.druid.query.QueryContextKeys; +import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; @@ -125,7 +123,7 @@ public Sequence run(final Query queryParam, final Map ImmutableMap.of(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, true) ); - if (BaseQuery.getContextBySegment(query, false) || forceChainedExecution) { + if (QueryContexts.isBySegment(query) || forceChainedExecution) { return new ChainedExecutionQueryRunner(exec, queryWatcher, queryables).run(query, responseContext); } @@ -141,14 +139,13 @@ public Sequence run(final Query queryParam, final Map String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId()) ); - final int priority = BaseQuery.getContextPriority(query, 0); + final int priority = QueryContexts.getPriority(query); // Figure out timeoutAt time now, so we can apply the timeout to both the mergeBufferPool.take and the actual // query processing together. - final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null); - final long timeoutAt = queryTimeout == null - ? JodaUtils.MAX_INSTANT - : System.currentTimeMillis() + queryTimeout.longValue(); + final long queryTimeout = QueryContexts.getTimeout(query); + final boolean hasTimeout = QueryContexts.hasTimeout(query); + final long timeoutAt = System.currentTimeMillis() + queryTimeout; return new BaseSequence<>( new BaseSequence.IteratorMaker>() @@ -170,9 +167,13 @@ public CloseableGrouperIterator make() final ReferenceCountingResourceHolder mergeBufferHolder; try { // This will potentially block if there are no merge buffers left in the pool. - final long timeout = timeoutAt - System.currentTimeMillis(); - if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) { - throw new TimeoutException(); + if (hasTimeout) { + final long timeout = timeoutAt - System.currentTimeMillis(); + if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) { + throw new TimeoutException(); + } + } else { + mergeBufferHolder = mergeBufferPool.take(); } resources.add(mergeBufferHolder); } @@ -248,6 +249,7 @@ public AggregateResult call() throws Exception waitForFutureCompletion( query, Futures.allAsList(ImmutableList.of(future)), + hasTimeout, timeoutAt - System.currentTimeMillis() ); } @@ -260,7 +262,7 @@ public AggregateResult call() throws Exception ); if (!isSingleThreaded) { - waitForFutureCompletion(query, futures, timeoutAt - System.currentTimeMillis()); + waitForFutureCompletion(query, futures, hasTimeout, timeoutAt - System.currentTimeMillis()); } return RowBasedGrouperHelper.makeGrouperIterator( @@ -299,6 +301,7 @@ public void cleanup(CloseableGrouperIterator iterFromMake) private void waitForFutureCompletion( GroupByQuery query, ListenableFuture> future, + boolean hasTimeout, long timeout ) { @@ -307,11 +310,11 @@ private void waitForFutureCompletion( queryWatcher.registerQuery(query, future); } - if (timeout <= 0) { + if (hasTimeout && timeout <= 0) { throw new TimeoutException(); } - final List results = future.get(timeout, TimeUnit.MILLISECONDS); + final List results = hasTimeout ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get(); for (AggregateResult result : results) { if (!result.isOk()) { diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index 45dbd954d1c7..e0e7273f572b 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -31,7 +31,6 @@ import io.druid.collections.BlockingPool; import io.druid.collections.ResourceHolder; import io.druid.collections.StupidPool; -import io.druid.common.utils.JodaUtils; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.guice.annotations.Global; @@ -47,7 +46,7 @@ import io.druid.query.InsufficientResourcesException; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; -import io.druid.query.QueryContextKeys; +import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; @@ -141,10 +140,12 @@ public GroupByQueryResource prepareResource(GroupByQuery query, boolean willMerg } else if (requiredMergeBufferNum == 0) { return new GroupByQueryResource(); } else { - final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, JodaUtils.MAX_INSTANT); - final ResourceHolder> mergeBufferHolders = mergeBufferPool.takeBatch( - requiredMergeBufferNum, timeout.longValue() - ); + final ResourceHolder> mergeBufferHolders; + if (QueryContexts.hasTimeout(query)) { + mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum, QueryContexts.getTimeout(query)); + } else { + mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum); + } if (mergeBufferHolders == null) { throw new InsufficientResourcesException("Cannot acquire enough merge buffers"); } else { diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index 5620219f8b0a..475d98453df9 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -32,10 +32,9 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.logger.Logger; import io.druid.query.AbstractPrioritizedCallable; -import io.druid.query.BaseQuery; import io.druid.query.ConcatQueryRunner; import io.druid.query.Query; -import io.druid.query.QueryContextKeys; +import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; @@ -202,7 +201,7 @@ public Sequence run( final Map responseContext ) { - final int priority = BaseQuery.getContextPriority(query, 0); + final int priority = QueryContexts.getPriority(query); ListenableFuture> future = queryExecutor.submit( new AbstractPrioritizedCallable>(priority) { @@ -210,15 +209,18 @@ public Sequence run( public Sequence call() throws Exception { return Sequences.simple( - Sequences.toList(input.run(query, responseContext), new ArrayList()) + Sequences.toList(input.run(query, responseContext), new ArrayList<>()) ); } } ); try { queryWatcher.registerQuery(query, future); - final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, (Number) null); - return timeout == null ? future.get() : future.get(timeout.longValue(), TimeUnit.MILLISECONDS); + if (QueryContexts.hasTimeout(query)) { + return future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS); + } else { + return future.get(); + } } catch (InterruptedException e) { log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java index f5ab114f6873..d84a7396e3e6 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryQueryToolChest.java @@ -35,12 +35,12 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.guava.nary.BinaryFn; -import io.druid.query.BaseQuery; import io.druid.query.CacheStrategy; import io.druid.query.DefaultGenericQueryMetricsFactory; import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryMetrics; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; @@ -391,7 +391,7 @@ public Sequence> run( return runner.run(query, responseContext); } - final boolean isBySegment = BaseQuery.getContextBySegment(query, false); + final boolean isBySegment = QueryContexts.isBySegment(query); return Sequences.map( runner.run(query.withLimit(config.getMaxSearchLimit()), responseContext), diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java index a7826dbdb9d9..7d9be82c6fb4 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryQueryToolChest.java @@ -32,11 +32,11 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.guava.nary.BinaryFn; -import io.druid.query.BaseQuery; import io.druid.query.BySegmentResultValue; import io.druid.query.CacheStrategy; import io.druid.query.IntervalChunkingQueryRunnerDecorator; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.Result; @@ -535,7 +535,7 @@ public Sequence> run( return runner.run(query, responseContext); } - final boolean isBySegment = BaseQuery.getContextBySegment(query, false); + final boolean isBySegment = QueryContexts.isBySegment(query); return Sequences.map( runner.run(query.withThreshold(minTopNThreshold), responseContext), diff --git a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java index 1e3d931b5caf..b6c074e7eeb6 100644 --- a/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/AsyncQueryRunnerTest.java @@ -100,7 +100,7 @@ public Sequence run(Query query, Map responseContext) QueryRunnerTestHelper.NOOP_QUERYWATCHER); Sequence lazy = asyncRunner.run( - query.withOverriddenContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, 1)), + query.withOverriddenContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1)), Collections.EMPTY_MAP); try { diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java index 67918aef4819..311aaaa798f9 100644 --- a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -251,7 +251,7 @@ public Void answer() throws Throwable .dataSource("test") .intervals("2014/2015") .aggregators(Lists.newArrayList(new CountAggregatorFactory("count"))) - .context(ImmutableMap.of(QueryContextKeys.TIMEOUT, 100, "queryId", "test")) + .context(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 100, "queryId", "test")) .build(), context ); diff --git a/processing/src/test/java/io/druid/query/QueryContextsTest.java b/processing/src/test/java/io/druid/query/QueryContextsTest.java new file mode 100644 index 000000000000..c656f0771101 --- /dev/null +++ b/processing/src/test/java/io/druid/query/QueryContextsTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.query.filter.DimFilter; +import io.druid.query.spec.MultipleIntervalSegmentSpec; +import io.druid.query.spec.QuerySegmentSpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class QueryContextsTest +{ + private static class TestQuery extends BaseQuery + { + + public TestQuery(DataSource dataSource, QuerySegmentSpec querySegmentSpec, boolean descending, Map context) + { + super(dataSource, querySegmentSpec, descending, context); + } + + @Override + public boolean hasFilters() + { + return false; + } + + @Override + public DimFilter getFilter() + { + return null; + } + + @Override + public String getType() + { + return null; + } + + @Override + public Query withQuerySegmentSpec(QuerySegmentSpec spec) + { + return null; + } + + @Override + public Query withDataSource(DataSource dataSource) + { + return null; + } + + @Override + public Query withOverriddenContext(Map contextOverride) + { + return new TestQuery( + getDataSource(), + getQuerySegmentSpec(), + isDescending(), + computeOverridenContext(contextOverride) + ); + } + } + + @Test + public void testDefaultQueryTimeout() + { + final Query query = new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(new Interval("0/100"))), + false, + new HashMap() + ); + Assert.assertEquals(300_000, QueryContexts.getDefaultTimeout(query)); + } + + @Test + public void testEmptyQueryTimeout() + { + Query query = new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(new Interval("0/100"))), + false, + new HashMap() + ); + Assert.assertEquals(300_000, QueryContexts.getTimeout(query)); + + query = query.withDefaultTimeout(60_000); + Assert.assertEquals(60_000, QueryContexts.getTimeout(query)); + } + + @Test + public void testQueryTimeout() + { + Query query = new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(new Interval("0/100"))), + false, + ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1000) + ); + Assert.assertEquals(1000, QueryContexts.getTimeout(query)); + + query = query.withDefaultTimeout(1_000_000); + Assert.assertEquals(1000, QueryContexts.getTimeout(query)); + } +} diff --git a/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java b/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java index e05caa4336d2..f66d577ec12d 100644 --- a/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java +++ b/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java @@ -33,6 +33,7 @@ import io.druid.query.Druids; import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; @@ -101,7 +102,7 @@ public void testContextSerde() throws Exception ), Query.class ); - Assert.assertEquals(1, serdeQuery.getContextValue("priority")); + Assert.assertEquals(1, serdeQuery.getContextValue(QueryContexts.PRIORITY_KEY)); Assert.assertEquals(true, serdeQuery.getContextValue("useCache")); Assert.assertEquals("true", serdeQuery.getContextValue("populateCache")); Assert.assertEquals(true, serdeQuery.getContextValue("finalize")); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java index 460c9d8c8519..53e81977e298 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java @@ -32,7 +32,7 @@ import io.druid.data.input.Row; import io.druid.java.util.common.granularity.Granularities; import io.druid.query.DruidProcessingConfig; -import io.druid.query.QueryContextKeys; +import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; @@ -43,7 +43,6 @@ import io.druid.query.groupby.strategy.GroupByStrategySelector; import io.druid.query.groupby.strategy.GroupByStrategyV1; import io.druid.query.groupby.strategy.GroupByStrategyV2; -import org.bouncycastle.util.Integers; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -230,7 +229,7 @@ public void testSimpleGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -258,7 +257,7 @@ public void testNestedGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -297,7 +296,7 @@ public void testDoubleNestedGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -349,7 +348,7 @@ public void testTripleNestedGroupBy() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java index a2f6f6b8d770..060012bf0eb5 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java @@ -33,7 +33,7 @@ import io.druid.java.util.common.granularity.Granularities; import io.druid.query.DruidProcessingConfig; import io.druid.query.InsufficientResourcesException; -import io.druid.query.QueryContextKeys; +import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; @@ -46,7 +46,6 @@ import io.druid.query.groupby.strategy.GroupByStrategySelector; import io.druid.query.groupby.strategy.GroupByStrategyV1; import io.druid.query.groupby.strategy.GroupByStrategyV2; -import org.bouncycastle.util.Integers; import org.hamcrest.CoreMatchers; import org.junit.Rule; import org.junit.Test; @@ -201,7 +200,7 @@ public void testNotEnoughMergeBuffersOnQueryable() throws IOException .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -239,7 +238,7 @@ public void testResourceLimitExceededOnBroker() .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); @@ -265,7 +264,7 @@ public void testInsufficientResourcesOnBroker() throws IOException .setGranularity(Granularities.ALL) .setInterval(QueryRunnerTestHelper.firstToThird) .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) - .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); try { diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 9d88c8378c63..dfcc6d2b0982 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -52,6 +52,7 @@ import io.druid.query.Druids; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; @@ -1213,7 +1214,7 @@ public void testGroupByTimeoutContextOverride() ) ) .setGranularity(QueryRunnerTestHelper.dayGran) - .setContext(ImmutableMap.of("timeout", Integer.valueOf(60000))) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 60000)) .build(); List expectedResults = Arrays.asList( @@ -5421,7 +5422,7 @@ public void testSubqueryWithContextTimeout() .setDimensions(Lists.newArrayList()) .setAggregatorSpecs(ImmutableList.of(new CountAggregatorFactory("count"))) .setGranularity(QueryRunnerTestHelper.allGran) - .setContext(ImmutableMap.of("timeout", 10000)) + .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 10000)) .build(); List expectedResults = Arrays.asList( diff --git a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java index 1c332fb23dc6..cad7fe53119a 100644 --- a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java +++ b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryTest.java @@ -24,6 +24,7 @@ import io.druid.jackson.DefaultObjectMapper; import io.druid.query.Druids; import io.druid.query.Query; +import io.druid.query.QueryContexts; import org.junit.Assert; import org.junit.Test; @@ -78,7 +79,7 @@ public void testContextSerde() throws Exception ); - Assert.assertEquals(new Integer(1), serdeQuery.getContextValue("priority")); + Assert.assertEquals(new Integer(1), serdeQuery.getContextValue(QueryContexts.PRIORITY_KEY)); Assert.assertEquals(true, serdeQuery.getContextValue("useCache")); Assert.assertEquals(true, serdeQuery.getContextValue("populateCache")); Assert.assertEquals(true, serdeQuery.getContextValue("finalize")); @@ -116,7 +117,7 @@ public void testContextSerde2() throws Exception ); - Assert.assertEquals("1", serdeQuery.getContextValue("priority")); + Assert.assertEquals("1", serdeQuery.getContextValue(QueryContexts.PRIORITY_KEY)); Assert.assertEquals("true", serdeQuery.getContextValue("useCache")); Assert.assertEquals("true", serdeQuery.getContextValue("populateCache")); Assert.assertEquals("true", serdeQuery.getContextValue("finalize")); diff --git a/server/src/main/java/io/druid/client/CacheUtil.java b/server/src/main/java/io/druid/client/CacheUtil.java index 6210f2c5ee37..a5bce97feadd 100644 --- a/server/src/main/java/io/druid/client/CacheUtil.java +++ b/server/src/main/java/io/druid/client/CacheUtil.java @@ -24,9 +24,9 @@ import com.google.common.collect.Lists; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; -import io.druid.query.BaseQuery; import io.druid.query.CacheStrategy; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.SegmentDescriptor; import org.joda.time.Interval; @@ -123,7 +123,7 @@ private static boolean useCache( CacheConfig cacheConfig ) { - return BaseQuery.getContextUseCache(query, true) + return QueryContexts.isUseCache(query) && strategy != null && cacheConfig.isUseCache() && cacheConfig.isQueryCacheable(query); @@ -135,7 +135,7 @@ private static boolean populateCache( CacheConfig cacheConfig ) { - return BaseQuery.getContextPopulateCache(query, true) + return QueryContexts.isPopulateCache(query) && strategy != null && cacheConfig.isPopulateCache() && cacheConfig.isQueryCacheable(query); diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index b15e912fd3f3..c86123b0ec85 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -55,10 +55,10 @@ import io.druid.java.util.common.guava.MergeSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; -import io.druid.query.BaseQuery; import io.druid.query.BySegmentResultValueClass; import io.druid.query.CacheStrategy; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; @@ -151,12 +151,12 @@ public Sequence run(final Query query, final Map responseC final boolean useCache = CacheUtil.useCacheOnBrokers(query, strategy, cacheConfig); final boolean populateCache = CacheUtil.populateCacheOnBrokers(query, strategy, cacheConfig); - final boolean isBySegment = BaseQuery.getContextBySegment(query, false); + final boolean isBySegment = QueryContexts.isBySegment(query); final ImmutableMap.Builder contextBuilder = new ImmutableMap.Builder<>(); - final int priority = BaseQuery.getContextPriority(query, 0); - contextBuilder.put("priority", priority); + final int priority = QueryContexts.getPriority(query); + contextBuilder.put(QueryContexts.PRIORITY_KEY, priority); if (populateCache) { // prevent down-stream nodes from caching results as well if we are populating the cache @@ -177,7 +177,7 @@ public Sequence run(final Query query, final Map responseC // Note that enabling this leads to putting uncovered intervals information in the response headers // and might blow up in some cases https://github.com/druid-io/druid/issues/2108 - int uncoveredIntervalsLimit = BaseQuery.getContextUncoveredIntervalsLimit(query, 0); + int uncoveredIntervalsLimit = QueryContexts.getUncoveredIntervalsLimit(query); if (uncoveredIntervalsLimit > 0) { List uncoveredIntervals = Lists.newArrayListWithCapacity(uncoveredIntervalsLimit); diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index cd6401462a0c..d8dc7882a74b 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -50,9 +50,9 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.logger.Logger; -import io.druid.query.BaseQuery; import io.druid.query.BySegmentResultValueClass; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryMetrics; import io.druid.query.QueryRunner; @@ -134,7 +134,7 @@ public int getNumOpenConnections() public Sequence run(final Query query, final Map context) { QueryToolChest> toolChest = warehouse.getToolChest(query); - boolean isBySegment = BaseQuery.getContextBySegment(query, false); + boolean isBySegment = QueryContexts.isBySegment(query); Pair types = typesMap.get(query.getClass()); if (types == null) { diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 609c7cd53bbb..554cc34de83c 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -40,7 +40,6 @@ import io.druid.query.DruidMetrics; import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; -import io.druid.query.QueryContextKeys; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryMetrics; import io.druid.query.QuerySegmentWalker; @@ -177,7 +176,7 @@ public Response doPost( ) throws IOException { final long startNs = System.nanoTime(); - Query query = null; + Query query = null; QueryToolChest toolChest = null; String queryId = null; @@ -191,14 +190,8 @@ public Response doPost( queryId = UUID.randomUUID().toString(); query = query.withId(queryId); } - if (query.getContextValue(QueryContextKeys.TIMEOUT) == null) { - query = query.withOverriddenContext( - ImmutableMap.of( - QueryContextKeys.TIMEOUT, - config.getMaxIdleTime().toStandardDuration().getMillis() - ) - ); - } + query = query.withDefaultTimeout(config.getDefaultQueryTimeout()); + toolChest = warehouse.getToolChest(query); Thread.currentThread() diff --git a/server/src/main/java/io/druid/server/initialization/ServerConfig.java b/server/src/main/java/io/druid/server/initialization/ServerConfig.java index cad5ace2c96c..560975abe186 100644 --- a/server/src/main/java/io/druid/server/initialization/ServerConfig.java +++ b/server/src/main/java/io/druid/server/initialization/ServerConfig.java @@ -37,6 +37,10 @@ public class ServerConfig @NotNull private Period maxIdleTime = new Period("PT5m"); + @JsonProperty + @Min(0) + private long defaultQueryTimeout = 300_000; // 5 minutes + public int getNumThreads() { return numThreads; @@ -47,12 +51,18 @@ public Period getMaxIdleTime() return maxIdleTime; } + public long getDefaultQueryTimeout() + { + return defaultQueryTimeout; + } + @Override public String toString() { return "ServerConfig{" + - "numThreads=" + numThreads + - ", maxIdleTime=" + maxIdleTime + - '}'; + "numThreads=" + numThreads + + ", maxIdleTime=" + maxIdleTime + + ", defaultQueryTimeout=" + defaultQueryTimeout + + '}'; } } diff --git a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java index 66164f91c706..c4af26e0b10d 100644 --- a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java +++ b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java @@ -23,8 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import com.google.common.collect.Iterables; -import io.druid.query.BaseQuery; import io.druid.query.Query; +import io.druid.query.QueryContexts; /** */ @@ -46,7 +46,7 @@ public PriorityTieredBrokerSelectorStrategy( @Override public Optional getBrokerServiceName(TieredBrokerConfig tierConfig, Query query) { - final int priority = BaseQuery.getContextPriority(query, 0); + final int priority = QueryContexts.getPriority(query); if (priority < minPriority) { return Optional.of(