diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index b89ae495b7de..906c8f140a71 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -104,7 +104,10 @@ import org.apache.druid.query.topn.TopNResultValue; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; +import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.apache.druid.timeline.SegmentId; @@ -338,7 +341,8 @@ public > QueryToolChest getToolChest new CacheConfig(), new DruidHttpClientConfig(), processingConfig, - forkJoinPool + forkJoinPool, + new QueryScheduler(0, NoQueryLaningStrategy.INSTANCE, new ServerConfig()) ); } diff --git a/distribution/bin/check-licenses.py b/distribution/bin/check-licenses.py index 4b47947239a5..9f71fa8ba588 100755 --- a/distribution/bin/check-licenses.py +++ b/distribution/bin/check-licenses.py @@ -214,6 +214,7 @@ def build_compatible_license_names(): compatible_licenses['Apache License, Version 2.0'] = 'Apache License version 2.0' compatible_licenses['The Apache Software License, Version 2.0'] = 'Apache License version 2.0' compatible_licenses['Apache 2.0'] = 'Apache License version 2.0' + compatible_licenses['Apache-2.0'] = 'Apache License version 2.0' compatible_licenses['Apache 2'] = 'Apache License version 2.0' compatible_licenses['Apache License 2.0'] = 'Apache License version 2.0' compatible_licenses['Apache Software License - Version 2.0'] = 'Apache License version 2.0' diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 9b171ad4dfab..7f95e4b858e5 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1481,9 +1481,35 @@ These Broker configurations can be defined in the `broker/runtime.properties` fi |`druid.broker.select.tier`|`highestPriority`, `lowestPriority`, `custom`|If segments are cross-replicated across tiers in a cluster, you can tell the broker to prefer to select segments in a tier with a certain priority.|`highestPriority`| |`druid.broker.select.tier.custom.priorities`|`An array of integer priorities.`|Select servers in tiers with a custom priority list.|None| +##### Query laning + +*Laning strategies* allow you to control capacity utilization for heterogeneous query workloads. With laning, the broker examines and classifies a query for the purpose of assigning it to a 'lane'. Lanes have capacity limits, enforced by the broker, that can be used to ensure sufficient resources are available for other lanes or for interactive queries (with no lane), or to limit overall throughput for queries within the lane. Requests in excess of the capacity are discarded with an HTTP 429 status code. + +|Property|Description|Default| +|--------|-----------|-------| +|`druid.query.scheduler.numThreads`|Maximum number of HTTP threads to dedicate to query processing. 
To save HTTP thread capacity, this should be lower than `druid.server.http.numThreads`.|Unbounded| +|`druid.query.scheduler.laning.strategy`|Query laning strategy to use to assign queries to a lane in order to control capacities for certain classes of queries.|`none`| + +##### Laning strategies + +###### No laning strategy + +In this mode, queries are never assigned a lane, and the concurrent query count will only be limited by `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`, if set. This is the default Druid query scheduler operating mode. Enable this strategy explicitly by setting `druid.query.scheduler.laning.strategy` to `none`. + +###### 'High/Low' laning strategy +This laning strategy splits queries with a `priority` below zero into a `low` query lane, automatically. Queries with priority of zero (the default) or above are considered 'interactive'. The limit on `low` queries can be set to some desired percentage of the total capacity (or HTTP thread pool size), reserving capacity for interactive queries. Queries in the `low` lane are _not_ guaranteed their capacity, which may be consumed by interactive queries, but may use up to this limit if total capacity is available. + +If the `low` lane is specified in the [query context](../querying/query-context.md) `lane` parameter, this will override the computed lane. + +This strategy can be enabled by setting `druid.query.scheduler.laning.strategy=hilo`. + +|Property|Description|Default| +|--------|-----------|-------| +|`druid.query.scheduler.laning.maxLowPercent`|Maximum percent of the smaller number of`druid.server.http.numThreads` or `druid.query.scheduler.numThreads`, defining the number of HTTP threads that can be used by queries with a priority lower than 0. Value must be in the range 1 to 100, and will be rounded up|No default, must be set if using this mode| + ##### Server Configuration -Druid uses Jetty to serve HTTP requests. +Druid uses Jetty to serve HTTP requests. Each query being processed consumes a single thread from `druid.server.http.numThreads`, so consider defining `druid.query.scheduler.numThreads` to a lower value in order to reserve HTTP threads for responding to health checks, lookup loading, and other non-query, and in most cases comparatively very short lived, HTTP requests. |Property|Description|Default| |--------|-----------|-------| diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md index 28cfb3bd5228..020527309530 100644 --- a/docs/querying/query-context.md +++ b/docs/querying/query-context.md @@ -29,6 +29,7 @@ The query context is used for various query configuration parameters. The follow |-----------------|----------------------------------------|----------------------| |timeout | `druid.server.http.defaultQueryTimeout`| Query timeout in millis, beyond which unfinished queries will be cancelled. 0 timeout means `no timeout`. To set the default timeout, see [Broker configuration](../configuration/index.html#broker) | |priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.| +|lane | `null` | Query lane, used to control usage limits on classes of queries. See [Broker configuration](../configuration/index.html#broker) for more details.| |queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | |useCache | `true` | Flag indicating whether to leverage the query cache for this query. 
When set to false, it disables reading from the query cache for this query. When set to true, Apache Druid uses `druid.broker.cache.useCache` or `druid.historical.cache.useCache` to determine whether or not to read from the query cache | |populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses `druid.broker.cache.populateCache` or `druid.historical.cache.populateCache` to determine whether or not to save the results of this query to the query cache | diff --git a/docs/querying/querying.md b/docs/querying/querying.md index f102173263ab..20f9feddbdcb 100644 --- a/docs/querying/querying.md +++ b/docs/querying/querying.md @@ -109,6 +109,8 @@ If a query fails, you will get an HTTP 500 response containing a JSON object wit } ``` +If a query request fails due to being limited by the [query scheduler laning configuration](../configuration/index.md#broker), an HTTP 429 response with the same JSON object schema as an error response, but with `errorMessage` of the form: "Total query capacity exceeded" or "Query capacity exceeded for lane 'low'". + The fields in the response are: |field|description| diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index 9090bfe168d8..033ffb47f820 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -65,7 +65,9 @@ import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesResultValue; import org.apache.druid.server.ClientQuerySegmentWalker; +import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.timeline.TimelineLookup; import org.hamcrest.core.IsInstanceOf; @@ -361,7 +363,8 @@ public String getFormatString() return null; } }, - ForkJoinPool.commonPool() + ForkJoinPool.commonPool(), + new QueryScheduler(0, NoQueryLaningStrategy.INSTANCE, new ServerConfig()) ); ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index 6437488b1f16..8cc21d02e19a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -909,7 +909,7 @@ private TaskToolbox makeToolbox( new QueryWatcher() { @Override - public void registerQuery(Query query, ListenableFuture future) + public void registerQueryFuture(Query query, ListenableFuture future) { // do nothing } diff --git a/licenses.yaml b/licenses.yaml index c352faa57dd7..62802831e45b 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -1869,6 +1869,17 @@ libraries: --- +name: Resilience4j +license_category: binary +module: java-core 
+license_name: Apache License version 2.0 +version: 1.3.1 +libraries: + - io.github.resilience4j: resilience4j-core + - io.github.resilience4j: resilience4j-bulkhead + +--- + name: RoaringBitmap license_category: binary module: java-core @@ -1880,6 +1891,17 @@ libraries: --- +name: vavr +license_category: binary +module: java-core +license_name: Apache License version 2.0 +version: 0.10.2 +libraries: + - io.vavr: vavr + - io.vavr: vavr-match + +--- + name: Config Magic license_category: binary module: java-core diff --git a/pom.xml b/pom.xml index f229d32dd74f..e61a22d3dfc3 100644 --- a/pom.xml +++ b/pom.xml @@ -94,6 +94,7 @@ 1.9.13 2.8.2 3.10.6.Final + 1.3.1 4.1.45.Final v10.14.2 @@ -1181,6 +1182,11 @@ + + io.github.resilience4j + resilience4j-bulkhead + ${resilience4j.version} + org.testng diff --git a/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java index 01cd0d9f7a6d..cf149138ff28 100644 --- a/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java @@ -144,7 +144,7 @@ public Iterable call() ) ); - queryWatcher.registerQuery(query, futures); + queryWatcher.registerQueryFuture(query, futures); try { return new MergeIterable<>( diff --git a/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java index 90e9f6f3ff58..1653fb170637 100644 --- a/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java @@ -178,7 +178,7 @@ private void waitForFutureCompletion( ) { try { - queryWatcher.registerQuery(query, future); + queryWatcher.registerQueryFuture(query, future); if (QueryContexts.hasTimeout(query)) { future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS); } else { diff --git a/processing/src/main/java/org/apache/druid/query/Query.java b/processing/src/main/java/org/apache/druid/query/Query.java index 37868b4a25d4..e538c09ec14b 100644 --- a/processing/src/main/java/org/apache/druid/query/Query.java +++ b/processing/src/main/java/org/apache/druid/query/Query.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Ordering; import org.apache.druid.guice.annotations.ExtensionPoint; import org.apache.druid.java.util.common.granularity.Granularity; @@ -146,4 +147,10 @@ default Query optimizeForSegment(PerSegmentQueryOptimizationContext optimizat { return this; } + + default Query withLane(String lane) + { + return withOverriddenContext(ImmutableMap.of(QueryContexts.LANE_KEY, lane)); + } + } diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index 2af3b95156b0..ba8e12b09eae 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -35,6 +35,7 @@ public class QueryContexts { public static final String PRIORITY_KEY = "priority"; + public static final String LANE_KEY = "lane"; public static final String TIMEOUT_KEY = "timeout"; public static final String MAX_SCATTER_GATHER_BYTES_KEY = "maxScatterGatherBytes"; public 
static final String MAX_QUEUED_BYTES_KEY = "maxQueuedBytes"; @@ -202,6 +203,11 @@ public static int getPriority(Query query, int defaultValue) return parseInt(query, PRIORITY_KEY, defaultValue); } + public static String getLane(Query query) + { + return (String) query.getContextValue(LANE_KEY); + } + public static boolean getEnableParallelMerges(Query query) { return parseBoolean(query, BROKER_PARALLEL_MERGE_KEY, DEFAULT_ENABLE_PARALLEL_MERGE); diff --git a/processing/src/main/java/org/apache/druid/query/QueryException.java b/processing/src/main/java/org/apache/druid/query/QueryException.java new file mode 100644 index 000000000000..8a835573a2c4 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/QueryException.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +/** + * Base serializable error response + * + * QueryResource and SqlResource are expected to emit the JSON form of this object when errors happen. + */ +public class QueryException extends RuntimeException +{ + private final String errorCode; + private final String errorClass; + private final String host; + + public QueryException(Throwable cause, String errorCode, String errorClass, String host) + { + super(cause == null ? 
null : cause.getMessage(), cause); + this.errorCode = errorCode; + this.errorClass = errorClass; + this.host = host; + } + + @JsonCreator + public QueryException( + @JsonProperty("error") @Nullable String errorCode, + @JsonProperty("errorMessage") String errorMessage, + @JsonProperty("errorClass") @Nullable String errorClass, + @JsonProperty("host") @Nullable String host + ) + { + super(errorMessage); + this.errorCode = errorCode; + this.errorClass = errorClass; + this.host = host; + } + + @Nullable + @JsonProperty("error") + public String getErrorCode() + { + return errorCode; + } + + @JsonProperty("errorMessage") + @Override + public String getMessage() + { + return super.getMessage(); + } + + @JsonProperty + public String getErrorClass() + { + return errorClass; + } + + @JsonProperty + public String getHost() + { + return host; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java b/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java index bc2ffeb4c325..acb86a64a956 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java +++ b/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java @@ -42,7 +42,7 @@ * The QueryResource is expected to emit the JSON form of this object when errors happen, and the DirectDruidClient * deserializes and wraps them. */ -public class QueryInterruptedException extends RuntimeException +public class QueryInterruptedException extends QueryException { public static final String QUERY_INTERRUPTED = "Query interrupted"; public static final String QUERY_TIMEOUT = "Query timeout"; @@ -52,10 +52,6 @@ public class QueryInterruptedException extends RuntimeException public static final String UNSUPPORTED_OPERATION = "Unsupported operation"; public static final String UNKNOWN_EXCEPTION = "Unknown exception"; - private final String errorCode; - private final String errorClass; - private final String host; - @JsonCreator public QueryInterruptedException( @JsonProperty("error") @Nullable String errorCode, @@ -64,10 +60,7 @@ public QueryInterruptedException( @JsonProperty("host") @Nullable String host ) { - super(errorMessage); - this.errorCode = errorCode; - this.errorClass = errorClass; - this.host = host; + super(errorCode, errorMessage, errorClass, host); } /** @@ -83,36 +76,7 @@ public QueryInterruptedException(Throwable cause) public QueryInterruptedException(Throwable cause, String host) { - super(cause == null ? 
null : cause.getMessage(), cause); - this.errorCode = getErrorCodeFromThrowable(cause); - this.errorClass = getErrorClassFromThrowable(cause); - this.host = host; - } - - @Nullable - @JsonProperty("error") - public String getErrorCode() - { - return errorCode; - } - - @JsonProperty("errorMessage") - @Override - public String getMessage() - { - return super.getMessage(); - } - - @JsonProperty - public String getErrorClass() - { - return errorClass; - } - - @JsonProperty - public String getHost() - { - return host; + super(cause, getErrorCodeFromThrowable(cause), getErrorClassFromThrowable(cause), host); } @Override @@ -121,9 +85,9 @@ public String toString() return StringUtils.format( "QueryInterruptedException{msg=%s, code=%s, class=%s, host=%s}", getMessage(), - errorCode, - errorClass, - host + getErrorCode(), + getErrorClass(), + getHost() ); } diff --git a/processing/src/main/java/org/apache/druid/query/QueryWatcher.java b/processing/src/main/java/org/apache/druid/query/QueryWatcher.java index 0457bea25c45..b10834b54241 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryWatcher.java +++ b/processing/src/main/java/org/apache/druid/query/QueryWatcher.java @@ -43,5 +43,5 @@ public interface QueryWatcher * @param query a query, which may be a subset of a larger query, as long as the underlying queryId is unchanged * @param future the future holding the execution status of the query */ - void registerQuery(Query query, ListenableFuture future); + void registerQueryFuture(Query query, ListenableFuture future); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 398410ba8ca3..2de214c4c4eb 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -346,7 +346,7 @@ private void waitForFutureCompletion( { try { if (queryWatcher != null) { - queryWatcher.registerQuery(query, future); + queryWatcher.registerQueryFuture(query, future); } if (hasTimeout && timeout <= 0) { diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index ba5c5824637a..5e2ba0e54a1c 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -219,7 +219,7 @@ public Sequence call() } ); try { - queryWatcher.registerQuery(query, future); + queryWatcher.registerQueryFuture(query, future); if (QueryContexts.hasTimeout(query)) { return future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS); } else { diff --git a/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java index 275fb637f722..b2385044b560 100644 --- a/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java @@ -84,7 +84,7 @@ public int getNumThreads() Capture capturedFuture = EasyMock.newCapture(); QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class); - 
watcher.registerQuery( + watcher.registerQueryFuture( EasyMock.anyObject(), EasyMock.and(EasyMock.anyObject(), EasyMock.capture(capturedFuture)) ); @@ -207,7 +207,7 @@ public int getNumThreads() Capture capturedFuture = Capture.newInstance(); QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class); - watcher.registerQuery( + watcher.registerQueryFuture( EasyMock.anyObject(), EasyMock.and(EasyMock.anyObject(), EasyMock.capture(capturedFuture)) ); diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java index 25877f10dfdd..53d2b97696d4 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java @@ -676,7 +676,7 @@ public static > QueryRunner makeQueryRunner( public static final QueryWatcher NOOP_QUERYWATCHER = new QueryWatcher() { @Override - public void registerQuery(Query query, ListenableFuture future) + public void registerQueryFuture(Query query, ListenableFuture future) { } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java index 35fca34e1c3c..180f1af01e81 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java @@ -411,7 +411,7 @@ public static > QueryRunner makeQueryRunner( public static final QueryWatcher NOOP_QUERYWATCHER = new QueryWatcher() { @Override - public void registerQuery(Query query, ListenableFuture future) + public void registerQueryFuture(Query query, ListenableFuture future) { } diff --git a/server/pom.xml b/server/pom.xml index cf903f7e4d65..7bfa26651c4b 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -311,6 +311,10 @@ org.slf4j slf4j-api + + io.github.resilience4j + resilience4j-bulkhead + diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 5254591bb0e1..392483144662 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -69,6 +69,7 @@ import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.server.QueryResource; +import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; @@ -113,6 +114,7 @@ public class CachingClusteredClient implements QuerySegmentWalker private final DruidHttpClientConfig httpClientConfig; private final DruidProcessingConfig processingConfig; private final ForkJoinPool pool; + private final QueryScheduler scheduler; @Inject public CachingClusteredClient( @@ -124,7 +126,8 @@ public CachingClusteredClient( CacheConfig cacheConfig, @Client DruidHttpClientConfig httpClientConfig, DruidProcessingConfig processingConfig, - @Merging ForkJoinPool pool + @Merging ForkJoinPool pool, + QueryScheduler scheduler ) { this.warehouse = warehouse; @@ -136,6 +139,7 @@ public CachingClusteredClient( 
this.httpClientConfig = httpClientConfig; this.processingConfig = processingConfig; this.pool = pool; + this.scheduler = scheduler; if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) { log.warn( @@ -222,9 +226,9 @@ public Sequence run(final QueryPlus queryPlus, final ResponseContext respo */ private class SpecificQueryRunnable { - private final QueryPlus queryPlus; private final ResponseContext responseContext; - private final Query query; + private QueryPlus queryPlus; + private Query query; private final QueryToolChest> toolChest; @Nullable private final CacheStrategy> strategy; @@ -232,7 +236,6 @@ private class SpecificQueryRunnable private final boolean populateCache; private final boolean isBySegment; private final int uncoveredIntervalsLimit; - private final Query downstreamQuery; private final Map cachePopulatorKeyMap = new HashMap<>(); private final DataSourceAnalysis dataSourceAnalysis; private final List intervals; @@ -251,7 +254,6 @@ private class SpecificQueryRunnable // Note that enabling this leads to putting uncovered intervals information in the response headers // and might blow up in some cases https://github.com/apache/druid/issues/2108 this.uncoveredIntervalsLimit = QueryContexts.getUncoveredIntervalsLimit(query); - this.downstreamQuery = query.withOverriddenContext(makeDownstreamQueryContext()); this.dataSourceAnalysis = DataSourceAnalysis.forDataSource(query.getDataSource()); // For nested queries, we need to look at the intervals of the inner most query. this.intervals = dataSourceAnalysis.getBaseQuerySegmentSpec() @@ -265,6 +267,10 @@ private ImmutableMap makeDownstreamQueryContext() final int priority = QueryContexts.getPriority(query); contextBuilder.put(QueryContexts.PRIORITY_KEY, priority); + final String lane = QueryContexts.getLane(query); + if (lane != null) { + contextBuilder.put(QueryContexts.LANE_KEY, lane); + } if (populateCache) { // prevent down-stream nodes from caching results as well if we are populating the cache @@ -288,27 +294,34 @@ Sequence run(final UnaryOperator> time computeUncoveredIntervals(timeline); } - final Set segments = computeSegmentsToQuery(timeline); + final Set segmentServers = computeSegmentsToQuery(timeline); @Nullable final byte[] queryCacheKey = computeQueryCacheKey(); if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) { @Nullable final String prevEtag = (String) query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH); @Nullable - final String currentEtag = computeCurrentEtag(segments, queryCacheKey); + final String currentEtag = computeCurrentEtag(segmentServers, queryCacheKey); if (currentEtag != null && currentEtag.equals(prevEtag)) { return Sequences.empty(); } } - final List> alreadyCachedResults = pruneSegmentsWithCachedResults(queryCacheKey, segments); - final SortedMap> segmentsByServer = groupSegmentsByServer(segments); - return new LazySequence<>(() -> { + final List> alreadyCachedResults = + pruneSegmentsWithCachedResults(queryCacheKey, segmentServers); + + query = scheduler.laneQuery(queryPlus, segmentServers); + queryPlus = queryPlus.withQuery(query); + + final SortedMap> segmentsByServer = groupSegmentsByServer(segmentServers); + LazySequence mergedResultSequence = new LazySequence<>(() -> { List> sequencesByInterval = new ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size()); addSequencesFromCache(sequencesByInterval, alreadyCachedResults); addSequencesFromServer(sequencesByInterval, segmentsByServer); return 
merge(sequencesByInterval); }); + + return scheduler.run(query, mergedResultSequence); } private Sequence merge(List> sequencesByInterval) @@ -347,14 +360,14 @@ private Sequence merge(List> sequencesByInterval) } } - private Set computeSegmentsToQuery(TimelineLookup timeline) + private Set computeSegmentsToQuery(TimelineLookup timeline) { final List> serversLookup = toolChest.filterSegments( query, intervals.stream().flatMap(i -> timeline.lookup(i).stream()).collect(Collectors.toList()) ); - final Set segments = new LinkedHashSet<>(); + final Set segments = new LinkedHashSet<>(); final Map>> dimensionRangeCache = new HashMap<>(); // Filter unneeded chunks based on partition dimension for (TimelineObjectHolder holder : serversLookup) { @@ -371,7 +384,7 @@ private Set computeSegmentsToQuery(TimelineLookup segments, @Nullable byte[] queryCacheKey) + private String computeCurrentEtag(final Set segments, @Nullable byte[] queryCacheKey) { Hasher hasher = Hashing.sha1().newHasher(); boolean hasOnlyHistoricalSegments = true; - for (ServerToSegment p : segments) { + for (SegmentServerSelector p : segments) { if (!p.getServer().pick().getServer().segmentReplicatable()) { hasOnlyHistoricalSegments = false; break; @@ -460,14 +473,14 @@ private String computeCurrentEtag(final Set segments, @Nullable private List> pruneSegmentsWithCachedResults( final byte[] queryCacheKey, - final Set segments + final Set segments ) { if (queryCacheKey == null) { return Collections.emptyList(); } final List> alreadyCachedResults = new ArrayList<>(); - Map perSegmentCacheKeys = computePerSegmentCacheKeys(segments, queryCacheKey); + Map perSegmentCacheKeys = computePerSegmentCacheKeys(segments, queryCacheKey); // Pull cached segments from cache and remove from set of segments to query final Map cachedValues = computeCachedValues(perSegmentCacheKeys); @@ -488,25 +501,25 @@ private List> pruneSegmentsWithCachedResults( return alreadyCachedResults; } - private Map computePerSegmentCacheKeys( - Set segments, + private Map computePerSegmentCacheKeys( + Set segments, byte[] queryCacheKey ) { // cacheKeys map must preserve segment ordering, in order for shards to always be combined in the same order - Map cacheKeys = Maps.newLinkedHashMap(); - for (ServerToSegment serverToSegment : segments) { + Map cacheKeys = Maps.newLinkedHashMap(); + for (SegmentServerSelector segmentServer : segments) { final Cache.NamedKey segmentCacheKey = CacheUtil.computeSegmentCacheKey( - serverToSegment.getServer().getSegment().getId().toString(), - serverToSegment.getSegmentDescriptor(), + segmentServer.getServer().getSegment().getId().toString(), + segmentServer.getSegmentDescriptor(), queryCacheKey ); - cacheKeys.put(serverToSegment, segmentCacheKey); + cacheKeys.put(segmentServer, segmentCacheKey); } return cacheKeys; } - private Map computeCachedValues(Map cacheKeys) + private Map computeCachedValues(Map cacheKeys) { if (useCache) { return cache.getBulk(Iterables.limit(cacheKeys.values(), cacheConfig.getCacheBulkMergeLimit())); @@ -530,21 +543,21 @@ private Cache.NamedKey getCachePopulatorKey(String segmentId, Interval segmentIn return cachePopulatorKeyMap.get(StringUtils.format("%s_%s", segmentId, segmentInterval)); } - private SortedMap> groupSegmentsByServer(Set segments) + private SortedMap> groupSegmentsByServer(Set segments) { final SortedMap> serverSegments = new TreeMap<>(); - for (ServerToSegment serverToSegment : segments) { - final QueryableDruidServer queryableDruidServer = serverToSegment.getServer().pick(); + for 
(SegmentServerSelector segmentServer : segments) { + final QueryableDruidServer queryableDruidServer = segmentServer.getServer().pick(); if (queryableDruidServer == null) { log.makeAlert( "No servers found for SegmentDescriptor[%s] for DataSource[%s]?! How can this be?!", - serverToSegment.getSegmentDescriptor(), + segmentServer.getSegmentDescriptor(), query.getDataSource() ).emit(); } else { final DruidServer server = queryableDruidServer.getServer(); - serverSegments.computeIfAbsent(server, s -> new ArrayList<>()).add(serverToSegment.getSegmentDescriptor()); + serverSegments.computeIfAbsent(server, s -> new ArrayList<>()).add(segmentServer.getSegmentDescriptor()); } } return serverSegments; @@ -668,11 +681,12 @@ private Sequence getAndCacheServerResults( ) { @SuppressWarnings("unchecked") + final Query downstreamQuery = query.withOverriddenContext(makeDownstreamQueryContext()); final Sequence>> resultsBySegments = serverRunner.run( queryPlus .withQuery( Queries.withSpecificSegments( - (Query>>) downstreamQuery, + downstreamQuery, segmentsOfServer ) ) @@ -697,22 +711,4 @@ private Sequence getAndCacheServerResults( .flatMerge(seq -> seq, query.getResultOrdering()); } } - - private static class ServerToSegment extends Pair - { - private ServerToSegment(ServerSelector server, SegmentDescriptor segment) - { - super(server, segment); - } - - ServerSelector getServer() - { - return lhs; - } - - SegmentDescriptor getSegmentDescriptor() - { - return rhs; - } - } } diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index 06976c1a5b41..020446d69f84 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -451,7 +451,7 @@ private void checkTotalBytesLimit(long bytes) Duration.millis(timeLeft) ); - queryWatcher.registerQuery(query, future); + queryWatcher.registerQueryFuture(query, future); openConnections.getAndIncrement(); Futures.addCallback( diff --git a/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java b/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java new file mode 100644 index 000000000000..007b7a254591 --- /dev/null +++ b/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client; + +import org.apache.druid.client.selector.ServerSelector; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.SegmentDescriptor; + +/** + * Given a {@link SegmentDescriptor}, get a {@link ServerSelector} to use to pick a {@link DruidServer} to query. 
+ * + * Used by {@link CachingClusteredClient} on the broker to fan out queries to historical and realtime data + */ +public class SegmentServerSelector extends Pair +{ + public SegmentServerSelector(ServerSelector server, SegmentDescriptor segment) + { + super(server, segment); + } + + public ServerSelector getServer() + { + return lhs; + } + + public SegmentDescriptor getSegmentDescriptor() + { + return rhs; + } +} diff --git a/server/src/main/java/org/apache/druid/guice/QueryRunnerFactoryModule.java b/server/src/main/java/org/apache/druid/guice/QueryRunnerFactoryModule.java index 6071973cc37a..102c72d890cc 100644 --- a/server/src/main/java/org/apache/druid/guice/QueryRunnerFactoryModule.java +++ b/server/src/main/java/org/apache/druid/guice/QueryRunnerFactoryModule.java @@ -21,7 +21,10 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Binder; +import com.google.inject.Key; +import com.google.inject.Provides; import com.google.inject.multibindings.MapBinder; +import org.apache.druid.guice.annotations.Global; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryWatcher; @@ -42,7 +45,8 @@ import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory; import org.apache.druid.query.topn.TopNQuery; import org.apache.druid.query.topn.TopNQueryRunnerFactory; -import org.apache.druid.server.QueryManager; +import org.apache.druid.server.QueryScheduler; +import org.apache.druid.server.QuerySchedulerProvider; import java.util.Map; @@ -50,8 +54,8 @@ */ public class QueryRunnerFactoryModule extends QueryToolChestModule { - private static final Map, Class> MAPPINGS = - ImmutableMap., Class>builder() + private static final Map>, Class>> MAPPINGS = + ImmutableMap.>, Class>>builder() .put(TimeseriesQuery.class, TimeseriesQueryRunnerFactory.class) .put(SearchQuery.class, SearchQueryRunnerFactory.class) .put(TimeBoundaryQuery.class, TimeBoundaryQueryRunnerFactory.class) @@ -67,21 +71,28 @@ public void configure(Binder binder) { super.configure(binder); - binder.bind(QueryWatcher.class) - .to(QueryManager.class) - .in(LazySingleton.class); - binder.bind(QueryManager.class) + binder.bind(QueryScheduler.class) + .toProvider(Key.get(QuerySchedulerProvider.class, Global.class)) .in(LazySingleton.class); + binder.bind(QuerySchedulerProvider.class).in(LazySingleton.class); + JsonConfigProvider.bind(binder, "druid.query.scheduler", QuerySchedulerProvider.class, Global.class); final MapBinder, QueryRunnerFactory> queryFactoryBinder = DruidBinders.queryRunnerFactoryBinder( binder ); - for (Map.Entry, Class> entry : MAPPINGS.entrySet()) { + for (Map.Entry>, Class>> entry : MAPPINGS.entrySet()) { queryFactoryBinder.addBinding(entry.getKey()).to(entry.getValue()); binder.bind(entry.getValue()).in(LazySingleton.class); } binder.bind(GroupByQueryEngine.class).in(LazySingleton.class); } + + @LazySingleton + @Provides + public QueryWatcher getWatcher(QueryScheduler scheduler) + { + return scheduler; + } } diff --git a/server/src/main/java/org/apache/druid/server/BrokerQueryResource.java b/server/src/main/java/org/apache/druid/server/BrokerQueryResource.java index b75ce5df4cb5..39e6291a134e 100644 --- a/server/src/main/java/org/apache/druid/server/BrokerQueryResource.java +++ b/server/src/main/java/org/apache/druid/server/BrokerQueryResource.java @@ -58,7 +58,7 @@ public BrokerQueryResource( QueryLifecycleFactory queryLifecycleFactory, @Json ObjectMapper jsonMapper, @Smile ObjectMapper smileMapper, - QueryManager 
queryManager, + QueryScheduler queryScheduler, AuthConfig authConfig, AuthorizerMapper authorizerMapper, GenericQueryMetricsFactory queryMetricsFactory, @@ -69,7 +69,7 @@ public BrokerQueryResource( queryLifecycleFactory, jsonMapper, smileMapper, - queryManager, + queryScheduler, authConfig, authorizerMapper, queryMetricsFactory diff --git a/server/src/main/java/org/apache/druid/server/QueryCapacityExceededException.java b/server/src/main/java/org/apache/druid/server/QueryCapacityExceededException.java new file mode 100644 index 000000000000..9085447f2dbe --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/QueryCapacityExceededException.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.QueryException; + +/** + * This exception is for {@link QueryResource} and SqlResource to surface when a query is cast away by + * {@link QueryScheduler}. + * + * As a {@link QueryException} it is expected to be serialied to a json response, but will be mapped to + * {@link #STATUS_CODE} instead of the default HTTP 500 status. + */ +public class QueryCapacityExceededException extends QueryException +{ + private static final String ERROR_CLASS = QueryCapacityExceededException.class.getName(); + public static final String ERROR_CODE = "Query capacity exceeded"; + public static final String ERROR_MESSAGE = "Total query capacity exceeded"; + public static final String ERROR_MESSAGE_TEMPLATE = "Query capacity exceeded for lane '%s'"; + public static final int STATUS_CODE = 429; + + public QueryCapacityExceededException() + { + super(ERROR_CODE, ERROR_MESSAGE, ERROR_CLASS, null); + } + + public QueryCapacityExceededException(String lane) + { + super(ERROR_CODE, StringUtils.format(ERROR_MESSAGE_TEMPLATE, lane), ERROR_CLASS, null); + } + + @JsonCreator + public QueryCapacityExceededException( + @JsonProperty("error") String errorCode, + @JsonProperty("errorMessage") String errorMessage, + @JsonProperty("errorClass") String errorClass) + { + super(errorCode, errorMessage, errorClass, null); + } +} diff --git a/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java new file mode 100644 index 000000000000..a2e922ef48db --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import org.apache.druid.client.SegmentServerSelector; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy; +import org.apache.druid.server.scheduling.NoQueryLaningStrategy; + +import java.util.Optional; +import java.util.Set; + + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "strategy", defaultImpl = NoQueryLaningStrategy.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "none", value = NoQueryLaningStrategy.class), + @JsonSubTypes.Type(name = "hilo", value = HiLoQueryLaningStrategy.class) +}) +public interface QueryLaningStrategy +{ + /** + * Provide a map of lane names to the limit on the number of concurrent queries for that lane + * @param totalLimit + */ + Object2IntMap getLaneLimits(int totalLimit); + + /** + * For a given {@link QueryPlus} and set of {@link SegmentServerSelector}, compute if a query belongs to a lane + * + * This method must be thread safe + */ + Optional computeLane(QueryPlus query, Set segments); +} diff --git a/server/src/main/java/org/apache/druid/server/QueryManager.java b/server/src/main/java/org/apache/druid/server/QueryManager.java deleted file mode 100644 index 0fd1807d86e3..000000000000 --- a/server/src/main/java/org/apache/druid/server/QueryManager.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.server; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimaps; -import com.google.common.collect.SetMultimap; -import com.google.common.util.concurrent.ListenableFuture; -import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.query.Query; -import org.apache.druid.query.QueryWatcher; - -import java.util.Set; - -public class QueryManager implements QueryWatcher -{ - - private final SetMultimap queries; - private final SetMultimap queryDatasources; - - public QueryManager() - { - this.queries = Multimaps.synchronizedSetMultimap( - HashMultimap.create() - ); - this.queryDatasources = Multimaps.synchronizedSetMultimap( - HashMultimap.create() - ); - } - - public boolean cancelQuery(String id) - { - queryDatasources.removeAll(id); - Set futures = queries.removeAll(id); - boolean success = true; - for (ListenableFuture future : futures) { - success = success && future.cancel(true); - } - return success; - } - - @Override - public void registerQuery(Query query, final ListenableFuture future) - { - final String id = query.getId(); - final Set datasources = query.getDataSource().getTableNames(); - queries.put(id, future); - queryDatasources.putAll(id, datasources); - future.addListener( - new Runnable() - { - @Override - public void run() - { - queries.remove(id, future); - for (String datasource : datasources) { - queryDatasources.remove(id, datasource); - } - } - }, - Execs.directExecutor() - ); - } - - public Set getQueryDatasources(final String queryId) - { - return queryDatasources.get(queryId); - } -} diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java index e2fb263def4c..d270799b62af 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResource.java +++ b/server/src/main/java/org/apache/druid/server/QueryResource.java @@ -99,7 +99,7 @@ public class QueryResource implements QueryCountStatsProvider protected final ObjectMapper smileMapper; protected final ObjectMapper serializeDateTimeAsLongJsonMapper; protected final ObjectMapper serializeDateTimeAsLongSmileMapper; - protected final QueryManager queryManager; + protected final QueryScheduler queryScheduler; protected final AuthConfig authConfig; protected final AuthorizerMapper authorizerMapper; @@ -113,7 +113,7 @@ public QueryResource( QueryLifecycleFactory queryLifecycleFactory, @Json ObjectMapper jsonMapper, @Smile ObjectMapper smileMapper, - QueryManager queryManager, + QueryScheduler queryScheduler, AuthConfig authConfig, AuthorizerMapper authorizerMapper, GenericQueryMetricsFactory queryMetricsFactory @@ -124,7 +124,7 @@ public QueryResource( this.smileMapper = smileMapper; this.serializeDateTimeAsLongJsonMapper = serializeDataTimeAsLong(jsonMapper); this.serializeDateTimeAsLongSmileMapper = serializeDataTimeAsLong(smileMapper); - this.queryManager = queryManager; + this.queryScheduler = queryScheduler; this.authConfig = authConfig; this.authorizerMapper = authorizerMapper; this.queryMetricsFactory = queryMetricsFactory; @@ -138,9 +138,9 @@ public Response cancelQuery(@PathParam("id") String queryId, @Context final Http if (log.isDebugEnabled()) { log.debug("Received cancel request for query [%s]", queryId); } - Set datasources = queryManager.getQueryDatasources(queryId); + Set datasources = queryScheduler.getQueryDatasources(queryId); if (datasources == null) { - log.warn("QueryId [%s] not registered with QueryManager, cannot cancel", 
queryId); + log.warn("QueryId [%s] not registered with QueryScheduler, cannot cancel", queryId); datasources = new TreeSet<>(); } @@ -154,7 +154,7 @@ public Response cancelQuery(@PathParam("id") String queryId, @Context final Http throw new ForbiddenException(authResult.toString()); } - queryManager.cancelQuery(queryId); + queryScheduler.cancelQuery(queryId); return Response.status(Response.Status.ACCEPTED).build(); } @@ -310,6 +310,11 @@ public void write(OutputStream outputStream) throws WebApplicationException queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1); return ioReaderWriter.gotError(e); } + catch (QueryCapacityExceededException cap) { + failedQueryCount.incrementAndGet(); + queryLifecycle.emitLogsAndMetrics(cap, req.getRemoteAddr(), -1); + return ioReaderWriter.gotLimited(cap); + } catch (ForbiddenException e) { // don't do anything for an authorization failure, ForbiddenExceptionMapper will catch this later and // send an error response if this is thrown. @@ -434,6 +439,13 @@ Response gotError(Exception e) throws IOException ) .build(); } + + Response gotLimited(QueryCapacityExceededException e) throws IOException + { + return Response.status(QueryCapacityExceededException.STATUS_CODE) + .entity(newOutputWriter(null, null, false).writeValueAsBytes(e)) + .build(); + } } @Override diff --git a/server/src/main/java/org/apache/druid/server/QueryScheduler.java b/server/src/main/java/org/apache/druid/server/QueryScheduler.java new file mode 100644 index 000000000000..50df2720aaa6 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/QueryScheduler.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimaps; +import com.google.common.collect.SetMultimap; +import com.google.common.util.concurrent.ListenableFuture; +import io.github.resilience4j.bulkhead.Bulkhead; +import io.github.resilience4j.bulkhead.BulkheadConfig; +import io.github.resilience4j.bulkhead.BulkheadRegistry; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import org.apache.druid.client.SegmentServerSelector; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryWatcher; +import org.apache.druid.server.initialization.ServerConfig; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * QueryScheduler (potentially) assigns any {@link Query} that is to be executed to a 'query lane' using the + * {@link QueryLaningStrategy} that is defined in {@link QuerySchedulerConfig}. + * + * As a {@link QueryWatcher}, it also provides cancellation facilities to brokers, historicals, and realtime tasks. + * + * This class is shared by all requests on the HTTP theadpool and must be thread safe. + */ +public class QueryScheduler implements QueryWatcher +{ + private static final int NO_CAPACITY = -1; + static final String TOTAL = "default"; + private final int totalCapacity; + private final QueryLaningStrategy laningStrategy; + private final BulkheadRegistry laneRegistry; + /** + * mapping of query id to set of futures associated with the query + */ + private final SetMultimap> queryFutures; + /** + * mapping of query id to set of datasource names that are being queried, used for authorization + */ + private final SetMultimap queryDatasources; + + public QueryScheduler(int totalNumThreads, QueryLaningStrategy laningStrategy, ServerConfig serverConfig) + { + this.laningStrategy = laningStrategy; + this.queryFutures = Multimaps.synchronizedSetMultimap(HashMultimap.create()); + this.queryDatasources = Multimaps.synchronizedSetMultimap(HashMultimap.create()); + // if totalNumThreads is above 0 and less than druid.server.http.numThreads, enforce total limit + final boolean limitTotal; + if (totalNumThreads > 0 && totalNumThreads < serverConfig.getNumThreads()) { + limitTotal = true; + this.totalCapacity = totalNumThreads; + } else { + limitTotal = false; + this.totalCapacity = serverConfig.getNumThreads(); + } + this.laneRegistry = BulkheadRegistry.of(getLaneConfigs(limitTotal)); + } + + @Override + public void registerQueryFuture(Query query, ListenableFuture future) + { + final String id = query.getId(); + final Set datasources = query.getDataSource().getTableNames(); + queryFutures.put(id, future); + queryDatasources.putAll(id, datasources); + future.addListener( + () -> { + queryFutures.remove(id, future); + for (String datasource : datasources) { + queryDatasources.remove(id, datasource); + } + }, + Execs.directExecutor() + ); + } + + /** + * Assign a query a lane (if not set) + */ + public Query laneQuery(QueryPlus queryPlus, Set segments) + { + Query query = queryPlus.getQuery(); + Optional lane = laningStrategy.computeLane(queryPlus, segments); + return 
lane.map(query::withLane).orElse(query); + } + + /** + * Run a query with the scheduler, attempting to acquire a semaphore from the total and lane specific query capacities + * + * Note that {@link #cancelQuery} should not interrupt the thread that calls run, in all current usages it only + * cancels any {@link ListenableFuture} created downstream. If this ever commonly changes, we should add + * synchronization between {@link #cancelQuery} and the acquisition of the {@link Bulkhead} to continue to ensure that + * anything acquired is also released. + * + * In the meantime, if a {@link ListenableFuture} is registered for the query that calls this method, it MUST handle + * this synchronization itself to ensure that no {@link Bulkhead} is acquired without releasing it. + */ + public Sequence run(Query query, Sequence resultSequence) + { + List bulkheads = acquireLanes(query); + return resultSequence.withBaggage(() -> finishLanes(bulkheads)); + } + + /** + * Forcibly cancel all futures that have been registered to a specific query id + */ + public boolean cancelQuery(String id) + { + // if multiple independent queries from the same or different users share a query id, all will be cancelled due + // to the collision + queryDatasources.removeAll(id); + Set> futures = queryFutures.removeAll(id); + boolean success = true; + for (ListenableFuture future : futures) { + success = success && future.cancel(true); + } + return success; + } + + /** + * Get a {@link Set} of datasource names for a {@link Query} id, used by {@link QueryResource#cancelQuery} to + * authorize that a user may call {@link #cancelQuery} for the given id and datasources + */ + public Set getQueryDatasources(final String queryId) + { + return queryDatasources.get(queryId); + } + + /** + * Get the maximum number of concurrent queries that {@link #run} can support + */ + @VisibleForTesting + int getTotalAvailableCapacity() + { + return laneRegistry.getConfiguration(TOTAL) + .map(config -> laneRegistry.bulkhead(TOTAL, config).getMetrics().getAvailableConcurrentCalls()) + .orElse(NO_CAPACITY); + } + + /** + * Get the maximum number of concurrent queries that {@link #run} can support for a given lane + */ + @VisibleForTesting + int getLaneAvailableCapacity(String lane) + { + return laneRegistry.getConfiguration(lane) + .map(config -> laneRegistry.bulkhead(lane, config).getMetrics().getAvailableConcurrentCalls()) + .orElse(NO_CAPACITY); + } + + /** + * Acquire a semaphore for both the 'total' and a lane, if any is associated with a query + */ + @VisibleForTesting + List acquireLanes(Query query) + { + final String lane = QueryContexts.getLane(query); + final Optional laneConfig = lane == null ? 
Optional.empty() : laneRegistry.getConfiguration(lane); + final Optional totalConfig = laneRegistry.getConfiguration(TOTAL); + List hallPasses = new ArrayList<>(2); + try { + // if we have a lane, get it first + laneConfig.ifPresent(config -> { + Bulkhead laneLimiter = laneRegistry.bulkhead(lane, config); + if (!laneLimiter.tryAcquirePermission()) { + throw new QueryCapacityExceededException(lane); + } + hallPasses.add(laneLimiter); + }); + + // everyone needs to take one from the total lane; to ensure we don't acquire a lane and never release it, we want + // to check for total capacity exceeded and release the lane (if present) before throwing capacity exceeded + // note that this isn't strictly fair: the bulkhead doesn't use a fair semaphore, the first to acquire the lane + // might lose to one that came after it when acquiring the total, or an unlaned query might lose to a laned query + totalConfig.ifPresent(config -> { + Bulkhead totalLimiter = laneRegistry.bulkhead(TOTAL, config); + if (!totalLimiter.tryAcquirePermission()) { + throw new QueryCapacityExceededException(); + } + hallPasses.add(totalLimiter); + }); + return hallPasses; + } + catch (Exception ex) { + releaseLanes(hallPasses); + throw ex; + } + } + + /** + * Release all {@link Bulkhead} semaphores in the list + */ + @VisibleForTesting + void releaseLanes(List bulkheads) + { + bulkheads.forEach(Bulkhead::releasePermission); + } + + @VisibleForTesting + void finishLanes(List bulkheads) + { + bulkheads.forEach(Bulkhead::onComplete); + } + + /** + * With a total thread count and {@link QueryLaningStrategy#getLaneLimits}, create a map of lane name to + * {@link BulkheadConfig} to be used to create the {@link #laneRegistry}. This accepts the configured value of + * numThreads rather than using {@link #totalCapacity} so that we only have a total {@link Bulkhead} if + * {@link QuerySchedulerConfig#getNumThreads()} is set + */ + private Map getLaneConfigs(boolean hasTotalLimit) + { + Map configs = new HashMap<>(); + if (hasTotalLimit) { + configs.put( + TOTAL, + BulkheadConfig.custom().maxConcurrentCalls(totalCapacity).maxWaitDuration(Duration.ZERO).build() + ); + } + for (Object2IntMap.Entry entry : laningStrategy.getLaneLimits(totalCapacity).object2IntEntrySet()) { + configs.put( + entry.getKey(), + BulkheadConfig.custom().maxConcurrentCalls(entry.getIntValue()).maxWaitDuration(Duration.ZERO).build() + ); + } + return configs; + } +} diff --git a/server/src/main/java/org/apache/druid/server/QuerySchedulerConfig.java b/server/src/main/java/org/apache/druid/server/QuerySchedulerConfig.java new file mode 100644 index 000000000000..350d34ae01da --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/QuerySchedulerConfig.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.server.scheduling.NoQueryLaningStrategy; + +public class QuerySchedulerConfig +{ + @JsonProperty + private Integer numThreads = 0; + + @JsonProperty("laning") + private QueryLaningStrategy laningStrategy = NoQueryLaningStrategy.INSTANCE; + + public int getNumThreads() + { + return numThreads; + } + + public QueryLaningStrategy getLaningStrategy() + { + return laningStrategy; + } +} diff --git a/server/src/main/java/org/apache/druid/server/QuerySchedulerProvider.java b/server/src/main/java/org/apache/druid/server/QuerySchedulerProvider.java new file mode 100644 index 000000000000..f73601f69faf --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/QuerySchedulerProvider.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.google.inject.Inject; +import com.google.inject.Provider; +import org.apache.druid.server.initialization.ServerConfig; + + +public class QuerySchedulerProvider extends QuerySchedulerConfig implements Provider +{ + private final ServerConfig serverConfig; + + /** + * This needs to be both marked as guice injected to be bound correctly, and also marked with json creator and + * jackson inject to work with {@link org.apache.druid.guice.JsonConfigProvider} + */ + @Inject + @JsonCreator + public QuerySchedulerProvider(@JacksonInject ServerConfig serverConfig) + { + this.serverConfig = serverConfig; + } + + @Override + public QueryScheduler get() + { + return new QueryScheduler(getNumThreads(), getLaningStrategy(), serverConfig); + } +} diff --git a/server/src/main/java/org/apache/druid/server/scheduling/HiLoQueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/scheduling/HiLoQueryLaningStrategy.java new file mode 100644 index 000000000000..af4e23bddf7f --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/scheduling/HiLoQueryLaningStrategy.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.scheduling; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; +import it.unimi.dsi.fastutil.objects.Object2IntArrayMap; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import org.apache.druid.client.SegmentServerSelector; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.server.QueryLaningStrategy; + +import java.util.Optional; +import java.util.Set; + +/** + * Query laning strategy which associates all {@link Query} with priority lower than 0 into a 'low' lane + */ +public class HiLoQueryLaningStrategy implements QueryLaningStrategy +{ + public static final String LOW = "low"; + + @JsonProperty + private final int maxLowPercent; + + @JsonCreator + public HiLoQueryLaningStrategy( + @JsonProperty("maxLowPercent") Integer maxLowPercent + ) + { + this.maxLowPercent = Preconditions.checkNotNull(maxLowPercent, "maxLowPercent must be set"); + Preconditions.checkArgument( + 0 < maxLowPercent && maxLowPercent <= 100, + "maxLowPercent must be in the range 1 to 100" + ); + } + + @Override + public Object2IntMap getLaneLimits(int totalLimit) + { + Object2IntMap onlyLow = new Object2IntArrayMap<>(1); + onlyLow.put(LOW, Ints.checkedCast((long) Math.ceil(totalLimit * ((double) maxLowPercent / 100)))); + return onlyLow; + } + + @Override + public Optional computeLane(QueryPlus query, Set segments) + { + final Query theQuery = query.getQuery(); + // QueryContexts.getPriority gives a default, since we are setting priority + final Integer priority = theQuery.getContextValue(QueryContexts.PRIORITY_KEY); + final String lane = theQuery.getContextValue(QueryContexts.LANE_KEY); + if (lane == null && priority != null && priority < 0) { + return Optional.of(LOW); + } + return Optional.ofNullable(lane); + } +} diff --git a/server/src/main/java/org/apache/druid/server/scheduling/NoQueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/scheduling/NoQueryLaningStrategy.java new file mode 100644 index 000000000000..8f830d6b555d --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/scheduling/NoQueryLaningStrategy.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.scheduling; + +import it.unimi.dsi.fastutil.objects.Object2IntArrayMap; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import org.apache.druid.client.SegmentServerSelector; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.server.QueryLaningStrategy; + +import java.util.Optional; +import java.util.Set; + +/** + * Query laning strategy that does nothing and provides the default, unlimited behavior + */ +public class NoQueryLaningStrategy implements QueryLaningStrategy +{ + private static final Object2IntMap NONE = new Object2IntArrayMap<>(); + + public static final NoQueryLaningStrategy INSTANCE = new NoQueryLaningStrategy(); + + @Override + public Object2IntMap getLaneLimits(int totalLimit) + { + return NONE; + } + + @Override + public Optional computeLane(QueryPlus query, Set segments) + { + return Optional.ofNullable(QueryContexts.getLane(query.getQuery())); + } +} diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index 3efe7bc5d759..ac983eb3d4cf 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -47,7 +47,10 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineLookup; import org.apache.druid.timeline.VersionedIntervalTimeline; @@ -330,7 +333,8 @@ public int getMergePoolParallelism() return 4; } }, - ForkJoinPool.commonPool() + ForkJoinPool.commonPool(), + new QueryScheduler(0, NoQueryLaningStrategy.INSTANCE, new ServerConfig()) ); } diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index 9c0588837ffd..25c027e9e56e 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -116,7 +116,10 @@ import org.apache.druid.query.topn.TopNQueryQueryToolChest; import org.apache.druid.query.topn.TopNResultValue; import org.apache.druid.segment.TestHelper; +import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; @@ -2473,7 +2476,8 @@ public int getMergePoolParallelism() return 4; } }, - ForkJoinPool.commonPool() + ForkJoinPool.commonPool(), + new QueryScheduler(0, NoQueryLaningStrategy.INSTANCE, new ServerConfig()) ); } diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java 
b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java index 202ade04caed..a12b69025dd7 100644 --- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java @@ -22,28 +22,32 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.LazySequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.DefaultGenericQueryMetricsFactory; import org.apache.druid.query.MapQueryToolChestWarehouse; import org.apache.druid.query.Query; -import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.Result; import org.apache.druid.query.SegmentDescriptor; -import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.timeboundary.TimeBoundaryResultValue; +import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.log.TestRequestLogger; import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy; +import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthConfig; @@ -69,33 +73,31 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; +import java.util.function.Consumer; /** * */ public class QueryResourceTest { + private static final QueryToolChestWarehouse WAREHOUSE = new MapQueryToolChestWarehouse(ImmutableMap.of()); private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); private static final AuthenticationResult AUTHENTICATION_RESULT = new AuthenticationResult("druid", "druid", null, null); private final HttpServletRequest testServletRequest = EasyMock.createMock(HttpServletRequest.class); - public static final QuerySegmentWalker TEST_SEGMENT_WALKER = new QuerySegmentWalker() + + private static final QuerySegmentWalker TEST_SEGMENT_WALKER = new QuerySegmentWalker() { @Override public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals) { - return new QueryRunner() - { - @Override - public Sequence run(QueryPlus query, ResponseContext responseContext) - { - return Sequences.empty(); - } - }; + return (queryPlus, responseContext) -> Sequences.empty(); } @Override @@ -105,11 +107,43 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable 
Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()) + ); + assertResponseAndCountdownOrBlockForever( + SIMPLE_TIMESERIES_QUERY, + waitAllFinished, + response -> Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()) + ); + waitTwoScheduled.await(); + assertResponseAndCountdownOrBlockForever( + SIMPLE_TIMESERIES_QUERY, + waitAllFinished, + response -> { + Assert.assertEquals(QueryCapacityExceededException.STATUS_CODE, response.getStatus()); + QueryCapacityExceededException ex; + try { + ex = JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryCapacityExceededException.class); + } + catch (IOException e) { + throw new RuntimeException(e); + } + Assert.assertEquals(QueryCapacityExceededException.ERROR_MESSAGE, ex.getMessage()); + Assert.assertEquals(QueryCapacityExceededException.ERROR_CODE, ex.getErrorCode()); + } + ); + waitAllFinished.await(); + } + + @Test(timeout = 10_000L) + public void testTooManyQueryInLane() throws InterruptedException + { + expectPermissiveHappyPathAuth(); + final CountDownLatch waitTwoStarted = new CountDownLatch(2); + final CountDownLatch waitOneScheduled = new CountDownLatch(1); + final CountDownLatch waitAllFinished = new CountDownLatch(3); + final QueryScheduler scheduler = new QueryScheduler(40, new HiLoQueryLaningStrategy(1), new ServerConfig()); + + createScheduledQueryResource(scheduler, ImmutableList.of(waitTwoStarted), ImmutableList.of(waitOneScheduled)); + + assertResponseAndCountdownOrBlockForever( + SIMPLE_TIMESERIES_QUERY_LOW_PRIORITY, + waitAllFinished, + response -> Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()) + ); + waitOneScheduled.await(); + assertResponseAndCountdownOrBlockForever( + SIMPLE_TIMESERIES_QUERY_LOW_PRIORITY, + waitAllFinished, + response -> { + Assert.assertEquals(QueryCapacityExceededException.STATUS_CODE, response.getStatus()); + QueryCapacityExceededException ex; + try { + ex = JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryCapacityExceededException.class); + } + catch (IOException e) { + throw new RuntimeException(e); + } + Assert.assertEquals( + StringUtils.format( + QueryCapacityExceededException.ERROR_MESSAGE_TEMPLATE, + HiLoQueryLaningStrategy.LOW + ), + ex.getMessage() + ); + Assert.assertEquals(QueryCapacityExceededException.ERROR_CODE, ex.getErrorCode()); + + } + ); + waitTwoStarted.await(); + assertResponseAndCountdownOrBlockForever( + SIMPLE_TIMESERIES_QUERY, + waitAllFinished, + response -> Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()) + ); + + waitAllFinished.await(); + } + + private void createScheduledQueryResource( + QueryScheduler scheduler, + Collection beforeScheduler, + Collection inScheduler + ) + { + + QuerySegmentWalker texasRanger = new QuerySegmentWalker() + { + @Override + public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals) + { + return (queryPlus, responseContext) -> { + beforeScheduler.forEach(CountDownLatch::countDown); + + return scheduler.run( + scheduler.laneQuery(queryPlus, ImmutableSet.of()), + new LazySequence(() -> { + inScheduler.forEach(CountDownLatch::countDown); + try { + // pretend to be a query that is waiting on results + Thread.sleep(500); + } + catch (InterruptedException ignored) { + } + // all that waiting for nothing :( + return Sequences.empty(); + }) + ); + }; + } + + @Override + public QueryRunner getQueryRunnerForSegments(Query query, Iterable specs) + { + return getQueryRunnerForIntervals(null, null); + } + }; + + 
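    // Editor's sketch (not part of this patch): the test walker above illustrates the intended runner
    // pattern — laneQuery() decides the lane first, then run() acquires the total and lane Bulkhead
    // permits and releases them through sequence baggage when the wrapped sequence is closed.
    // In the commented Java below, "baseRunner" and "segments" are hypothetical placeholders:
    //
    //   QueryRunner<T> schedulingRunner = (queryPlus, responseContext) -> {
    //     Query<T> laned = scheduler.laneQuery(queryPlus, segments);
    //     return scheduler.run(
    //         laned,
    //         new LazySequence<>(() -> baseRunner.run(queryPlus.withQuery(laned), responseContext))
    //     );
    //   };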
queryResource = new QueryResource( + new QueryLifecycleFactory( + WAREHOUSE, + texasRanger, + new DefaultGenericQueryMetricsFactory(), + new NoopServiceEmitter(), + testRequestLogger, + new AuthConfig(), + AuthTestUtils.TEST_AUTHORIZER_MAPPER + ), + JSON_MAPPER, + JSON_MAPPER, + scheduler, + new AuthConfig(), + null, + new DefaultGenericQueryMetricsFactory() + ); + } + + private void assertResponseAndCountdownOrBlockForever(String query, CountDownLatch done, Consumer asserts) + { + Executors.newSingleThreadExecutor().submit(() -> { + try { + Response response = queryResource.doPost( + new ByteArrayInputStream(query.getBytes("UTF-8")), + null, + testServletRequest + ); + asserts.accept(response); + } + catch (IOException e) { + throw new RuntimeException(e); + } + done.countDown(); + }); + } + + private void expectPermissiveHappyPathAuth() + { + EasyMock.expect(testServletRequest.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)) + .andReturn(null) + .anyTimes(); + EasyMock.expect(testServletRequest.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes(); + + EasyMock.expect(testServletRequest.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)) + .andReturn(AUTHENTICATION_RESULT) + .anyTimes(); + + testServletRequest.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true); + EasyMock.expectLastCall().anyTimes(); + + EasyMock.replay(testServletRequest); } } diff --git a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java new file mode 100644 index 000000000000..0782be0a1936 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java @@ -0,0 +1,610 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.ProvisionException; +import io.github.resilience4j.bulkhead.Bulkhead; +import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.guice.JsonConfigProvider; +import org.apache.druid.guice.JsonConfigurator; +import org.apache.druid.guice.annotations.Global; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.guava.BaseSequence; +import org.apache.druid.java.util.common.guava.LazySequence; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.SequenceWrapper; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.topn.TopNQuery; +import org.apache.druid.query.topn.TopNQueryBuilder; +import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +public class QuerySchedulerTest +{ + private static final int NUM_CONCURRENT_QUERIES = 10000; + private static final int NUM_ROWS = 10000; + + @Rule + public ExpectedException expected = ExpectedException.none(); + + private ListeningExecutorService executorService; + private QueryScheduler scheduler; + + private AtomicLong totalAcquired; + private AtomicLong totalReleased; + private AtomicLong laneAcquired; + private AtomicLong laneNotAcquired; + private AtomicLong laneReleased; + + @Before + public void setup() + { + executorService = MoreExecutors.listeningDecorator( + Execs.multiThreaded(8, "test_query_scheduler_%s") + ); + totalAcquired = new AtomicLong(); + totalReleased = new AtomicLong(); + laneAcquired = new AtomicLong(); + laneNotAcquired = new AtomicLong(); + laneReleased = new AtomicLong(); + scheduler = new QueryScheduler(5, new HiLoQueryLaningStrategy(40), new ServerConfig()) { + @Override + List acquireLanes(Query query) + { + List bulkheads = super.acquireLanes(query); + if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) { + totalAcquired.incrementAndGet(); + } + if (bulkheads.stream().anyMatch(b -> 
!b.getName().equals(QueryScheduler.TOTAL))) { + laneAcquired.incrementAndGet(); + } + + return bulkheads; + } + + @Override + void releaseLanes(List bulkheads) + { + super.releaseLanes(bulkheads); + if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) { + totalReleased.incrementAndGet(); + } + if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) { + laneReleased.incrementAndGet(); + if (bulkheads.size() == 1) { + laneNotAcquired.incrementAndGet(); + } + } + } + + @Override + void finishLanes(List bulkheads) + { + super.finishLanes(bulkheads); + if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) { + totalReleased.incrementAndGet(); + } + if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) { + laneReleased.incrementAndGet(); + } + } + }; + } + + @After + public void teardown() + { + executorService.shutdownNow(); + } + + @Test + public void testHiLoHi() throws ExecutionException, InterruptedException + { + TopNQuery interactive = makeInteractiveQuery(); + ListenableFuture future = executorService.submit(() -> { + try { + Query scheduled = scheduler.laneQuery(QueryPlus.wrap(interactive), ImmutableSet.of()); + + Assert.assertNotNull(scheduled); + + Sequence underlyingSequence = makeSequence(10); + underlyingSequence = Sequences.wrap(underlyingSequence, new SequenceWrapper() + { + @Override + public void before() + { + Assert.assertEquals(4, scheduler.getTotalAvailableCapacity()); + Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + } + }); + Sequence results = scheduler.run(scheduled, underlyingSequence); + int rowCount = consumeAndCloseSequence(results); + + Assert.assertEquals(10, rowCount); + } + catch (IOException ex) { + throw new RuntimeException(ex); + } + }); + future.get(); + Assert.assertEquals(5, scheduler.getTotalAvailableCapacity()); + } + + @Test + public void testHiLoLo() throws ExecutionException, InterruptedException + { + TopNQuery report = makeReportQuery(); + ListenableFuture future = executorService.submit(() -> { + try { + Query scheduledReport = scheduler.laneQuery(QueryPlus.wrap(report), ImmutableSet.of()); + Assert.assertNotNull(scheduledReport); + Assert.assertEquals(HiLoQueryLaningStrategy.LOW, QueryContexts.getLane(scheduledReport)); + + Sequence underlyingSequence = makeSequence(10); + underlyingSequence = Sequences.wrap(underlyingSequence, new SequenceWrapper() + { + @Override + public void before() + { + Assert.assertEquals(4, scheduler.getTotalAvailableCapacity()); + Assert.assertEquals(1, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + } + }); + Sequence results = scheduler.run(scheduledReport, underlyingSequence); + + int rowCount = consumeAndCloseSequence(results); + Assert.assertEquals(10, rowCount); + } + catch (IOException ex) { + throw new RuntimeException(ex); + } + }); + future.get(); + Assert.assertEquals(5, scheduler.getTotalAvailableCapacity()); + Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + } + + @Test + public void testHiLoReleaseLaneWhenSequenceExplodes() throws Exception + { + expected.expectMessage("exploded"); + expected.expect(ExecutionException.class); + TopNQuery interactive = makeInteractiveQuery(); + ListenableFuture future = executorService.submit(() -> { + try { + Query scheduled = scheduler.laneQuery(QueryPlus.wrap(interactive), ImmutableSet.of()); + + Assert.assertNotNull(scheduled); + + Sequence underlyingSequence = 
makeExplodingSequence(10); + underlyingSequence = Sequences.wrap(underlyingSequence, new SequenceWrapper() + { + @Override + public void before() + { + Assert.assertEquals(4, scheduler.getTotalAvailableCapacity()); + } + }); + Sequence results = scheduler.run(scheduled, underlyingSequence); + + consumeAndCloseSequence(results); + } + catch (IOException ex) { + throw new RuntimeException(ex); + } + }); + future.get(); + } + + @Test + public void testHiLoFailsWhenOutOfLaneCapacity() + { + expected.expectMessage( + StringUtils.format(QueryCapacityExceededException.ERROR_MESSAGE_TEMPLATE, HiLoQueryLaningStrategy.LOW) + ); + expected.expect(QueryCapacityExceededException.class); + + Query report1 = scheduler.laneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of()); + scheduler.run(report1, Sequences.empty()); + Assert.assertNotNull(report1); + Assert.assertEquals(4, scheduler.getTotalAvailableCapacity()); + Assert.assertEquals(1, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + + Query report2 = scheduler.laneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of()); + scheduler.run(report2, Sequences.empty()); + Assert.assertNotNull(report2); + Assert.assertEquals(3, scheduler.getTotalAvailableCapacity()); + Assert.assertEquals(0, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + + // too many reports + scheduler.run(scheduler.laneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of()), Sequences.empty()); + } + + @Test + public void testHiLoFailsWhenOutOfTotalCapacity() + { + expected.expectMessage(QueryCapacityExceededException.ERROR_MESSAGE); + expected.expect(QueryCapacityExceededException.class); + + Query interactive1 = scheduler.laneQuery(QueryPlus.wrap(makeInteractiveQuery()), ImmutableSet.of()); + scheduler.run(interactive1, Sequences.empty()); + Assert.assertNotNull(interactive1); + Assert.assertEquals(4, scheduler.getTotalAvailableCapacity()); + + Query report1 = scheduler.laneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of()); + scheduler.run(report1, Sequences.empty()); + Assert.assertNotNull(report1); + Assert.assertEquals(3, scheduler.getTotalAvailableCapacity()); + Assert.assertEquals(1, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + + Query interactive2 = scheduler.laneQuery(QueryPlus.wrap(makeInteractiveQuery()), ImmutableSet.of()); + scheduler.run(interactive2, Sequences.empty()); + Assert.assertNotNull(interactive2); + Assert.assertEquals(2, scheduler.getTotalAvailableCapacity()); + + Query report2 = scheduler.laneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of()); + scheduler.run(report2, Sequences.empty()); + Assert.assertNotNull(report2); + Assert.assertEquals(1, scheduler.getTotalAvailableCapacity()); + Assert.assertEquals(0, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + + Query interactive3 = scheduler.laneQuery(QueryPlus.wrap(makeInteractiveQuery()), ImmutableSet.of()); + scheduler.run(interactive3, Sequences.empty()); + Assert.assertNotNull(interactive3); + Assert.assertEquals(0, scheduler.getTotalAvailableCapacity()); + + // one too many + scheduler.run(scheduler.laneQuery(QueryPlus.wrap(makeInteractiveQuery()), ImmutableSet.of()), Sequences.empty()); + } + + @Test + public void testConcurrency() throws Exception + { + List> futures = new ArrayList<>(NUM_CONCURRENT_QUERIES); + for (int i = 0; i < NUM_CONCURRENT_QUERIES; i++) { + futures.add(makeQueryFuture(executorService, scheduler, makeRandomQuery(), NUM_ROWS)); + maybeDelayNextIteration(i); + } + 
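    // Editor's note (not part of the patch): with a total capacity of 5 and 10,000 concurrent
    // submissions of mixed priority, most submissions should be rejected with
    // QueryCapacityExceededException; the helper invoked below checks that some queries succeeded,
    // some were denied, and that every acquired Bulkhead permit was released again.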
getFuturesAndAssertAftermathIsChill(futures, scheduler, false); + } + + @Test + public void testConcurrencyLo() throws Exception + { + List> futures = new ArrayList<>(NUM_CONCURRENT_QUERIES); + for (int i = 0; i < NUM_CONCURRENT_QUERIES; i++) { + futures.add(makeQueryFuture(executorService, scheduler, makeReportQuery(), NUM_ROWS)); + maybeDelayNextIteration(i); + } + getFuturesAndAssertAftermathIsChill(futures, scheduler, false); + } + + @Test + public void testConcurrencyHi() throws Exception + { + List> futures = new ArrayList<>(NUM_CONCURRENT_QUERIES); + for (int i = 0; i < NUM_CONCURRENT_QUERIES; i++) { + futures.add(makeQueryFuture(executorService, scheduler, makeInteractiveQuery(), NUM_ROWS)); + maybeDelayNextIteration(i); + } + getFuturesAndAssertAftermathIsChill(futures, scheduler, true); + } + + @Test + public void testConfigNone() + { + final Injector injector = createInjector(); + final String propertyPrefix = "druid.query.scheduler"; + final JsonConfigProvider provider = JsonConfigProvider.of( + propertyPrefix, + QuerySchedulerProvider.class + ); + final Properties properties = new Properties(); + properties.setProperty(propertyPrefix + ".numThreads", "10"); + provider.inject(properties, injector.getInstance(JsonConfigurator.class)); + final QueryScheduler scheduler = provider.get().get().get(); + Assert.assertEquals(10, scheduler.getTotalAvailableCapacity()); + Assert.assertEquals(-1, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + } + + @Test + public void testConfigHiLo() + { + final Injector injector = createInjector(); + final String propertyPrefix = "druid.query.scheduler"; + final JsonConfigProvider provider = JsonConfigProvider.of( + propertyPrefix, + QuerySchedulerProvider.class + ); + final Properties properties = new Properties(); + properties.setProperty(propertyPrefix + ".numThreads", "10"); + properties.setProperty(propertyPrefix + ".laning.strategy", "hilo"); + properties.setProperty(propertyPrefix + ".laning.maxLowPercent", "20"); + + provider.inject(properties, injector.getInstance(JsonConfigurator.class)); + final QueryScheduler scheduler = provider.get().get().get(); + Assert.assertEquals(10, scheduler.getTotalAvailableCapacity()); + Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + } + + + @Test + public void testMisConfigHiLo() + { + expected.expect(ProvisionException.class); + final Injector injector = createInjector(); + final String propertyPrefix = "druid.query.scheduler"; + final JsonConfigProvider provider = JsonConfigProvider.of( + propertyPrefix, + QuerySchedulerProvider.class + ); + final Properties properties = new Properties(); + properties.setProperty(propertyPrefix + ".laning.strategy", "hilo"); + provider.inject(properties, injector.getInstance(JsonConfigurator.class)); + final QueryScheduler scheduler = provider.get().get().get(); + Assert.assertEquals(10, scheduler.getTotalAvailableCapacity()); + Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + } + + + private void maybeDelayNextIteration(int i) throws InterruptedException + { + if (i > 0 && i % 10 == 0) { + Thread.sleep(2); + } + } + + private TopNQuery makeRandomQuery() + { + return ThreadLocalRandom.current().nextBoolean() ? 
makeInteractiveQuery() : makeReportQuery(); + } + + private TopNQuery makeInteractiveQuery() + { + return makeBaseBuilder() + .context(ImmutableMap.of("priority", 10, "queryId", "high-" + UUID.randomUUID())) + .build(); + } + + private TopNQuery makeReportQuery() + { + return makeBaseBuilder() + .context(ImmutableMap.of("priority", -1, "queryId", "low-" + UUID.randomUUID())) + .build(); + } + + private TopNQueryBuilder makeBaseBuilder() + { + return new TopNQueryBuilder() + .dataSource("foo") + .intervals("2020-01-01/2020-01-02") + .dimension("bar") + .metric("chocula") + .aggregators(new CountAggregatorFactory("chocula")) + .threshold(10); + } + + private int consumeAndCloseSequence(Sequence sequence) throws IOException + { + Yielder yielder = Yielders.each(sequence); + int rowCount = 0; + while (!yielder.isDone()) { + rowCount++; + yielder = yielder.next(yielder.get()); + } + yielder.close(); + return rowCount; + } + + private Sequence makeSequence(int count) + { + return new LazySequence<>(() -> { + return new BaseSequence<>( + new BaseSequence.IteratorMaker>() + { + @Override + public Iterator make() + { + return new Iterator() + { + int rowCounter = 0; + + @Override + public boolean hasNext() + { + return rowCounter < count; + } + + @Override + public Integer next() + { + rowCounter++; + return rowCounter; + } + }; + } + + @Override + public void cleanup(Iterator iterFromMake) + { + // nothing to cleanup + } + } + ); + }); + } + + private Sequence makeExplodingSequence(int explodeAfter) + { + final int explodeAt = explodeAfter + 1; + return new BaseSequence<>( + new BaseSequence.IteratorMaker>() + { + @Override + public Iterator make() + { + return new Iterator() + { + int rowCounter = 0; + + @Override + public boolean hasNext() + { + return rowCounter < explodeAt; + } + + @Override + public Integer next() + { + if (rowCounter == explodeAfter) { + throw new RuntimeException("exploded"); + } + + rowCounter++; + return rowCounter; + } + }; + } + + @Override + public void cleanup(Iterator iterFromMake) + { + // nothing to cleanup + } + } + ); + } + + private ListenableFuture makeQueryFuture( + ListeningExecutorService executorService, + QueryScheduler scheduler, + Query query, + int numRows + ) + { + return executorService.submit(() -> { + try { + Query scheduled = scheduler.laneQuery(QueryPlus.wrap(query), ImmutableSet.of()); + + Assert.assertNotNull(scheduled); + + Sequence underlyingSequence = makeSequence(numRows); + Sequence results = scheduler.run(scheduled, underlyingSequence); + + final int actualNumRows = consumeAndCloseSequence(results); + Assert.assertEquals(actualNumRows, numRows); + } + catch (IOException ex) { + throw new RuntimeException(ex); + } + }); + } + + + private void getFuturesAndAssertAftermathIsChill( + List> futures, + QueryScheduler scheduler, + boolean successEqualsTotal + ) + { + int success = 0; + int denied = 0; + int other = 0; + for (Future f : futures) { + try { + f.get(); + success++; + } + catch (ExecutionException ex) { + if (ex.getCause() instanceof QueryCapacityExceededException) { + denied++; + } else { + other++; + } + } + catch (Exception ex) { + other++; + } + } + Assert.assertEquals(0, other); + if (successEqualsTotal) { + Assert.assertEquals(success, totalAcquired.get()); + } else { + Assert.assertTrue(success > 0 && success <= totalAcquired.get()); + } + Assert.assertTrue(denied > 0); + Assert.assertEquals(totalReleased.get(), totalAcquired.get()); + Assert.assertEquals(laneReleased.get(), laneAcquired.get() + laneNotAcquired.get()); + 
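    // Editor's note (not part of the patch): the scheduler in these tests is built as
    // new QueryScheduler(5, new HiLoQueryLaningStrategy(40), new ServerConfig()), so the 'low'
    // lane limit is ceil(5 * 40 / 100) = 2 and the total capacity is 5; once every future has
    // completed, the two asserts below expect both pools to be back at full size.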
Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + Assert.assertEquals(5, scheduler.getTotalAvailableCapacity()); + } + + private Injector createInjector() + { + Injector injector = GuiceInjectors.makeStartupInjectorWithModules( + ImmutableList.of( + binder -> { + binder.bind(ServerConfig.class).toInstance(new ServerConfig()); + JsonConfigProvider.bind(binder, "druid.query.scheduler", QuerySchedulerProvider.class, Global.class); + } + ) + ); + ObjectMapper mapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class)); + mapper.setInjectableValues( + new InjectableValues.Std().addValue(ServerConfig.class, injector.getInstance(ServerConfig.class)) + ); + return injector; + } +} diff --git a/server/src/test/java/org/apache/druid/server/scheduling/HiLoQueryLaningStrategyTest.java b/server/src/test/java/org/apache/druid/server/scheduling/HiLoQueryLaningStrategyTest.java new file mode 100644 index 000000000000..3bc7c3606b1d --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/scheduling/HiLoQueryLaningStrategyTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.scheduling; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.server.QueryLaningStrategy; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class HiLoQueryLaningStrategyTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private Druids.TimeseriesQueryBuilder queryBuilder; + private HiLoQueryLaningStrategy strategy; + + @Before + public void setup() + { + this.queryBuilder = Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .intervals(ImmutableList.of(Intervals.ETERNITY)) + .granularity(Granularities.DAY) + .aggregators(new CountAggregatorFactory("count")); + + this.strategy = new HiLoQueryLaningStrategy(40); + } + + @Test + public void testMaxPercentageThreadsRequired() + { + expectedException.expect(NullPointerException.class); + expectedException.expectMessage("maxLowPercent must be set"); + QueryLaningStrategy strategy = new HiLoQueryLaningStrategy(null); + } + + @Test + public void testMaxLowPercentMustBeGreaterThanZero() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("maxLowPercent must be in the range 1 to 100"); + QueryLaningStrategy strategy = new HiLoQueryLaningStrategy(-1); + } + + + @Test + public void testMaxLowPercentMustBeLessThanOrEqual100() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("maxLowPercent must be in the range 1 to 100"); + QueryLaningStrategy strategy = new HiLoQueryLaningStrategy(9000); + } + + @Test + public void testMaxLowPercentZero() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("maxLowPercent must be in the range 1 to 100"); + QueryLaningStrategy strategy = new HiLoQueryLaningStrategy(0); + } + + @Test + public void testMaxLowPercent100() + { + QueryLaningStrategy strategy = new HiLoQueryLaningStrategy(100); + Object2IntMap laneConfig = strategy.getLaneLimits(25); + Assert.assertEquals(1, laneConfig.size()); + Assert.assertTrue(laneConfig.containsKey(HiLoQueryLaningStrategy.LOW)); + Assert.assertEquals(25, laneConfig.getInt(HiLoQueryLaningStrategy.LOW)); + } + + @Test + public void testMaxLowPercentRoundsUp() + { + // will round up to 1 + QueryLaningStrategy strategyRoundLow = new HiLoQueryLaningStrategy(1); + Object2IntMap laneConfigRoundLow = strategyRoundLow.getLaneLimits(25); + Assert.assertEquals(1, laneConfigRoundLow.size()); + Assert.assertTrue(laneConfigRoundLow.containsKey(HiLoQueryLaningStrategy.LOW)); + Assert.assertEquals(1, laneConfigRoundLow.getInt(HiLoQueryLaningStrategy.LOW)); + + // will not round, evenly divides + QueryLaningStrategy strategy = new HiLoQueryLaningStrategy(96); + Object2IntMap laneConfig = strategy.getLaneLimits(25); + Assert.assertEquals(1, laneConfig.size()); + Assert.assertTrue(laneConfig.containsKey(HiLoQueryLaningStrategy.LOW)); + 
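    // Editor's note (not part of the patch): getLaneLimits() rounds up with Math.ceil, so for a
    // total of 25: maxLowPercent 1 -> ceil(25 * 0.01) = 1, maxLowPercent 96 -> ceil(25 * 0.96) = 24,
    // and maxLowPercent 97 -> ceil(25 * 0.97) = 25, which matches the expectations in this test.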
Assert.assertEquals(24, laneConfig.getInt(HiLoQueryLaningStrategy.LOW)); + + // will round up + QueryLaningStrategy strategyRounded = new HiLoQueryLaningStrategy(97); + Object2IntMap laneConfigRounded = strategyRounded.getLaneLimits(25); + Assert.assertEquals(1, laneConfigRounded.size()); + Assert.assertTrue(laneConfigRounded.containsKey(HiLoQueryLaningStrategy.LOW)); + Assert.assertEquals(25, laneConfigRounded.getInt(HiLoQueryLaningStrategy.LOW)); + } + + @Test + public void testLaneLimits() + { + Object2IntMap laneConfig = strategy.getLaneLimits(5); + Assert.assertEquals(1, laneConfig.size()); + Assert.assertTrue(laneConfig.containsKey(HiLoQueryLaningStrategy.LOW)); + Assert.assertEquals(2, laneConfig.getInt(HiLoQueryLaningStrategy.LOW)); + } + + @Test + public void testLaningNoPriority() + { + TimeseriesQuery query = queryBuilder.build(); + Assert.assertFalse(strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).isPresent()); + } + + @Test + public void testLaningZeroPriority() + { + TimeseriesQuery query = queryBuilder.context(ImmutableMap.of(QueryContexts.PRIORITY_KEY, 0)).build(); + Assert.assertFalse(strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).isPresent()); + } + + @Test + public void testLaningInteractivePriority() + { + TimeseriesQuery query = queryBuilder.context(ImmutableMap.of(QueryContexts.PRIORITY_KEY, 100)).build(); + Assert.assertFalse(strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).isPresent()); + } + + @Test + public void testLaningLowPriority() + { + TimeseriesQuery query = queryBuilder.context(ImmutableMap.of(QueryContexts.PRIORITY_KEY, -1)).build(); + Assert.assertTrue(strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).isPresent()); + Assert.assertEquals( + HiLoQueryLaningStrategy.LOW, + strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).get() + ); + } + + @Test + public void testLaningPreservesManualSetLane() + { + TimeseriesQuery query = queryBuilder.context( + ImmutableMap.of(QueryContexts.PRIORITY_KEY, 100, QueryContexts.LANE_KEY, "low") + ).build(); + Assert.assertEquals( + HiLoQueryLaningStrategy.LOW, + strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).get() + ); + } +} diff --git a/server/src/test/java/org/apache/druid/server/scheduling/NoQueryLaningStrategyTest.java b/server/src/test/java/org/apache/druid/server/scheduling/NoQueryLaningStrategyTest.java new file mode 100644 index 000000000000..58901df42981 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/scheduling/NoQueryLaningStrategyTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.scheduling; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class NoQueryLaningStrategyTest +{ + private Druids.TimeseriesQueryBuilder queryBuilder; + private NoQueryLaningStrategy strategy; + + @Before + public void setup() + { + this.queryBuilder = Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .intervals(ImmutableList.of(Intervals.ETERNITY)) + .granularity(Granularities.DAY) + .aggregators(new CountAggregatorFactory("count")); + + this.strategy = new NoQueryLaningStrategy(); + } + + @Test + public void testDoesntSetLane() + { + TimeseriesQuery query = queryBuilder.context(ImmutableMap.of()).build(); + Assert.assertFalse(strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).isPresent()); + } + + @Test + public void testPreservesManualLaneFromContext() + { + final String someLane = "some-lane"; + TimeseriesQuery query = queryBuilder.context( + ImmutableMap.of(QueryContexts.PRIORITY_KEY, 100, QueryContexts.LANE_KEY, someLane) + ).build(); + Assert.assertEquals( + someLane, + strategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).get() + ); + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java index cc7e6cdf2371..c09aa858da6a 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java +++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java @@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.QueryInterruptedException; +import org.apache.druid.server.QueryCapacityExceededException; import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.sql.SqlLifecycle; import org.apache.druid.sql.SqlLifecycleFactory; @@ -171,6 +172,10 @@ public Response doPost( throw new RuntimeException(e); } } + catch (QueryCapacityExceededException cap) { + lifecycle.emitLogsAndMetrics(cap, remoteAddr, -1); + return Response.status(QueryCapacityExceededException.STATUS_CODE).entity(jsonMapper.writeValueAsBytes(cap)).build(); + } catch (ForbiddenException e) { throw e; // let ForbiddenExceptionMapper handle this } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index d0192d21e94d..d9571e6bc287 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -106,6 +106,7 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.DruidNode; import org.apache.druid.server.QueryLifecycleFactory; +import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.server.log.NoopRequestLogger; import 
org.apache.druid.server.security.Access; @@ -700,6 +701,15 @@ public static SpecificSegmentsQuerySegmentWalker createMockWalker( final QueryRunnerFactoryConglomerate conglomerate, final File tmpDir ) + { + return createMockWalker(conglomerate, tmpDir, null); + } + + public static SpecificSegmentsQuerySegmentWalker createMockWalker( + final QueryRunnerFactoryConglomerate conglomerate, + final File tmpDir, + @Nullable final QueryScheduler scheduler + ) { final QueryableIndex index1 = IndexBuilder .create() @@ -753,7 +763,8 @@ public static SpecificSegmentsQuerySegmentWalker createMockWalker( return new SpecificSegmentsQuerySegmentWalker( conglomerate, INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class), - null + null, + scheduler ).add( DataSegment.builder() .dataSource(DATASOURCE1) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java index 70ed9cff838d..749d139ff4b9 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java @@ -25,10 +25,12 @@ import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.common.io.Closeables; +import org.apache.druid.client.SegmentServerSelector; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.FunctionalIterable; +import org.apache.druid.java.util.common.guava.LazySequence; import org.apache.druid.query.DataSource; import org.apache.druid.query.FinalizeResultsQueryRunner; import org.apache.druid.query.InlineDataSource; @@ -63,6 +65,7 @@ import org.apache.druid.segment.join.LookupJoinableFactory; import org.apache.druid.segment.join.MapJoinableFactoryTest; import org.apache.druid.server.ClientQuerySegmentWalker; +import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; @@ -78,6 +81,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -98,18 +102,21 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C private final QueryRunnerFactoryConglomerate conglomerate; private final QuerySegmentWalker walker; private final JoinableFactory joinableFactory; + private final QueryScheduler scheduler; private final Map> timelines = new HashMap<>(); private final List closeables = new ArrayList<>(); private final List segments = new ArrayList<>(); /** * Create an instance using the provided query runner factory conglomerate and lookup provider. - * If a JoinableFactory is provided, it will be used instead of the default. + * If a JoinableFactory is provided, it will be used instead of the default. If a scheduler is included, + * the runner will schedule queries according to the scheduling config. 
*/ public SpecificSegmentsQuerySegmentWalker( final QueryRunnerFactoryConglomerate conglomerate, final LookupExtractorFactoryContainerProvider lookupProvider, - @Nullable final JoinableFactory joinableFactory + @Nullable final JoinableFactory joinableFactory, + @Nullable final QueryScheduler scheduler ) { this.conglomerate = conglomerate; @@ -121,6 +128,7 @@ public SpecificSegmentsQuerySegmentWalker( .build() ) : joinableFactory; + this.scheduler = scheduler; this.walker = new ClientQuerySegmentWalker( new NoopServiceEmitter(), new DataServerLikeWalker(), @@ -165,6 +173,20 @@ public boolean isUseResultLevelCache() ); } + /** + * Create an instance using the provided query runner factory conglomerate and lookup provider. + * If a JoinableFactory is provided, it will be used instead of the default. + */ + public SpecificSegmentsQuerySegmentWalker( + final QueryRunnerFactoryConglomerate conglomerate, + final LookupExtractorFactoryContainerProvider lookupProvider, + @Nullable final JoinableFactory joinableFactory + ) + { + this(conglomerate, lookupProvider, joinableFactory, null); + } + + /** * Create an instance without any lookups, using the default JoinableFactory */ @@ -389,13 +411,33 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final toolChest ); + // Wrap baseRunner in a runner that rewrites the QuerySegmentSpec to mention the specific segments. // This mimics what CachingClusteredClient on the Broker does, and is required for certain queries (like Scan) // to function properly. - return (theQuery, responseContext) -> baseRunner.run( - theQuery.withQuery(Queries.withSpecificSegments(theQuery.getQuery(), ImmutableList.copyOf(specs))), - responseContext - ); + return (theQuery, responseContext) -> { + if (scheduler != null) { + Set segments = new HashSet<>(); + specs.forEach(spec -> segments.add(new SegmentServerSelector(null, spec))); + return scheduler.run( + scheduler.laneQuery(theQuery, segments), + new LazySequence<>( + () -> baseRunner.run( + theQuery.withQuery(Queries.withSpecificSegments( + theQuery.getQuery(), + ImmutableList.copyOf(specs) + )), + responseContext + ) + ) + ); + } else { + return baseRunner.run( + theQuery.withQuery(Queries.withSpecificSegments(theQuery.getQuery(), ImmutableList.copyOf(specs))), + responseContext + ); + } + }; } private QueryRunner makeTableRunner( diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java index ce925902a575..2fa358285597 100644 --- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java @@ -25,6 +25,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.calcite.avatica.SqlType; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.tools.ValidationException; @@ -33,13 +35,19 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.QueryException; import org.apache.druid.query.QueryInterruptedException; import 
diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
index ce925902a575..2fa358285597 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
@@ -25,6 +25,8 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.calcite.avatica.SqlType;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.tools.ValidationException;
@@ -33,13 +35,19 @@
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.QueryException;
 import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.ResourceLimitExceededException;
+import org.apache.druid.server.QueryCapacityExceededException;
+import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.server.log.TestRequestLogger;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy;
 import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.ForbiddenException;
 import org.apache.druid.sql.SqlLifecycleFactory;
@@ -67,9 +75,11 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -85,9 +95,11 @@ public class SqlResourceTest extends CalciteTestBase
   @Rule
   public QueryLogHook queryLogHook = QueryLogHook.create();
   private SpecificSegmentsQuerySegmentWalker walker = null;
+  private QueryScheduler scheduler = null;
   private TestRequestLogger testRequestLogger;
   private SqlResource resource;
   private HttpServletRequest req;
+  private ListeningExecutorService executorService;
 
   @BeforeClass
   public static void setUpClass()
@@ -107,7 +119,11 @@ public static void tearDownClass() throws IOException
   @Before
   public void setUp() throws Exception
   {
-    walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
+    executorService = MoreExecutors.listeningDecorator(
+        Execs.multiThreaded(8, "test_sql_resource_%s")
+    );
+    scheduler = new QueryScheduler(5, new HiLoQueryLaningStrategy(40), new ServerConfig());
+    walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder(), scheduler);
     final PlannerConfig plannerConfig = new PlannerConfig()
     {
@@ -167,6 +183,7 @@ public void tearDown() throws Exception
   {
     walker.close();
     walker = null;
+    executorService.shutdownNow();
   }
 
   @Test
@@ -638,7 +655,7 @@ public void testExplainCountStar() throws Exception
   @Test
   public void testCannotValidate() throws Exception
   {
-    final QueryInterruptedException exception = doPost(
+    final QueryException exception = doPost(
         new SqlQuery(
             "SELECT dim4 FROM druid.foo",
            ResultFormat.OBJECT,
@@ -659,7 +676,7 @@ public void testCannotValidate() throws Exception
   public void testCannotConvert() throws Exception
   {
     // SELECT + ORDER unsupported
-    final QueryInterruptedException exception = doPost(
+    final QueryException exception = doPost(
        new SqlQuery("SELECT dim1 FROM druid.foo ORDER BY dim1", ResultFormat.OBJECT, false, null, null)
     ).lhs;
@@ -676,7 +693,7 @@ public void testCannotConvert() throws Exception
   @Test
   public void testResourceLimitExceeded() throws Exception
   {
-    final QueryInterruptedException exception = doPost(
+    final QueryException exception = doPost(
         new SqlQuery(
             "SELECT DISTINCT dim1 FROM foo",
             ResultFormat.OBJECT,
@@ -692,6 +709,56 @@ public void testResourceLimitExceeded() throws Exception
     checkSqlRequestLog(false);
   }
 
+  @Test
+  public void testTooManyRequests() throws Exception
+  {
+    final int numQueries = 3;
+
+    List<Future<Pair<QueryException, List<Map<String, Object>>>>> futures = new ArrayList<>(numQueries);
+    for (int i = 0; i < numQueries; i++) {
+      futures.add(executorService.submit(() -> {
+        try {
+          return doPost(
+              new SqlQuery(
+                  "SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo",
+                  null,
+                  false,
+                  ImmutableMap.of("priority", -5),
+                  null
+              ),
+              makeExpectedReq()
+          );
+        }
+        catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }));
+    }
+
+    int success = 0;
+    int limited = 0;
+    for (int i = 0; i < numQueries; i++) {
+      Pair<QueryException, List<Map<String, Object>>> result = futures.get(i).get();
+      List<Map<String, Object>> rows = result.rhs;
+      if (rows != null) {
+        Assert.assertEquals(ImmutableList.of(ImmutableMap.of("cnt", 6, "TheFoo", "foo")), rows);
+        success++;
+      } else {
+        QueryException interrupted = result.lhs;
+        Assert.assertEquals(QueryCapacityExceededException.ERROR_CODE, interrupted.getErrorCode());
+        Assert.assertEquals(
+            StringUtils.format(QueryCapacityExceededException.ERROR_MESSAGE_TEMPLATE, HiLoQueryLaningStrategy.LOW),
+            interrupted.getMessage()
+        );
+        limited++;
+      }
+    }
+    Assert.assertEquals(2, success);
+    Assert.assertEquals(1, limited);
+    Assert.assertEquals(3, testRequestLogger.getSqlQueryLogs().size());
+  }
+
   @SuppressWarnings("unchecked")
   private void checkSqlRequestLog(boolean success)
   {
@@ -710,23 +777,53 @@ private void checkSqlRequestLog(boolean success)
     }
   }
 
+
+  private Pair<QueryException, List<Map<String, Object>>> doPost(final SqlQuery query) throws Exception
+  {
+    return doPost(query, new TypeReference<List<Map<String, Object>>>()
+    {
+    });
+  }
+
+  // Returns either an error or a result, assuming the result is a JSON object.
+  private <T> Pair<QueryException, T> doPost(
+      final SqlQuery query,
+      final TypeReference<T> typeReference
+  ) throws Exception
+  {
+    return doPost(query, req, typeReference);
+  }
+
+  private Pair<QueryException, String> doPostRaw(final SqlQuery query) throws Exception
+  {
+    return doPostRaw(query, req);
+  }
+
+  private Pair<QueryException, List<Map<String, Object>>> doPost(final SqlQuery query, HttpServletRequest req) throws Exception
+  {
+    return doPost(query, req, new TypeReference<List<Map<String, Object>>>()
+    {
+    });
+  }
+
   // Returns either an error or a result, assuming the result is a JSON object.
-  private <T> Pair<QueryInterruptedException, T> doPost(
+  private <T> Pair<QueryException, T> doPost(
       final SqlQuery query,
+      final HttpServletRequest req,
       final TypeReference<T> typeReference
   ) throws Exception
   {
-    final Pair<QueryInterruptedException, String> pair = doPostRaw(query);
+    final Pair<QueryException, String> pair = doPostRaw(query, req);
     if (pair.rhs == null) {
       //noinspection unchecked
-      return (Pair<QueryInterruptedException, T>) pair;
+      return (Pair<QueryException, T>) pair;
     } else {
       return Pair.of(pair.lhs, JSON_MAPPER.readValue(pair.rhs, typeReference));
     }
   }
 
   // Returns either an error or a result.
-  private Pair<QueryInterruptedException, String> doPostRaw(final SqlQuery query) throws Exception
+  private Pair<QueryException, String> doPostRaw(final SqlQuery query, final HttpServletRequest req) throws Exception
   {
     final Response response = resource.doPost(query, req);
     if (response.getStatus() == 200) {
@@ -739,16 +836,32 @@ private Pair<QueryInterruptedException, String> doPostRaw(final SqlQuery query)
       );
     } else {
       return Pair.of(
-          JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryInterruptedException.class),
+          JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryException.class),
           null
       );
     }
   }
 
-  private Pair<QueryInterruptedException, List<Map<String, Object>>> doPost(final SqlQuery query) throws Exception
+  private HttpServletRequest makeExpectedReq()
   {
-    return doPost(query, new TypeReference<List<Map<String, Object>>>()
-    {
-    });
+    HttpServletRequest req = EasyMock.createStrictMock(HttpServletRequest.class);
+    EasyMock.expect(req.getRemoteAddr()).andReturn(null).once();
+    EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+            .andReturn(CalciteTests.REGULAR_USER_AUTH_RESULT)
+            .anyTimes();
+    EasyMock.expect(req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
+    EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED))
+            .andReturn(null)
+            .anyTimes();
+    EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+            .andReturn(CalciteTests.REGULAR_USER_AUTH_RESULT)
+            .anyTimes();
+    req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+            .andReturn(CalciteTests.REGULAR_USER_AUTH_RESULT)
+            .anyTimes();
+    EasyMock.replay(req);
+    return req;
   }
 }
diff --git a/website/.spelling b/website/.spelling
index eed68eeb13c0..e97c1b29ee02 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -260,6 +260,7 @@ javadoc
 kerberos
 keystore
 keytab
+laning
 lifecycle
 localhost
 log4j
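The new testTooManyRequests drives three concurrent priority -5 queries through a scheduler whose low lane admits only two of them, and asserts that the third comes back with QueryCapacityExceededException's error code; such rejections are returned to HTTP clients with a 429 Too Many Requests status. As a small sketch of how calling code might distinguish that rejection from other query failures (handleCapacityRejection is a hypothetical helper, used only for illustration):

    // Sketch: telling a laning/capacity rejection apart from other query errors.
    // 'error' is the QueryException deserialized from the error response body;
    // handleCapacityRejection is a hypothetical helper (e.g. back off and retry).
    static void onQueryError(QueryException error)
    {
      if (QueryCapacityExceededException.ERROR_CODE.equals(error.getErrorCode())) {
        handleCapacityRejection(error.getMessage());
      } else {
        throw error;
      }
    }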