Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1524,7 +1524,16 @@ This strategy can be enabled by setting `druid.query.scheduler.laning.strategy=h

|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.scheduler.laning.maxLowPercent`|Maximum percent of the smaller number of`druid.server.http.numThreads` or `druid.query.scheduler.numThreads`, defining the number of HTTP threads that can be used by queries with a priority lower than 0. Value must be in the range 1 to 100, and will be rounded up|No default, must be set if using this mode|
|`druid.query.scheduler.laning.maxLowPercent`|Maximum percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`, defining the number of HTTP threads that can be used by queries with a priority lower than 0. Value must be an integer in the range 1 to 100, and will be rounded up|No default, must be set if using this mode|


###### 'Manual' laning strategy
This laning strategy is best suited for cases where one or more external applications which query Druid are capable of manually deciding what lane a given query should belong to. Configured with a map of lane names to percent or exact max capacities, queries with a matching `lane` parameter in the [query context](../querying/query-context.md) will be subjected to those limits.

|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.scheduler.laning.lanes.{name}`|Maximum percent or exact limit of queries that can concurrently run in the defined lanes. Any number of lanes may be defined like this.|No default, must define at least one lane with a limit above 0. If `druid.query.scheduler.laning.isLimitPercent` is set to `true`, values must be integers in the range of 1 to 100.|
|`druid.query.scheduler.laning.isLimitPercent`|If set to `true`, the values set for `druid.query.scheduler.laning.lanes` will be treated as a percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`. Note that in this mode, these lane values across lanes are _not_ required to add up to, and can exceed, 100%.|`false`|

##### Server Configuration

Expand Down
2 changes: 2 additions & 0 deletions integration-tests/docker/environment-configs/broker
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,5 @@ druid_cache_sizeInBytes=40000000
druid_auth_basic_common_cacheDirectory=/tmp/authCache/broker
druid_sql_avatica_enable=true
druid_server_https_crlPath=/tls/revocations.crl
druid_query_scheduler_laning_strategy=manual
druid_query_scheduler_laning_lanes_one=1
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;

public abstract class AbstractQueryResourceTestClient<QueryType>
{
Expand Down Expand Up @@ -87,4 +88,19 @@ public List<Map<String, Object>> query(String url, QueryType query)
}
}

/**
 * Submits {@code query} to {@code url} as an HTTP POST with a JSON body and hands back the pending response
 * without waiting for it to complete.
 *
 * @param url   the endpoint to POST the query to
 * @param query the query object, serialized to JSON bytes with {@code jsonMapper}
 * @return the future returned by {@code httpClient.go} for the submitted request
 * @throws RuntimeException wrapping any exception thrown while building or submitting the request
 */
public Future<StatusResponseHolder> queryAsync(String url, QueryType query)
{
  try {
    final Request post = new Request(HttpMethod.POST, new URL(url));
    final byte[] body = jsonMapper.writeValueAsBytes(query);
    return httpClient.go(
        post.setContent("application/json", body),
        StatusResponseHandler.getInstance()
    );
  }
  catch (Exception e) {
    throw new RuntimeException(e);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.net.URL;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -262,13 +263,29 @@ public Map<String, Object> initializeLookups(String filePath) throws Exception
return results2;
}

@Nullable
private Map<String, Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>>> getLookupLoadStatus()
{
String url = StringUtils.format("%slookups/nodeStatus", getCoordinatorURL());

Map<String, Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>>> status;
try {
StatusResponseHolder response = makeRequest(HttpMethod.GET, url);
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.GET, new URL(url)),
responseHandler
).get();

if (response.getStatus().getCode() == HttpResponseStatus.NOT_FOUND.getCode()) {
return null;
}
if (response.getStatus().getCode() != HttpResponseStatus.OK.getCode()) {
throw new ISE(
"Error while making request to url[%s] status[%s] content[%s]",
url,
response.getStatus(),
response.getContent()
);
}

status = jsonMapper.readValue(
response.getContent(), new TypeReference<Map<String, Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>>>>()
Expand All @@ -286,6 +303,10 @@ public boolean areLookupsLoaded(String lookup)
{
final Map<String, Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>>> status = getLookupLoadStatus();

if (status == null) {
return false;
}

final Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> defaultTier = status.get("__default");

boolean isLoaded = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public abstract class AbstractTestQueryHelper<QueryResultType extends AbstractQu
{

public static final Logger LOG = new Logger(TestQueryHelper.class);

private final AbstractQueryResourceTestClient queryClient;
private final ObjectMapper jsonMapper;
protected final String broker;
Expand All @@ -64,7 +65,7 @@ public abstract class AbstractTestQueryHelper<QueryResultType extends AbstractQu

public abstract void testQueriesFromFile(String filePath, int timesToRun) throws Exception;

protected abstract String getQueryURL(String schemeAndHost);
public abstract String getQueryURL(String schemeAndHost);

public void testQueriesFromFile(String url, String filePath, int timesToRun) throws Exception
{
Expand Down Expand Up @@ -145,5 +146,4 @@ public int countRows(String dataSource, String interval)
return (Integer) map.get("rows");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void testQueriesFromFile(String filePath, int timesToRun) throws Exceptio
}

@Override
protected String getQueryURL(String schemeAndHost)
public String getQueryURL(String schemeAndHost)
{
return StringUtils.format("%s/druid/v2/sql", schemeAndHost);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testQueriesFromString(String str, int timesToRun) throws Exception


@Override
protected String getQueryURL(String schemeAndHost)
public String getQueryURL(String schemeAndHost)
{
return StringUtils.format("%s/druid/v2?pretty", schemeAndHost);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,31 @@

package org.apache.druid.tests.query;

import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.server.QueryCapacityExceededException;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.clients.QueryResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.apache.druid.tests.TestNGGroup;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Future;

@Test(groups = TestNGGroup.QUERY)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITWikipediaQueryTest
Expand All @@ -42,6 +57,10 @@ public class ITWikipediaQueryTest
private CoordinatorResourceTestClient coordinatorClient;
@Inject
private TestQueryHelper queryHelper;
@Inject
private QueryResourceTestClient queryClient;
@Inject
private IntegrationTestingConfig config;

@BeforeMethod
public void before() throws Exception
Expand All @@ -51,15 +70,112 @@ public void before() throws Exception
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load"
);
coordinatorClient.initializeLookups(WIKIPEDIA_LOOKUP_RESOURCE);
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areLookupsLoaded(WIKI_LOOKUP), "wikipedia lookup load"
);
if (!coordinatorClient.areLookupsLoaded(WIKI_LOOKUP)) {
coordinatorClient.initializeLookups(WIKIPEDIA_LOOKUP_RESOURCE);
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areLookupsLoaded(WIKI_LOOKUP), "wikipedia lookup load"
);
}
}

/**
 * Runs the queries defined in {@code WIKIPEDIA_QUERIES_RESOURCE} through the query helper, passing 2 as the
 * {@code timesToRun} argument.
 */
@Test
public void testWikipediaQueriesFromFile() throws Exception
{
queryHelper.testQueriesFromFile(WIKIPEDIA_QUERIES_RESOURCE, 2);
}

@Test
Copy link
Copy Markdown
Contributor

@maytasm maytasm Mar 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also have a test for when the Manual Lane From Context That Arent In Map is given? For example, if lane in query context is "some-unknown-lane"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess they will always be rejected since we give NO_CAPACITY right?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^ update typo above. Meant to say "also" have a test for case when the Manual Lane From Context That Arent In Map is given

public void testQueryLaningLaneIsLimited() throws Exception
{
// the broker is configured with a manually defined query lane, 'one' with limit 1
// -Ddruid.query.scheduler.laning.strategy=manual
// -Ddruid.query.scheduler.laning.lanes.one=1
// by issuing 50 queries, at least 1 of them will succeed on 'one', and at least 1 of them will overlap enough to
// get limited
final int numQueries = 50;
List<Future<StatusResponseHolder>> futures = new ArrayList<>(numQueries);
for (int i = 0; i < numQueries; i++) {
futures.add(
queryClient.queryAsync(
queryHelper.getQueryURL(config.getBrokerUrl()),
getQueryBuilder().build()
)
);
}

int success = 0;
int limited = 0;

for (Future<StatusResponseHolder> future : futures) {
StatusResponseHolder status = future.get();
if (status.getStatus().getCode() == QueryCapacityExceededException.STATUS_CODE) {
limited++;
Assert.assertTrue(status.getContent().contains(StringUtils.format(QueryCapacityExceededException.ERROR_MESSAGE_TEMPLATE, "one")));
} else if (status.getStatus().getCode() == HttpResponseStatus.OK.getCode()) {
success++;
}
}

Assert.assertTrue(success > 0);
Assert.assertTrue(limited > 0);

// test another to make sure we can still issue one query at a time
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be separated into another @Test method. It will make understanding each test case easier.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this is testing a sequence of events though, so it belongs in the same test

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking the top part is test where we exceed and expect rejection. The bottom part is test where we are always under the limit and never exceed hence expect all to be successful.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a separate test which runs a bunch of queries in the same manner, but which do not belong to a lane, to make sure they are not limited.

StatusResponseHolder followUp = queryClient.queryAsync(
queryHelper.getQueryURL(config.getBrokerUrl()),
getQueryBuilder().build()
).get();

Assert.assertEquals(HttpResponseStatus.OK.getCode(), followUp.getStatus().getCode());

StatusResponseHolder andAnother = queryClient.queryAsync(
queryHelper.getQueryURL(config.getBrokerUrl()),
getQueryBuilder().build()
).get();

Assert.assertEquals(HttpResponseStatus.OK.getCode(), andAnother.getStatus().getCode());
}

/**
 * Fires a batch of concurrent queries whose context is set with only a {@code queryId}, then checks that at
 * least one succeeded and that none was rejected with the capacity-exceeded status code.
 */
@Test
public void testQueryLaningWithNoLane() throws Exception
{
  // The broker is configured with a manually defined query lane, 'one', with limit 1:
  //   -Ddruid.query.scheduler.laning.strategy=manual
  //   -Ddruid.query.scheduler.laning.lanes.one=1
  // The context is set again here with only a queryId (presumably replacing the 'lane' entry that
  // getQueryBuilder() sets), so these queries should not belong to the lane and none should be limited.
  final int numQueries = 50;
  final List<Future<StatusResponseHolder>> futures = new ArrayList<>(numQueries);
  int issued = 0;
  while (issued < numQueries) {
    final Future<StatusResponseHolder> pending = queryClient.queryAsync(
        queryHelper.getQueryURL(config.getBrokerUrl()),
        getQueryBuilder().context(ImmutableMap.of("queryId", UUID.randomUUID().toString())).build()
    );
    futures.add(pending);
    issued++;
  }

  // Tally the outcomes; any status other than "capacity exceeded" or OK is counted in neither bucket.
  int success = 0;
  int limited = 0;
  for (Future<StatusResponseHolder> pending : futures) {
    final int code = pending.get().getStatus().getCode();
    if (code == QueryCapacityExceededException.STATUS_CODE) {
      limited++;
      continue;
    }
    if (code == HttpResponseStatus.OK.getCode()) {
      success++;
    }
  }

  Assert.assertTrue(success > 0);
  Assert.assertEquals(limited, 0);
}

/**
 * Creates a timeseries query builder over the "wikipedia_editstream" datasource with a single count
 * aggregator, whose context carries the lane name "one" and a freshly generated random query id.
 */
private Druids.TimeseriesQueryBuilder getQueryBuilder()
{
  // a new id on every call, so each query built from this method is distinguishable
  final String queryId = UUID.randomUUID().toString();
  return Druids
      .newTimeseriesQueryBuilder()
      .dataSource("wikipedia_editstream")
      .aggregators(new CountAggregatorFactory("chocula"))
      .intervals("2013-01-01T00:00:00.000/2013-01-08T00:00:00.000")
      .context(ImmutableMap.of("lane", "one", "queryId", queryId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy;
import org.apache.druid.server.scheduling.ManualQueryLaningStrategy;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;

import java.util.Optional;
Expand All @@ -34,7 +36,8 @@
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "strategy", defaultImpl = NoQueryLaningStrategy.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "none", value = NoQueryLaningStrategy.class),
@JsonSubTypes.Type(name = "hilo", value = HiLoQueryLaningStrategy.class)
@JsonSubTypes.Type(name = "hilo", value = HiLoQueryLaningStrategy.class),
@JsonSubTypes.Type(name = "manual", value = ManualQueryLaningStrategy.class)
})
public interface QueryLaningStrategy
{
Expand All @@ -50,4 +53,9 @@ public interface QueryLaningStrategy
* This method must be thread safe
*/
<T> Optional<String> computeLane(QueryPlus<T> query, Set<SegmentServerSelector> segments);

/**
 * Converts a percentage of {@code totalLimit} into an absolute limit, rounding up to the next whole number.
 *
 * The computation uses exact {@code long} arithmetic instead of floating point. With doubles, an inexact
 * binary fraction can land just above the true result and be rounded up one too far: for example
 * {@code 100 * 0.07} evaluates to {@code 7.000000000000001}, whose ceiling is 8 rather than 7.
 *
 * @param totalLimit the total capacity the percentage is taken of
 * @param value      the percentage of {@code totalLimit} to compute
 * @return {@code ceil(totalLimit * value / 100)}
 * @throws IllegalArgumentException if the result does not fit in an {@code int} (raised by
 *                                  {@code Ints.checkedCast})
 */
default int computeLimitFromPercent(int totalLimit, int value)
{
  // the product of two ints always fits in a long, so neither this multiplication nor the negation below overflows
  final long scaled = (long) totalLimit * value;
  // -floorDiv(-x, 100) equals ceil(x / 100) for either sign of x
  return Ints.checkedCast(-Math.floorDiv(-scaled, 100L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
*/
public class QueryScheduler implements QueryWatcher
{
private static final int NO_CAPACITY = -1;
public static final int UNAVAILABLE = -1;
static final String TOTAL = "default";
private final int totalCapacity;
private final QueryPrioritizationStrategy prioritizationStrategy;
Expand Down Expand Up @@ -173,7 +173,7 @@ int getTotalAvailableCapacity()
{
return laneRegistry.getConfiguration(TOTAL)
.map(config -> laneRegistry.bulkhead(TOTAL, config).getMetrics().getAvailableConcurrentCalls())
.orElse(NO_CAPACITY);
.orElse(UNAVAILABLE);
}

/**
Expand All @@ -184,7 +184,7 @@ int getLaneAvailableCapacity(String lane)
{
return laneRegistry.getConfiguration(lane)
.map(config -> laneRegistry.bulkhead(lane, config).getMetrics().getAvailableConcurrentCalls())
.orElse(NO_CAPACITY);
.orElse(UNAVAILABLE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.objects.Object2IntArrayMap;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import org.apache.druid.client.SegmentServerSelector;
Expand Down Expand Up @@ -60,7 +59,7 @@ public HiLoQueryLaningStrategy(
public Object2IntMap<String> getLaneLimits(int totalLimit)
{
Object2IntMap<String> onlyLow = new Object2IntArrayMap<>(1);
onlyLow.put(LOW, Ints.checkedCast((long) Math.ceil(totalLimit * ((double) maxLowPercent / 100))));
onlyLow.put(LOW, computeLimitFromPercent(totalLimit, maxLowPercent));
return onlyLow;
}

Expand Down
Loading