From 6654a724bdbefeabb28c6d2f5ec5afaddfe8755f Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 3 Mar 2020 16:37:42 -0800 Subject: [PATCH 1/9] add manual laning strategy, integration test, json config test --- docs/configuration/index.md | 11 +- .../docker/environment-configs/broker | 3 + .../AbstractQueryResourceTestClient.java | 16 ++ .../CoordinatorResourceTestClient.java | 23 ++- .../utils/AbstractTestQueryHelper.java | 4 +- .../testing/utils/SqlTestQueryHelper.java | 2 +- .../druid/testing/utils/TestQueryHelper.java | 2 +- .../tests/query/ITWikipediaQueryTest.java | 89 ++++++++++- .../druid/server/QueryLaningStrategy.java | 4 +- .../scheduling/ManualQueryLaningStrategy.java | 80 ++++++++++ .../druid/server/QuerySchedulerTest.java | 43 +++++ .../ManualQueryLaningStrategyTest.java | 149 ++++++++++++++++++ 12 files changed, 415 insertions(+), 11 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategy.java create mode 100644 server/src/test/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategyTest.java diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 7f95e4b858e5..84c195e79d09 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1505,7 +1505,16 @@ This strategy can be enabled by setting `druid.query.scheduler.laning.strategy=h |Property|Description|Default| |--------|-----------|-------| -|`druid.query.scheduler.laning.maxLowPercent`|Maximum percent of the smaller number of`druid.server.http.numThreads` or `druid.query.scheduler.numThreads`, defining the number of HTTP threads that can be used by queries with a priority lower than 0. Value must be in the range 1 to 100, and will be rounded up|No default, must be set if using this mode| +|`druid.query.scheduler.laning.maxLowPercent`|Maximum percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`, defining the number of HTTP threads that can be used by queries with a priority lower than 0. Value must be in the range 1 to 100, and will be rounded up|No default, must be set if using this mode| + + +###### 'Manual' laning strategy +This laning strategy is best suited for cases where one or more external applications which query Druid are capable of manually deciding what lane a given query should belong to. Configured with a map of lane names to percent or exact max capacities, queries with a matching `lane` parameter in the [query context](../querying/query-context.md) will be subjected to those limits. + +|Property|Description|Default| +|--------|-----------|-------| +|`druid.query.scheduler.laning.lanes.{name}`|Maximum percent or exact limit of queries that can concurrently run in the defined lanes. Any number of lanes may be defined like this.|No default, must define at least one lane with a limit above 0. If `druid.query.scheduler.laning.isLimitPercent` is set to `true`, numbers must be in the range of 1 to 100.| +|`druid.query.scheduler.laning.isLimitPercent`|If set to `true`, the values set for `druid.query.scheduler.laning.lanes` will be treated as a percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`|`false`| ##### Server Configuration diff --git a/integration-tests/docker/environment-configs/broker b/integration-tests/docker/environment-configs/broker index c88354192291..2088881444ba 100644 --- a/integration-tests/docker/environment-configs/broker +++ b/integration-tests/docker/environment-configs/broker @@ -37,3 +37,6 @@ druid_cache_sizeInBytes=40000000 druid_auth_basic_common_cacheDirectory=/tmp/authCache/broker druid_sql_avatica_enable=true druid_server_https_crlPath=/tls/revocations.crl +druid_query_scheduler_laning_strategy=manual +druid_query_scheduler_laning_lanes_one=1 +druid_query_scheduler_laning_lanes_two=2 \ No newline at end of file diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java index 1cb9dbf44ae0..15851c5dbdd1 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java @@ -35,6 +35,7 @@ import java.net.URL; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; public abstract class AbstractQueryResourceTestClient { @@ -87,4 +88,19 @@ public List> query(String url, QueryType query) } } + public Future queryAsync(String url, QueryType query) + { + try { + return httpClient.go( + new Request(HttpMethod.POST, new URL(url)).setContent( + "application/json", + jsonMapper.writeValueAsBytes(query) + ), + StatusResponseHandler.getInstance() + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java index b6d5e28801cc..5f96ca00b49c 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java @@ -41,6 +41,7 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.net.URL; import java.util.List; import java.util.Map; @@ -262,13 +263,29 @@ public Map initializeLookups(String filePath) throws Exception return results2; } + @Nullable private Map>> getLookupLoadStatus() { String url = StringUtils.format("%slookups/nodeStatus", getCoordinatorURL()); Map>> status; try { - StatusResponseHolder response = makeRequest(HttpMethod.GET, url); + StatusResponseHolder response = httpClient.go( + new Request(HttpMethod.GET, new URL(url)), + responseHandler + ).get(); + + if (response.getStatus().getCode() == HttpResponseStatus.NOT_FOUND.getCode()) { + return null; + } + if (response.getStatus().getCode() != HttpResponseStatus.OK.getCode()) { + throw new ISE( + "Error while making request to url[%s] status[%s] content[%s]", + url, + response.getStatus(), + response.getContent() + ); + } status = jsonMapper.readValue( response.getContent(), new TypeReference>>>() @@ -286,6 +303,10 @@ public boolean areLookupsLoaded(String lookup) { final Map>> status = getLookupLoadStatus(); + if (status == null) { + return false; + } + final Map> defaultTier = status.get("__default"); boolean isLoaded = true; diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java index d677af9d1153..a84eda399dc4 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java @@ -39,6 +39,7 @@ public abstract class AbstractTestQueryHelper coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load" ); - coordinatorClient.initializeLookups(WIKIPEDIA_LOOKUP_RESOURCE); - ITRetryUtil.retryUntilTrue( - () -> coordinatorClient.areLookupsLoaded(WIKI_LOOKUP), "wikipedia lookup load" - ); + if (!coordinatorClient.areLookupsLoaded(WIKI_LOOKUP)) { + coordinatorClient.initializeLookups(WIKIPEDIA_LOOKUP_RESOURCE); + ITRetryUtil.retryUntilTrue( + () -> coordinatorClient.areLookupsLoaded(WIKI_LOOKUP), "wikipedia lookup load" + ); + } } @Test @@ -62,4 +82,65 @@ public void testWikipediaQueriesFromFile() throws Exception { queryHelper.testQueriesFromFile(WIKIPEDIA_QUERIES_RESOURCE, 2); } + + @Test + public void testQueryLaning() throws Exception + { + // the broker is configured with 2 manually defined query lanes, 'one' with limit 1, and 'two' with limit 'two' + // -Ddruid.query.scheduler.laning.type=manual + // -Ddruid.query.scheduler.laning.lanes.one=1 + // -Ddruid.query.scheduler.laning.lanes.two=2 + // by issuing 50 queries, at least 1 of them will succeed on 'one', and at least 1 of them will overlap enough to + // get limited + final int numQueries = 50; + List> futures = new ArrayList<>(numQueries); + for (int i = 0; i < numQueries; i++) { + futures.add( + queryClient.queryAsync( + queryHelper.getQueryURL(config.getBrokerUrl()), + getQueryBuilder().build() + ) + ); + } + + int success = 0; + int limited = 0; + + for (Future future : futures) { + StatusResponseHolder status = future.get(); + if (status.getStatus().getCode() == QueryCapacityExceededException.STATUS_CODE) { + limited++; + Assert.assertTrue(status.getContent().contains("one")); + } else if (status.getStatus().getCode() == HttpResponseStatus.OK.getCode()) { + success++; + } + } + + Assert.assertTrue(success > 0); + Assert.assertTrue(limited > 0); + + // test another to make sure we can still issue one query at a time + StatusResponseHolder followUp = queryClient.queryAsync( + queryHelper.getQueryURL(config.getBrokerUrl()), + getQueryBuilder().build() + ).get(); + + Assert.assertEquals(HttpResponseStatus.OK.getCode(), followUp.getStatus().getCode()); + + StatusResponseHolder andAnother = queryClient.queryAsync( + queryHelper.getQueryURL(config.getBrokerUrl()), + getQueryBuilder().build() + ).get(); + + Assert.assertEquals(HttpResponseStatus.OK.getCode(), andAnother.getStatus().getCode()); + } + + private Druids.TimeseriesQueryBuilder getQueryBuilder() + { + return Druids.newTimeseriesQueryBuilder() + .dataSource("wikipedia_editstream") + .aggregators(new CountAggregatorFactory("chocula")) + .intervals("2013-01-01T00:00:00.000/2013-01-08T00:00:00.000") + .context(ImmutableMap.of("lane", "one", "queryId", UUID.randomUUID().toString())); + } } diff --git a/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java index a2e922ef48db..7ef2ee810e8d 100644 --- a/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java +++ b/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java @@ -25,6 +25,7 @@ import org.apache.druid.client.SegmentServerSelector; import org.apache.druid.query.QueryPlus; import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy; +import org.apache.druid.server.scheduling.ManualQueryLaningStrategy; import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import java.util.Optional; @@ -34,7 +35,8 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "strategy", defaultImpl = NoQueryLaningStrategy.class) @JsonSubTypes(value = { @JsonSubTypes.Type(name = "none", value = NoQueryLaningStrategy.class), - @JsonSubTypes.Type(name = "hilo", value = HiLoQueryLaningStrategy.class) + @JsonSubTypes.Type(name = "hilo", value = HiLoQueryLaningStrategy.class), + @JsonSubTypes.Type(name = "manual", value = ManualQueryLaningStrategy.class) }) public interface QueryLaningStrategy { diff --git a/server/src/main/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategy.java new file mode 100644 index 000000000000..bbf21db002c6 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategy.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.scheduling; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; +import it.unimi.dsi.fastutil.objects.Object2IntArrayMap; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import org.apache.druid.client.SegmentServerSelector; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.server.QueryLaningStrategy; + +import javax.annotation.Nullable; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class ManualQueryLaningStrategy implements QueryLaningStrategy +{ + @JsonProperty + private Map lanes; + + @JsonProperty + private boolean isLimitPercent; + + @JsonCreator + public ManualQueryLaningStrategy( + @JsonProperty("lanes") Map lanes, + @JsonProperty("isLimitPercent") @Nullable Boolean isLimitPercent + ) + { + this.lanes = Preconditions.checkNotNull(lanes, "lanes must be set"); + this.isLimitPercent = isLimitPercent != null ? isLimitPercent : false; + Preconditions.checkArgument(lanes.size() > 0, "lanes must define at least one lane"); + Preconditions.checkArgument( + lanes.values().stream().allMatch(x -> this.isLimitPercent ? 0 < x && x <= 100 : x > 0), + this.isLimitPercent ? "All lane limits must be in the range 1 to 100" : "All lane limits must be greater than 0" + ); + } + + @Override + public Object2IntMap getLaneLimits(int totalLimit) + { + + if (isLimitPercent) { + Object2IntMap laneLimits = new Object2IntArrayMap(lanes.size()); + lanes.forEach((key, value) -> + laneLimits.put(key, Ints.checkedCast((long) Math.ceil(totalLimit * ((double) value / 100)))) + ); + return laneLimits; + } + return new Object2IntArrayMap<>(lanes); + } + + @Override + public Optional computeLane(QueryPlus query, Set segments) + { + return Optional.ofNullable(QueryContexts.getLane(query.getQuery())); + } +} diff --git a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java index 0782be0a1936..c415bddbef06 100644 --- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java +++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java @@ -399,6 +399,49 @@ public void testMisConfigHiLo() } + @Test + public void testConfigManual() + { + final Injector injector = createInjector(); + final String propertyPrefix = "druid.query.scheduler"; + final JsonConfigProvider provider = JsonConfigProvider.of( + propertyPrefix, + QuerySchedulerProvider.class + ); + final Properties properties = new Properties(); + properties.put(propertyPrefix + ".numThreads", "10"); + properties.put(propertyPrefix + ".laning.strategy", "manual"); + properties.put(propertyPrefix + ".laning.lanes.one", "1"); + properties.put(propertyPrefix + ".laning.lanes.two", "2"); + provider.inject(properties, injector.getInstance(JsonConfigurator.class)); + final QueryScheduler scheduler = provider.get().get().get(); + Assert.assertEquals(10, scheduler.getTotalAvailableCapacity()); + Assert.assertEquals(1, scheduler.getLaneAvailableCapacity("one")); + Assert.assertEquals(2, scheduler.getLaneAvailableCapacity("two")); + } + + @Test + public void testConfigManualPercent() + { + final Injector injector = createInjector(); + final String propertyPrefix = "druid.query.scheduler"; + final JsonConfigProvider provider = JsonConfigProvider.of( + propertyPrefix, + QuerySchedulerProvider.class + ); + final Properties properties = new Properties(); + properties.put(propertyPrefix + ".numThreads", "10"); + properties.put(propertyPrefix + ".laning.strategy", "manual"); + properties.put(propertyPrefix + ".laning.isLimitPercent", "true"); + properties.put(propertyPrefix + ".laning.lanes.one", "1"); + properties.put(propertyPrefix + ".laning.lanes.twenty", "20"); + provider.inject(properties, injector.getInstance(JsonConfigurator.class)); + final QueryScheduler scheduler = provider.get().get().get(); + Assert.assertEquals(10, scheduler.getTotalAvailableCapacity()); + Assert.assertEquals(1, scheduler.getLaneAvailableCapacity("one")); + Assert.assertEquals(2, scheduler.getLaneAvailableCapacity("twenty")); + } + private void maybeDelayNextIteration(int i) throws InterruptedException { if (i > 0 && i % 10 == 0) { diff --git a/server/src/test/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategyTest.java b/server/src/test/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategyTest.java new file mode 100644 index 000000000000..28f13eee7416 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategyTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.scheduling; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.server.QueryLaningStrategy; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +@SuppressWarnings("ResultOfObjectAllocationIgnored") +public class ManualQueryLaningStrategyTest +{ + private Druids.TimeseriesQueryBuilder queryBuilder; + private QueryLaningStrategy exactStrategy; + private QueryLaningStrategy percentStrategy; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Before + public void setup() + { + this.queryBuilder = Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .intervals(ImmutableList.of(Intervals.ETERNITY)) + .granularity(Granularities.DAY) + .aggregators(new CountAggregatorFactory("count")); + this.exactStrategy = + new ManualQueryLaningStrategy(ImmutableMap.of("one", 1, "ten", 10), null); + this.percentStrategy = + new ManualQueryLaningStrategy(ImmutableMap.of("one", 1, "ten", 10), true); + } + + @Test + public void testLanesMustBeSet() + { + expectedException.expect(NullPointerException.class); + expectedException.expectMessage("lanes must be set"); + new ManualQueryLaningStrategy(null, null); + } + + @Test + public void testMustDefineAtLeast1Lane() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("lanes must define at least one lane"); + new ManualQueryLaningStrategy(ImmutableMap.of(), null); + } + + @Test + public void testExactLaneLimitsMustBeAboveZero() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("All lane limits must be greater than 0"); + new ManualQueryLaningStrategy(ImmutableMap.of("zero", 0, "one", 1), null); + } + + @Test + public void testPercentLaneLimitsMustBeAboveZero() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("All lane limits must be in the range 1 to 100"); + new ManualQueryLaningStrategy(ImmutableMap.of("zero", 0, "one", 25), true); + } + + @Test + public void testExactLimits() + { + Object2IntMap exactLanes = exactStrategy.getLaneLimits(50); + Assert.assertEquals(1, exactLanes.getInt("one")); + Assert.assertEquals(10, exactLanes.getInt("ten")); + } + + @Test + public void testPercentLimits() + { + Object2IntMap exactLanes = percentStrategy.getLaneLimits(50); + Assert.assertEquals(1, exactLanes.getInt("one")); + Assert.assertEquals(5, exactLanes.getInt("ten")); + } + + @Test + public void testDoesntSetLane() + { + TimeseriesQuery query = queryBuilder.context(ImmutableMap.of()).build(); + Assert.assertFalse(exactStrategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).isPresent()); + Assert.assertFalse(percentStrategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).isPresent()); + } + + @Test + public void testPreservesManualLaneFromContextThatArentInMapAndIgnoresThem() + { + final String someLane = "some-lane"; + TimeseriesQuery query = queryBuilder.context(ImmutableMap.of(QueryContexts.LANE_KEY, someLane)).build(); + Assert.assertEquals( + someLane, + exactStrategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).get() + ); + Assert.assertEquals( + someLane, + percentStrategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).get() + ); + } + + @Test + public void testPreservesManualLaneFromContext() + { + final String someLane = "ten"; + TimeseriesQuery query = queryBuilder.context(ImmutableMap.of(QueryContexts.LANE_KEY, someLane)).build(); + Assert.assertEquals( + someLane, + exactStrategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).get() + ); + Assert.assertEquals( + someLane, + percentStrategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).get() + ); + } +} From b187125ce937a053a33116769403cc02a001ab23 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 10 Mar 2020 04:10:13 -0700 Subject: [PATCH 2/9] share percent conversion method --- .../java/org/apache/druid/server/QueryLaningStrategy.java | 6 ++++++ .../druid/server/scheduling/HiLoQueryLaningStrategy.java | 3 +-- .../druid/server/scheduling/ManualQueryLaningStrategy.java | 7 ++----- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java index 7ef2ee810e8d..d601384c6659 100644 --- a/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java +++ b/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.primitives.Ints; import it.unimi.dsi.fastutil.objects.Object2IntMap; import org.apache.druid.client.SegmentServerSelector; import org.apache.druid.query.QueryPlus; @@ -52,4 +53,9 @@ public interface QueryLaningStrategy * This method must be thread safe */ Optional computeLane(QueryPlus query, Set segments); + + default int computeLimitFromPercent(int totalLimit, int value) + { + return Ints.checkedCast((long) Math.ceil(totalLimit * ((double) value / 100))); + } } diff --git a/server/src/main/java/org/apache/druid/server/scheduling/HiLoQueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/scheduling/HiLoQueryLaningStrategy.java index af4e23bddf7f..0a974228f663 100644 --- a/server/src/main/java/org/apache/druid/server/scheduling/HiLoQueryLaningStrategy.java +++ b/server/src/main/java/org/apache/druid/server/scheduling/HiLoQueryLaningStrategy.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import com.google.common.primitives.Ints; import it.unimi.dsi.fastutil.objects.Object2IntArrayMap; import it.unimi.dsi.fastutil.objects.Object2IntMap; import org.apache.druid.client.SegmentServerSelector; @@ -60,7 +59,7 @@ public HiLoQueryLaningStrategy( public Object2IntMap getLaneLimits(int totalLimit) { Object2IntMap onlyLow = new Object2IntArrayMap<>(1); - onlyLow.put(LOW, Ints.checkedCast((long) Math.ceil(totalLimit * ((double) maxLowPercent / 100)))); + onlyLow.put(LOW, computeLimitFromPercent(totalLimit, maxLowPercent)); return onlyLow; } diff --git a/server/src/main/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategy.java index bbf21db002c6..a670c2fc455f 100644 --- a/server/src/main/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategy.java +++ b/server/src/main/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategy.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import com.google.common.primitives.Ints; import it.unimi.dsi.fastutil.objects.Object2IntArrayMap; import it.unimi.dsi.fastutil.objects.Object2IntMap; import org.apache.druid.client.SegmentServerSelector; @@ -63,10 +62,8 @@ public Object2IntMap getLaneLimits(int totalLimit) { if (isLimitPercent) { - Object2IntMap laneLimits = new Object2IntArrayMap(lanes.size()); - lanes.forEach((key, value) -> - laneLimits.put(key, Ints.checkedCast((long) Math.ceil(totalLimit * ((double) value / 100)))) - ); + Object2IntMap laneLimits = new Object2IntArrayMap<>(lanes.size()); + lanes.forEach((key, value) -> laneLimits.put(key, computeLimitFromPercent(totalLimit, value))); return laneLimits; } return new Object2IntArrayMap<>(lanes); From 700e6abc1520bdaacb69f0a2d601ec73104d5b61 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 10 Mar 2020 13:17:01 -0700 Subject: [PATCH 3/9] wrong assert --- .../java/org/apache/druid/tests/query/ITWikipediaQueryTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java index 3e8d0748addf..79e5466e65a2 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java @@ -33,7 +33,7 @@ import org.apache.druid.testing.utils.TestQueryHelper; import org.apache.druid.tests.TestNGGroup; import org.jboss.netty.handler.codec.http.HttpResponseStatus; -import org.junit.Assert; +import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Guice; import org.testng.annotations.Test; From a05c9fa53c7c83362241092bcbd71d732be837a0 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 13 Mar 2020 01:58:43 -0700 Subject: [PATCH 4/9] review stuffs --- .../docker/environment-configs/broker | 3 +- .../tests/query/ITWikipediaQueryTest.java | 40 +++++++++++++++++-- .../apache/druid/server/QueryScheduler.java | 6 +-- .../ManualQueryLaningStrategyTest.java | 13 +++++- 4 files changed, 52 insertions(+), 10 deletions(-) diff --git a/integration-tests/docker/environment-configs/broker b/integration-tests/docker/environment-configs/broker index 2088881444ba..b8794d4c0bd7 100644 --- a/integration-tests/docker/environment-configs/broker +++ b/integration-tests/docker/environment-configs/broker @@ -38,5 +38,4 @@ druid_auth_basic_common_cacheDirectory=/tmp/authCache/broker druid_sql_avatica_enable=true druid_server_https_crlPath=/tls/revocations.crl druid_query_scheduler_laning_strategy=manual -druid_query_scheduler_laning_lanes_one=1 -druid_query_scheduler_laning_lanes_two=2 \ No newline at end of file +druid_query_scheduler_laning_lanes_one=1 \ No newline at end of file diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java index 79e5466e65a2..b3d7de81ae00 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java @@ -84,12 +84,11 @@ public void testWikipediaQueriesFromFile() throws Exception } @Test - public void testQueryLaning() throws Exception + public void testQueryLaningLaneIsLimited() throws Exception { - // the broker is configured with 2 manually defined query lanes, 'one' with limit 1, and 'two' with limit 'two' + // the broker is configured with a manually defined query lane, 'one' with limit 1 // -Ddruid.query.scheduler.laning.type=manual // -Ddruid.query.scheduler.laning.lanes.one=1 - // -Ddruid.query.scheduler.laning.lanes.two=2 // by issuing 50 queries, at least 1 of them will succeed on 'one', and at least 1 of them will overlap enough to // get limited final int numQueries = 50; @@ -135,6 +134,41 @@ public void testQueryLaning() throws Exception Assert.assertEquals(HttpResponseStatus.OK.getCode(), andAnother.getStatus().getCode()); } + @Test + public void testQueryLaningWithNoLane() throws Exception + { + // the broker is configured with a manually defined query lane, 'one' with limit 1 + // -Ddruid.query.scheduler.laning.type=manual + // -Ddruid.query.scheduler.laning.lanes.one=1 + // these queries will not belong to the lane so none of them should be limited + final int numQueries = 50; + List> futures = new ArrayList<>(numQueries); + for (int i = 0; i < numQueries; i++) { + futures.add( + queryClient.queryAsync( + queryHelper.getQueryURL(config.getBrokerUrl()), + getQueryBuilder().context(ImmutableMap.of("queryId", UUID.randomUUID().toString())).build() + ) + ); + } + + int success = 0; + int limited = 0; + + for (Future future : futures) { + StatusResponseHolder status = future.get(); + if (status.getStatus().getCode() == QueryCapacityExceededException.STATUS_CODE) { + limited++; + } else if (status.getStatus().getCode() == HttpResponseStatus.OK.getCode()) { + success++; + } + } + + Assert.assertTrue(success > 0); + Assert.assertEquals(limited, 0); + + } + private Druids.TimeseriesQueryBuilder getQueryBuilder() { return Druids.newTimeseriesQueryBuilder() diff --git a/server/src/main/java/org/apache/druid/server/QueryScheduler.java b/server/src/main/java/org/apache/druid/server/QueryScheduler.java index 8338907cc688..93786d770044 100644 --- a/server/src/main/java/org/apache/druid/server/QueryScheduler.java +++ b/server/src/main/java/org/apache/druid/server/QueryScheduler.java @@ -55,7 +55,7 @@ */ public class QueryScheduler implements QueryWatcher { - private static final int NO_CAPACITY = -1; + public static final int UNAVAILABLE = -1; static final String TOTAL = "default"; private final int totalCapacity; private final QueryPrioritizationStrategy prioritizationStrategy; @@ -173,7 +173,7 @@ int getTotalAvailableCapacity() { return laneRegistry.getConfiguration(TOTAL) .map(config -> laneRegistry.bulkhead(TOTAL, config).getMetrics().getAvailableConcurrentCalls()) - .orElse(NO_CAPACITY); + .orElse(UNAVAILABLE); } /** @@ -184,7 +184,7 @@ int getLaneAvailableCapacity(String lane) { return laneRegistry.getConfiguration(lane) .map(config -> laneRegistry.bulkhead(lane, config).getMetrics().getAvailableConcurrentCalls()) - .orElse(NO_CAPACITY); + .orElse(UNAVAILABLE); } /** diff --git a/server/src/test/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategyTest.java b/server/src/test/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategyTest.java index 28f13eee7416..7b04c4730c35 100644 --- a/server/src/test/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategyTest.java @@ -58,7 +58,7 @@ public void setup() this.exactStrategy = new ManualQueryLaningStrategy(ImmutableMap.of("one", 1, "ten", 10), null); this.percentStrategy = - new ManualQueryLaningStrategy(ImmutableMap.of("one", 1, "ten", 10), true); + new ManualQueryLaningStrategy(ImmutableMap.of("one", 1, "ten", 10, "one-hundred", 100), true); } @Test @@ -70,7 +70,7 @@ public void testLanesMustBeSet() } @Test - public void testMustDefineAtLeast1Lane() + public void testMustDefineAtLeastOneLane() { expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("lanes must define at least one lane"); @@ -93,6 +93,14 @@ public void testPercentLaneLimitsMustBeAboveZero() new ManualQueryLaningStrategy(ImmutableMap.of("zero", 0, "one", 25), true); } + @Test + public void testPercentLaneLimitsMustBeLessThanOneHundred() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("All lane limits must be in the range 1 to 100"); + new ManualQueryLaningStrategy(ImmutableMap.of("one", 1, "one-hundred-and-one", 101), true); + } + @Test public void testExactLimits() { @@ -107,6 +115,7 @@ public void testPercentLimits() Object2IntMap exactLanes = percentStrategy.getLaneLimits(50); Assert.assertEquals(1, exactLanes.getInt("one")); Assert.assertEquals(5, exactLanes.getInt("ten")); + Assert.assertEquals(50, exactLanes.getInt("one-hundred")); } @Test From 20c10cfedb3d0cc2b28b4a5c7afe0e5ca2e4b929 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 13 Mar 2020 02:01:09 -0700 Subject: [PATCH 5/9] doc adjustments --- docs/configuration/index.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 1d5a1195cca0..338dcae524d8 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1524,7 +1524,7 @@ This strategy can be enabled by setting `druid.query.scheduler.laning.strategy=h |Property|Description|Default| |--------|-----------|-------| -|`druid.query.scheduler.laning.maxLowPercent`|Maximum percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`, defining the number of HTTP threads that can be used by queries with a priority lower than 0. Value must be in the range 1 to 100, and will be rounded up|No default, must be set if using this mode| +|`druid.query.scheduler.laning.maxLowPercent`|Maximum percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`, defining the number of HTTP threads that can be used by queries with a priority lower than 0. Value must be an integer in the range 1 to 100, and will be rounded up|No default, must be set if using this mode| ###### 'Manual' laning strategy @@ -1532,7 +1532,7 @@ This laning strategy is best suited for cases where one or more external applica |Property|Description|Default| |--------|-----------|-------| -|`druid.query.scheduler.laning.lanes.{name}`|Maximum percent or exact limit of queries that can concurrently run in the defined lanes. Any number of lanes may be defined like this.|No default, must define at least one lane with a limit above 0. If `druid.query.scheduler.laning.isLimitPercent` is set to `true`, numbers must be in the range of 1 to 100.| +|`druid.query.scheduler.laning.lanes.{name}`|Maximum percent or exact limit of queries that can concurrently run in the defined lanes. Any number of lanes may be defined like this.|No default, must define at least one lane with a limit above 0. If `druid.query.scheduler.laning.isLimitPercent` is set to `true`, values must be integers in the range of 1 to 100.| |`druid.query.scheduler.laning.isLimitPercent`|If set to `true`, the values set for `druid.query.scheduler.laning.lanes` will be treated as a percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`|`false`| ##### Server Configuration From 0bf4232d3aa4f4f0d42bd1724aacbdc2a0a971c5 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 13 Mar 2020 02:05:13 -0700 Subject: [PATCH 6/9] more tests --- .../apache/druid/server/QuerySchedulerTest.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java index 15acc06512b6..0e3066bea37d 100644 --- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java +++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java @@ -183,6 +183,7 @@ public void before() }); future.get(); Assert.assertEquals(5, scheduler.getTotalAvailableCapacity()); + Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent")); } @Test @@ -217,6 +218,7 @@ public void before() future.get(); Assert.assertEquals(5, scheduler.getTotalAvailableCapacity()); Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent")); } @Test @@ -272,7 +274,9 @@ public void testHiLoFailsWhenOutOfLaneCapacity() Assert.assertEquals(0, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); // too many reports - scheduler.run(scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of()), Sequences.empty()); + scheduler.run( + scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of()), Sequences.empty() + ); } @Test @@ -309,7 +313,9 @@ public void testHiLoFailsWhenOutOfTotalCapacity() Assert.assertEquals(0, scheduler.getTotalAvailableCapacity()); // one too many - scheduler.run(scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()), ImmutableSet.of()), Sequences.empty()); + scheduler.run( + scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()), ImmutableSet.of()), Sequences.empty() + ); } @Test @@ -359,7 +365,8 @@ public void testConfigNone() provider.inject(properties, injector.getInstance(JsonConfigurator.class)); final QueryScheduler scheduler = provider.get().get().get(); Assert.assertEquals(10, scheduler.getTotalAvailableCapacity()); - Assert.assertEquals(-1, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent")); } @Test @@ -380,6 +387,7 @@ public void testConfigHiLo() final QueryScheduler scheduler = provider.get().get().get(); Assert.assertEquals(10, scheduler.getTotalAvailableCapacity()); Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent")); } @@ -421,6 +429,7 @@ public void testConfigHiLoWithThreshold() final QueryScheduler scheduler = provider.get().get().get(); Assert.assertEquals(10, scheduler.getTotalAvailableCapacity()); Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent")); Query query = scheduler.prioritizeAndLaneQuery( QueryPlus.wrap(makeDefaultQuery()), @@ -471,6 +480,7 @@ public void testConfigManual() Assert.assertEquals(10, scheduler.getTotalAvailableCapacity()); Assert.assertEquals(1, scheduler.getLaneAvailableCapacity("one")); Assert.assertEquals(2, scheduler.getLaneAvailableCapacity("two")); + Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent")); } @Test @@ -493,6 +503,7 @@ public void testConfigManualPercent() Assert.assertEquals(10, scheduler.getTotalAvailableCapacity()); Assert.assertEquals(1, scheduler.getLaneAvailableCapacity("one")); Assert.assertEquals(2, scheduler.getLaneAvailableCapacity("twenty")); + Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent")); } private void maybeDelayNextIteration(int i) throws InterruptedException From ee435add58db2333e2539e904d0d3114f5b2f477 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 13 Mar 2020 02:06:46 -0700 Subject: [PATCH 7/9] test adjustment --- .../org/apache/druid/tests/query/ITWikipediaQueryTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java index b3d7de81ae00..9461ed9ed799 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.query.Druids; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -109,7 +110,7 @@ public void testQueryLaningLaneIsLimited() throws Exception StatusResponseHolder status = future.get(); if (status.getStatus().getCode() == QueryCapacityExceededException.STATUS_CODE) { limited++; - Assert.assertTrue(status.getContent().contains("one")); + Assert.assertTrue(status.getContent().contains(StringUtils.format(QueryCapacityExceededException.ERROR_MESSAGE_TEMPLATE, "one"))); } else if (status.getStatus().getCode() == HttpResponseStatus.OK.getCode()) { success++; } From cdb2d12c73e508c81659dffcc5e220839b43c1e1 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 13 Mar 2020 17:13:11 -0700 Subject: [PATCH 8/9] adjust docs --- docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 338dcae524d8..44b7703d4426 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1533,7 +1533,7 @@ This laning strategy is best suited for cases where one or more external applica |Property|Description|Default| |--------|-----------|-------| |`druid.query.scheduler.laning.lanes.{name}`|Maximum percent or exact limit of queries that can concurrently run in the defined lanes. Any number of lanes may be defined like this.|No default, must define at least one lane with a limit above 0. If `druid.query.scheduler.laning.isLimitPercent` is set to `true`, values must be integers in the range of 1 to 100.| -|`druid.query.scheduler.laning.isLimitPercent`|If set to `true`, the values set for `druid.query.scheduler.laning.lanes` will be treated as a percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`|`false`| +|`druid.query.scheduler.laning.isLimitPercent`|If set to `true`, the values set for `druid.query.scheduler.laning.lanes` will be treated as a percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`. Note that in this mode, these lane values acrross lanes are _not_ required to add up to, and can exceed, 100%.|`false`| ##### Server Configuration From d9762699717e50c1297539d25a38af749a289acb Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 13 Mar 2020 18:00:45 -0700 Subject: [PATCH 9/9] Update index.md --- docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 44b7703d4426..800044036681 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1533,7 +1533,7 @@ This laning strategy is best suited for cases where one or more external applica |Property|Description|Default| |--------|-----------|-------| |`druid.query.scheduler.laning.lanes.{name}`|Maximum percent or exact limit of queries that can concurrently run in the defined lanes. Any number of lanes may be defined like this.|No default, must define at least one lane with a limit above 0. If `druid.query.scheduler.laning.isLimitPercent` is set to `true`, values must be integers in the range of 1 to 100.| -|`druid.query.scheduler.laning.isLimitPercent`|If set to `true`, the values set for `druid.query.scheduler.laning.lanes` will be treated as a percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`. Note that in this mode, these lane values acrross lanes are _not_ required to add up to, and can exceed, 100%.|`false`| +|`druid.query.scheduler.laning.isLimitPercent`|If set to `true`, the values set for `druid.query.scheduler.laning.lanes` will be treated as a percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`. Note that in this mode, these lane values across lanes are _not_ required to add up to, and can exceed, 100%.|`false`| ##### Server Configuration