From c0f1f3ec9442441b1a01dbc4eca07f029434fe69 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 26 Jul 2021 17:40:22 +0530 Subject: [PATCH 1/6] Add QueryContextTieredBrokerSelectorStrategy --- .../org/apache/druid/query/QueryContexts.java | 6 ++ ...ryContextTieredBrokerSelectorStrategy.java | 58 +++++++++++++++++++ .../router/TieredBrokerHostSelectorTest.java | 42 +++++++++++++- 3 files changed, 105 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/apache/druid/server/router/QueryContextTieredBrokerSelectorStrategy.java diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index 4979d7ce5971..a88156bf7183 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -66,6 +66,7 @@ public class QueryContexts public static final String USE_CACHE_KEY = "useCache"; public static final String SECONDARY_PARTITION_PRUNING_KEY = "secondaryPartitionPruning"; public static final String BY_SEGMENT_KEY = "bySegment"; + public static final String BROKER_SERVICE_NAME = "brokerServiceName"; public static final boolean DEFAULT_BY_SEGMENT = false; public static final boolean DEFAULT_POPULATE_CACHE = true; @@ -410,6 +411,11 @@ public static boolean allowReturnPartialResults(Query query, boolean defa return query.getContextBoolean(RETURN_PARTIAL_RESULTS_KEY, defaultValue); } + public static String getBrokerServiceName(Query query) + { + return query.getContextValue(BROKER_SERVICE_NAME); + } + static long parseLong(Query query, String key, long defaultValue) { final Object val = query.getContextValue(key); diff --git a/server/src/main/java/org/apache/druid/server/router/QueryContextTieredBrokerSelectorStrategy.java b/server/src/main/java/org/apache/druid/server/router/QueryContextTieredBrokerSelectorStrategy.java new file mode 100644 index 000000000000..8822d2001547 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/router/QueryContextTieredBrokerSelectorStrategy.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.router; + +import com.google.common.base.Optional; +import org.apache.commons.lang.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; + +/** + * Implementation of {@link TieredBrokerSelectorStrategy} which uses the flag + * {@link QueryContexts#BROKER_SERVICE_NAME} in the Query context to select the + * Broker tier. + */ +public class QueryContextTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy +{ + + private static final Logger log = new Logger(QueryContextTieredBrokerSelectorStrategy.class); + + @Override + public Optional getBrokerServiceName(TieredBrokerConfig tierConfig, Query query) + { + try { + String brokerServiceName = QueryContexts.getBrokerServiceName(query); + + // Verify that the brokerServiceName is valid + if (StringUtils.isEmpty(brokerServiceName) + || !tierConfig.getTierToBrokerMap().containsValue(brokerServiceName)) { + log.debug("Could not find Broker Service [%s] in TieredBrokerConfig", brokerServiceName); + return Optional.absent(); + } else { + return Optional.of(brokerServiceName); + } + } + catch (Exception e) { + log.error(e, "Error getting Broker Service name from Query Context"); + return Optional.absent(); + } + } +} diff --git a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java b/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java index 1ea675fdc3cb..6b23c5598126 100644 --- a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java +++ b/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java @@ -37,6 +37,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.timeseries.TimeseriesQuery; @@ -135,7 +136,11 @@ public String getDefaultBrokerServiceName() } }, druidNodeDiscoveryProvider, - Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy(0, 1)) + Arrays.asList( + new QueryContextTieredBrokerSelectorStrategy(), + new TimeBoundaryTieredBrokerSelectorStrategy(), + new PriorityTieredBrokerSelectorStrategy(0, 1) + ) ); brokerSelector.start(); @@ -293,6 +298,41 @@ public void testPrioritySelect2() Assert.assertEquals("hotBroker", brokerName); } + @Test + public void testSelectBasedOnQueryContext() + { + final Druids.TimeseriesQueryBuilder queryBuilder = + Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .aggregators(Collections.singletonList(new CountAggregatorFactory("count"))) + .intervals( + new MultipleIntervalSegmentSpec( + Collections.singletonList(Intervals.of("2009/2010")) + ) + ); + + Assert.assertEquals( + brokerSelector.getDefaultServiceName(), + brokerSelector.select(queryBuilder.build()).lhs + ); + Assert.assertEquals( + "hotBroker", + brokerSelector.select( + queryBuilder + .context(ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, "hotBroker")) + .build() + ).lhs + ); + Assert.assertEquals( + "coldBroker", + brokerSelector.select( + queryBuilder + .context(ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, "coldBroker")) + .build() + ).lhs + ); + } + @Test public void testGetAllBrokers() { From e785ce4bb8acb4e01ca123b742f7a062e8398ee4 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 26 Jul 2021 19:29:50 +0530 Subject: [PATCH 2/6] Add tests for QueryContextTieredBrokerSelectorStrategy --- docs/design/router.md | 13 ++ docs/querying/query-context.md | 1 + ...ryContextTieredBrokerSelectorStrategy.java | 61 ++++- .../router/TieredBrokerSelectorStrategy.java | 1 + ...ntextTieredBrokerSelectorStrategyTest.java | 214 ++++++++++++++++++ .../router/TieredBrokerHostSelectorTest.java | 2 +- 6 files changed, 280 insertions(+), 12 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/server/router/QueryContextTieredBrokerSelectorStrategyTest.java diff --git a/docs/design/router.md b/docs/design/router.md index a2ccbd79bfde..3f9585b3f8b0 100644 --- a/docs/design/router.md +++ b/docs/design/router.md @@ -109,6 +109,19 @@ Including this strategy means all timeBoundary queries are always routed to the Queries with a priority set to less than minPriority are routed to the lowest priority Broker. Queries with priority set to greater than maxPriority are routed to the highest priority Broker. By default, minPriority is 0 and maxPriority is 1. Using these default values, if a query with priority 0 (the default query priority is 0) is sent, the query skips the priority selection logic. +#### queryContext + +This strategy reads the parameter `brokerServiceName` from the query context and routes the query accordingly. If no valid `brokerServiceName` is specified in the query context, the field `fallbackBrokerService` is used if set to a valid non-null value. + +*Example*: A strategy that routes queries to the Broker "druid:broker-hot" if no valid `brokerServiceName` is found in the query context. + +```json +{ + "type": "queryContext", + "fallbackBrokerService": "druid:broker-hot" +} +``` + #### JavaScript Allows defining arbitrary routing rules using a JavaScript function. The function is passed the configuration and the query to be executed, and returns the tier it should be routed to, or null for the default tier. diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md index 54b09746a690..a635b0247d40 100644 --- a/docs/querying/query-context.md +++ b/docs/querying/query-context.md @@ -43,6 +43,7 @@ Unless otherwise noted, the following parameters apply to all query types. |priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.| |lane | `null` | Query lane, used to control usage limits on classes of queries. See [Broker configuration](../configuration/index.md#broker) for more details.| |queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | +|brokerServiceName| `null` | Broker service to which this query should be routed. This parameter is honored only by a broker selector strategy of type *queryContext*. See [Router strategies](../design/router.md#router-strategies) for more details.| |useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Apache Druid uses `druid.broker.cache.useCache` or `druid.historical.cache.useCache` to determine whether or not to read from the query cache | |populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses `druid.broker.cache.populateCache` or `druid.historical.cache.populateCache` to determine whether or not to save the results of this query to the query cache | |useResultLevelCache | `true` | Flag indicating whether to leverage the result level cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses `druid.broker.cache.useResultLevelCache` to determine whether or not to read from the result-level query cache | diff --git a/server/src/main/java/org/apache/druid/server/router/QueryContextTieredBrokerSelectorStrategy.java b/server/src/main/java/org/apache/druid/server/router/QueryContextTieredBrokerSelectorStrategy.java index 8822d2001547..effd4429a6da 100644 --- a/server/src/main/java/org/apache/druid/server/router/QueryContextTieredBrokerSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/server/router/QueryContextTieredBrokerSelectorStrategy.java @@ -19,40 +19,79 @@ package org.apache.druid.server.router; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import org.apache.commons.lang.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; +import javax.annotation.Nullable; + /** - * Implementation of {@link TieredBrokerSelectorStrategy} which uses the flag + * Implementation of {@link TieredBrokerSelectorStrategy} which uses the parameter * {@link QueryContexts#BROKER_SERVICE_NAME} in the Query context to select the - * Broker tier. + * Broker Service. + *

+ * If the {@link #fallbackBrokerService} is set to a valid Broker Service Name, + * then all queries that do not specify a valid value for + * {@link QueryContexts#BROKER_SERVICE_NAME} would be directed to the + * {@code #fallbackBrokerService}. Note that the {@code fallbackBrokerService} + * can be different from the {@link TieredBrokerConfig#getDefaultBrokerServiceName()}. */ public class QueryContextTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy { - private static final Logger log = new Logger(QueryContextTieredBrokerSelectorStrategy.class); + private final String fallbackBrokerService; + + @JsonCreator + public QueryContextTieredBrokerSelectorStrategy( + @JsonProperty("fallbackBrokerService") @Nullable String fallbackBrokerService + ) + { + this.fallbackBrokerService = fallbackBrokerService; + } + @Override public Optional getBrokerServiceName(TieredBrokerConfig tierConfig, Query query) { try { - String brokerServiceName = QueryContexts.getBrokerServiceName(query); + final String contextBrokerService = QueryContexts.getBrokerServiceName(query); - // Verify that the brokerServiceName is valid - if (StringUtils.isEmpty(brokerServiceName) - || !tierConfig.getTierToBrokerMap().containsValue(brokerServiceName)) { - log.debug("Could not find Broker Service [%s] in TieredBrokerConfig", brokerServiceName); - return Optional.absent(); + if (isValidBrokerService(contextBrokerService, tierConfig)) { + // If the broker service in the query context is valid, use that + return Optional.of(contextBrokerService); + } else if (isValidBrokerService(fallbackBrokerService, tierConfig)) { + // If the fallbackBrokerService is valid, use that + return Optional.of(fallbackBrokerService); } else { - return Optional.of(brokerServiceName); + log.debug( + "Could not find Broker Service [%s] or fallback [%s] in TieredBrokerConfig", + contextBrokerService, + fallbackBrokerService + ); + return Optional.absent(); } } catch (Exception e) { log.error(e, "Error getting Broker Service name from Query Context"); - return Optional.absent(); + return isValidBrokerService(fallbackBrokerService, tierConfig) + ? Optional.of(fallbackBrokerService) : Optional.absent(); } } + + private boolean isValidBrokerService(String brokerServiceName, TieredBrokerConfig tierConfig) + { + return !StringUtils.isEmpty(brokerServiceName) + && tierConfig.getTierToBrokerMap().containsValue(brokerServiceName); + } + + @VisibleForTesting + String getFallbackBrokerService() + { + return fallbackBrokerService; + } } diff --git a/server/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java b/server/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java index f40dfd4ca292..ce1aa73593b7 100644 --- a/server/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java @@ -30,6 +30,7 @@ @JsonSubTypes(value = { @JsonSubTypes.Type(name = "timeBoundary", value = TimeBoundaryTieredBrokerSelectorStrategy.class), @JsonSubTypes.Type(name = "priority", value = PriorityTieredBrokerSelectorStrategy.class), + @JsonSubTypes.Type(name = "queryContext", value = QueryContextTieredBrokerSelectorStrategy.class), @JsonSubTypes.Type(name = "javascript", value = JavaScriptTieredBrokerSelectorStrategy.class) }) diff --git a/server/src/test/java/org/apache/druid/server/router/QueryContextTieredBrokerSelectorStrategyTest.java b/server/src/test/java/org/apache/druid/server/router/QueryContextTieredBrokerSelectorStrategyTest.java new file mode 100644 index 000000000000..5c59582519b7 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/router/QueryContextTieredBrokerSelectorStrategyTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.router; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.LinkedHashMap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class QueryContextTieredBrokerSelectorStrategyTest +{ + private TieredBrokerConfig tieredBrokerConfig; + private Druids.TimeseriesQueryBuilder queryBuilder; + + @Before + public void setup() + { + tieredBrokerConfig = new TieredBrokerConfig() + { + @Override + public String getDefaultBrokerServiceName() + { + return Names.BROKER_SVC_HOT; + } + + @Override + public LinkedHashMap getTierToBrokerMap() + { + LinkedHashMap tierToBrokerMap = new LinkedHashMap<>(); + tierToBrokerMap.put("hotTier", Names.BROKER_SVC_HOT); + tierToBrokerMap.put("mediumTier", Names.BROKER_SVC_MEDIUM); + tierToBrokerMap.put("coldTier", Names.BROKER_SVC_COLD); + + return tierToBrokerMap; + } + }; + + queryBuilder = + Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .aggregators(Collections.singletonList(new CountAggregatorFactory("count"))) + .intervals( + new MultipleIntervalSegmentSpec( + Collections.singletonList(Intervals.of("2009/2010")) + ) + ); + } + + @Test + public void testSerde() throws Exception + { + final ObjectMapper mapper = new DefaultObjectMapper(); + + String json = "{\"type\":\"queryContext\"}"; + TieredBrokerSelectorStrategy strategy = mapper.readValue( + json, + TieredBrokerSelectorStrategy.class + ); + assertTrue(strategy instanceof QueryContextTieredBrokerSelectorStrategy); + + QueryContextTieredBrokerSelectorStrategy queryContextStrategy = + (QueryContextTieredBrokerSelectorStrategy) strategy; + assertNull(queryContextStrategy.getFallbackBrokerService()); + + json = "{\"type\":\"queryContext\",\"fallbackBrokerService\":\"hotBroker\"}"; + queryContextStrategy = mapper.readValue( + json, + QueryContextTieredBrokerSelectorStrategy.class + ); + assertEquals(queryContextStrategy.getFallbackBrokerService(), "hotBroker"); + } + + @Test + public void testGetBrokerServiceName() + { + final QueryContextTieredBrokerSelectorStrategy strategy = + new QueryContextTieredBrokerSelectorStrategy(null); + + assertEquals( + Optional.absent(), + strategy.getBrokerServiceName(tieredBrokerConfig, queryBuilder.build()) + ); + assertEquals( + Optional.absent(), + strategy.getBrokerServiceName( + tieredBrokerConfig, + queryBuilder + .context(ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.INVALID_BROKER)) + .build() + ) + ); + assertEquals( + Optional.of(Names.BROKER_SVC_HOT), + strategy.getBrokerServiceName( + tieredBrokerConfig, + queryBuilder + .context(ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.BROKER_SVC_HOT)) + .build() + ) + ); + assertEquals( + Optional.of(Names.BROKER_SVC_COLD), + strategy.getBrokerServiceName( + tieredBrokerConfig, + queryBuilder + .context(ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.BROKER_SVC_COLD)) + .build() + ) + ); + } + + @Test + public void testGetBrokerServiceName_withFallback() + { + final QueryContextTieredBrokerSelectorStrategy strategy = + new QueryContextTieredBrokerSelectorStrategy(Names.BROKER_SVC_MEDIUM); + + assertEquals( + Optional.of(Names.BROKER_SVC_MEDIUM), + strategy.getBrokerServiceName(tieredBrokerConfig, queryBuilder.build()) + ); + assertEquals( + Optional.of(Names.BROKER_SVC_MEDIUM), + strategy.getBrokerServiceName( + tieredBrokerConfig, + queryBuilder + .context(ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.INVALID_BROKER)) + .build() + ) + ); + assertEquals( + Optional.of(Names.BROKER_SVC_HOT), + strategy.getBrokerServiceName( + tieredBrokerConfig, + queryBuilder + .context(ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.BROKER_SVC_HOT)) + .build() + ) + ); + } + + @Test + public void testGetBrokerServiceName_withInvalidFallback() + { + final QueryContextTieredBrokerSelectorStrategy strategy = + new QueryContextTieredBrokerSelectorStrategy("noSuchBroker"); + + assertEquals( + Optional.absent(), + strategy.getBrokerServiceName(tieredBrokerConfig, queryBuilder.build()) + ); + assertEquals( + Optional.absent(), + strategy.getBrokerServiceName( + tieredBrokerConfig, + queryBuilder + .context(ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.INVALID_BROKER)) + .build() + ) + ); + assertEquals( + Optional.of(Names.BROKER_SVC_HOT), + strategy.getBrokerServiceName( + tieredBrokerConfig, + queryBuilder + .context(ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.BROKER_SVC_HOT)) + .build() + ) + ); + } + + /** + * Test constants. + */ + private static class Names + { + static final String BROKER_SVC_HOT = "druid/hotBroker"; + static final String BROKER_SVC_MEDIUM = "druid/mediumBroker"; + static final String BROKER_SVC_COLD = "druid/coldBroker"; + + static final String INVALID_BROKER = "invalidBroker"; + } +} diff --git a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java b/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java index 6b23c5598126..f1e3fb7481ff 100644 --- a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java +++ b/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java @@ -137,7 +137,7 @@ public String getDefaultBrokerServiceName() }, druidNodeDiscoveryProvider, Arrays.asList( - new QueryContextTieredBrokerSelectorStrategy(), + new QueryContextTieredBrokerSelectorStrategy(null), new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy(0, 1) ) From 5a40d19a1eee2190d396f5543674ca8595bff289 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 27 Jul 2021 09:24:38 +0530 Subject: [PATCH 3/6] Add test for QueryContexts --- docs/design/router.md | 4 +-- docs/querying/query-context.md | 2 +- .../org/apache/druid/query/QueryContexts.java | 2 +- .../apache/druid/query/QueryContextsTest.java | 32 +++++++++++++++++++ website/.spelling | 5 +++ 5 files changed, 41 insertions(+), 4 deletions(-) diff --git a/docs/design/router.md b/docs/design/router.md index 3f9585b3f8b0..79d7a2a254c3 100644 --- a/docs/design/router.md +++ b/docs/design/router.md @@ -111,9 +111,9 @@ Queries with a priority set to less than minPriority are routed to the lowest pr #### queryContext -This strategy reads the parameter `brokerServiceName` from the query context and routes the query accordingly. If no valid `brokerServiceName` is specified in the query context, the field `fallbackBrokerService` is used if set to a valid non-null value. +This strategy reads the parameter `brokerService` from the query context and routes the query accordingly. If no valid `brokerService` is specified in the query context, the field `fallbackBrokerService` is used if set to a valid non-null value. -*Example*: A strategy that routes queries to the Broker "druid:broker-hot" if no valid `brokerServiceName` is found in the query context. +*Example*: A strategy that routes queries to the Broker "druid:broker-hot" if no valid `brokerService` is found in the query context. ```json { diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md index a635b0247d40..980a6609d313 100644 --- a/docs/querying/query-context.md +++ b/docs/querying/query-context.md @@ -43,7 +43,7 @@ Unless otherwise noted, the following parameters apply to all query types. |priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.| |lane | `null` | Query lane, used to control usage limits on classes of queries. See [Broker configuration](../configuration/index.md#broker) for more details.| |queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | -|brokerServiceName| `null` | Broker service to which this query should be routed. This parameter is honored only by a broker selector strategy of type *queryContext*. See [Router strategies](../design/router.md#router-strategies) for more details.| +|brokerService | `null` | Broker service to which this query should be routed. This parameter is honored only by a broker selector strategy of type *queryContext*. See [Router strategies](../design/router.md#router-strategies) for more details.| |useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Apache Druid uses `druid.broker.cache.useCache` or `druid.historical.cache.useCache` to determine whether or not to read from the query cache | |populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses `druid.broker.cache.populateCache` or `druid.historical.cache.populateCache` to determine whether or not to save the results of this query to the query cache | |useResultLevelCache | `true` | Flag indicating whether to leverage the result level cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses `druid.broker.cache.useResultLevelCache` to determine whether or not to read from the result-level query cache | diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index a88156bf7183..4b16ad483a21 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -66,7 +66,7 @@ public class QueryContexts public static final String USE_CACHE_KEY = "useCache"; public static final String SECONDARY_PARTITION_PRUNING_KEY = "secondaryPartitionPruning"; public static final String BY_SEGMENT_KEY = "bySegment"; - public static final String BROKER_SERVICE_NAME = "brokerServiceName"; + public static final String BROKER_SERVICE_NAME = "brokerService"; public static final boolean DEFAULT_BY_SEGMENT = false; public static final boolean DEFAULT_POPULATE_CACHE = true; diff --git a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java index 593216c78389..4de31d1500a8 100644 --- a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java +++ b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java @@ -145,4 +145,36 @@ public void testGetEnableJoinLeftScanDirect() false ))); } + + @Test + public void testGetBrokerServiceName() + { + Query query = new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))), + false, + new HashMap<>() + ); + + Assert.assertNull(QueryContexts.getBrokerServiceName(query)); + + query.getContext().put(QueryContexts.BROKER_SERVICE_NAME, "hotBroker"); + Assert.assertEquals("hotBroker", QueryContexts.getBrokerServiceName(query)); + } + + @Test + public void testGetBrokerServiceName_withNonStringValue() + { + Query query = new TestQuery( + new TableDataSource("test"), + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))), + false, + new HashMap<>() + ); + + query.getContext().put(QueryContexts.BROKER_SERVICE_NAME, 100); + + exception.expect(ClassCastException.class); + QueryContexts.getBrokerServiceName(query); + } } diff --git a/website/.spelling b/website/.spelling index a0a10254409e..1bd8878ca531 100644 --- a/website/.spelling +++ b/website/.spelling @@ -477,9 +477,12 @@ defaultUser inputSegmentSizeBytes skipOffsetFromLatest - ../docs/design/router.md +brokerService c3.2xlarge +fallbackBrokerService maxPriority minPriority +queryContext runtime.properties timeBoundary - ../docs/design/segments.md @@ -1426,6 +1429,7 @@ fieldAccess finalizingFieldAccess hyperUniqueCardinality - ../docs/querying/query-context.md +brokerService bySegment doubleSum druid.broker.cache.populateCache @@ -1445,6 +1449,7 @@ parallelMergeSmallBatchRows populateCache populateResultLevelCache queryId +queryContext row-matchers serializeDateTimeAsLong serializeDateTimeAsLongInner From 91799e5394ffc956038b9662a891e4d5b4bb5465 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 27 Jul 2021 12:37:10 +0530 Subject: [PATCH 4/6] Rename to ManualTieredBrokerSelectorStrategy --- docs/design/router.md | 8 ++--- docs/querying/query-context.md | 2 +- ...> ManualTieredBrokerSelectorStrategy.java} | 28 ++++++++--------- .../router/TieredBrokerSelectorStrategy.java | 2 +- ...nualTieredBrokerSelectorStrategyTest.java} | 30 +++++++++---------- .../router/TieredBrokerHostSelectorTest.java | 2 +- website/.spelling | 4 +-- 7 files changed, 37 insertions(+), 39 deletions(-) rename server/src/main/java/org/apache/druid/server/router/{QueryContextTieredBrokerSelectorStrategy.java => ManualTieredBrokerSelectorStrategy.java} (75%) rename server/src/test/java/org/apache/druid/server/router/{QueryContextTieredBrokerSelectorStrategyTest.java => ManualTieredBrokerSelectorStrategyTest.java} (85%) diff --git a/docs/design/router.md b/docs/design/router.md index 79d7a2a254c3..a7e8045e76bd 100644 --- a/docs/design/router.md +++ b/docs/design/router.md @@ -109,16 +109,16 @@ Including this strategy means all timeBoundary queries are always routed to the Queries with a priority set to less than minPriority are routed to the lowest priority Broker. Queries with priority set to greater than maxPriority are routed to the highest priority Broker. By default, minPriority is 0 and maxPriority is 1. Using these default values, if a query with priority 0 (the default query priority is 0) is sent, the query skips the priority selection logic. -#### queryContext +#### manual -This strategy reads the parameter `brokerService` from the query context and routes the query accordingly. If no valid `brokerService` is specified in the query context, the field `fallbackBrokerService` is used if set to a valid non-null value. +This strategy reads the parameter `brokerService` from the query context and routes the query accordingly. If no valid `brokerService` is specified in the query context, the field `defaultManualBrokerService` is used if set to a valid non-null value. *Example*: A strategy that routes queries to the Broker "druid:broker-hot" if no valid `brokerService` is found in the query context. ```json { - "type": "queryContext", - "fallbackBrokerService": "druid:broker-hot" + "type": "manual", + "defaultManualBrokerService": "druid:broker-hot" } ``` diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md index 980a6609d313..a566b8bd702b 100644 --- a/docs/querying/query-context.md +++ b/docs/querying/query-context.md @@ -43,7 +43,7 @@ Unless otherwise noted, the following parameters apply to all query types. |priority | `0` | Query Priority. Queries with higher priority get precedence for computational resources.| |lane | `null` | Query lane, used to control usage limits on classes of queries. See [Broker configuration](../configuration/index.md#broker) for more details.| |queryId | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | -|brokerService | `null` | Broker service to which this query should be routed. This parameter is honored only by a broker selector strategy of type *queryContext*. See [Router strategies](../design/router.md#router-strategies) for more details.| +|brokerService | `null` | Broker service to which this query should be routed. This parameter is honored only by a broker selector strategy of type *manual*. See [Router strategies](../design/router.md#router-strategies) for more details.| |useCache | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Apache Druid uses `druid.broker.cache.useCache` or `druid.historical.cache.useCache` to determine whether or not to read from the query cache | |populateCache | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses `druid.broker.cache.populateCache` or `druid.historical.cache.populateCache` to determine whether or not to save the results of this query to the query cache | |useResultLevelCache | `true` | Flag indicating whether to leverage the result level cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses `druid.broker.cache.useResultLevelCache` to determine whether or not to read from the result-level query cache | diff --git a/server/src/main/java/org/apache/druid/server/router/QueryContextTieredBrokerSelectorStrategy.java b/server/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java similarity index 75% rename from server/src/main/java/org/apache/druid/server/router/QueryContextTieredBrokerSelectorStrategy.java rename to server/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java index effd4429a6da..aa2c2985d3c5 100644 --- a/server/src/main/java/org/apache/druid/server/router/QueryContextTieredBrokerSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java @@ -35,24 +35,24 @@ * {@link QueryContexts#BROKER_SERVICE_NAME} in the Query context to select the * Broker Service. *

- * If the {@link #fallbackBrokerService} is set to a valid Broker Service Name, + * If the {@link #defaultManualBrokerService} is set to a valid Broker Service Name, * then all queries that do not specify a valid value for * {@link QueryContexts#BROKER_SERVICE_NAME} would be directed to the * {@code #fallbackBrokerService}. Note that the {@code fallbackBrokerService} * can be different from the {@link TieredBrokerConfig#getDefaultBrokerServiceName()}. */ -public class QueryContextTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy +public class ManualTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy { - private static final Logger log = new Logger(QueryContextTieredBrokerSelectorStrategy.class); + private static final Logger log = new Logger(ManualTieredBrokerSelectorStrategy.class); - private final String fallbackBrokerService; + private final String defaultManualBrokerService; @JsonCreator - public QueryContextTieredBrokerSelectorStrategy( - @JsonProperty("fallbackBrokerService") @Nullable String fallbackBrokerService + public ManualTieredBrokerSelectorStrategy( + @JsonProperty("defaultManualBrokerService") @Nullable String defaultManualBrokerService ) { - this.fallbackBrokerService = fallbackBrokerService; + this.defaultManualBrokerService = defaultManualBrokerService; } @Override @@ -64,22 +64,22 @@ public Optional getBrokerServiceName(TieredBrokerConfig tierConfig, Quer if (isValidBrokerService(contextBrokerService, tierConfig)) { // If the broker service in the query context is valid, use that return Optional.of(contextBrokerService); - } else if (isValidBrokerService(fallbackBrokerService, tierConfig)) { + } else if (isValidBrokerService(defaultManualBrokerService, tierConfig)) { // If the fallbackBrokerService is valid, use that - return Optional.of(fallbackBrokerService); + return Optional.of(defaultManualBrokerService); } else { log.debug( "Could not find Broker Service [%s] or fallback [%s] in TieredBrokerConfig", contextBrokerService, - fallbackBrokerService + defaultManualBrokerService ); return Optional.absent(); } } catch (Exception e) { log.error(e, "Error getting Broker Service name from Query Context"); - return isValidBrokerService(fallbackBrokerService, tierConfig) - ? Optional.of(fallbackBrokerService) : Optional.absent(); + return isValidBrokerService(defaultManualBrokerService, tierConfig) + ? Optional.of(defaultManualBrokerService) : Optional.absent(); } } @@ -90,8 +90,8 @@ private boolean isValidBrokerService(String brokerServiceName, TieredBrokerConfi } @VisibleForTesting - String getFallbackBrokerService() + String getDefaultManualBrokerService() { - return fallbackBrokerService; + return defaultManualBrokerService; } } diff --git a/server/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java b/server/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java index ce1aa73593b7..06f3a98c091d 100644 --- a/server/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java @@ -30,7 +30,7 @@ @JsonSubTypes(value = { @JsonSubTypes.Type(name = "timeBoundary", value = TimeBoundaryTieredBrokerSelectorStrategy.class), @JsonSubTypes.Type(name = "priority", value = PriorityTieredBrokerSelectorStrategy.class), - @JsonSubTypes.Type(name = "queryContext", value = QueryContextTieredBrokerSelectorStrategy.class), + @JsonSubTypes.Type(name = "manual", value = ManualTieredBrokerSelectorStrategy.class), @JsonSubTypes.Type(name = "javascript", value = JavaScriptTieredBrokerSelectorStrategy.class) }) diff --git a/server/src/test/java/org/apache/druid/server/router/QueryContextTieredBrokerSelectorStrategyTest.java b/server/src/test/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategyTest.java similarity index 85% rename from server/src/test/java/org/apache/druid/server/router/QueryContextTieredBrokerSelectorStrategyTest.java rename to server/src/test/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategyTest.java index 5c59582519b7..d5b85eeb399a 100644 --- a/server/src/test/java/org/apache/druid/server/router/QueryContextTieredBrokerSelectorStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategyTest.java @@ -38,7 +38,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -public class QueryContextTieredBrokerSelectorStrategyTest +public class ManualTieredBrokerSelectorStrategyTest { private TieredBrokerConfig tieredBrokerConfig; private Druids.TimeseriesQueryBuilder queryBuilder; @@ -82,30 +82,30 @@ public void testSerde() throws Exception { final ObjectMapper mapper = new DefaultObjectMapper(); - String json = "{\"type\":\"queryContext\"}"; + String json = "{\"type\":\"manual\"}"; TieredBrokerSelectorStrategy strategy = mapper.readValue( json, TieredBrokerSelectorStrategy.class ); - assertTrue(strategy instanceof QueryContextTieredBrokerSelectorStrategy); + assertTrue(strategy instanceof ManualTieredBrokerSelectorStrategy); - QueryContextTieredBrokerSelectorStrategy queryContextStrategy = - (QueryContextTieredBrokerSelectorStrategy) strategy; - assertNull(queryContextStrategy.getFallbackBrokerService()); + ManualTieredBrokerSelectorStrategy queryContextStrategy = + (ManualTieredBrokerSelectorStrategy) strategy; + assertNull(queryContextStrategy.getDefaultManualBrokerService()); - json = "{\"type\":\"queryContext\",\"fallbackBrokerService\":\"hotBroker\"}"; + json = "{\"type\":\"manual\",\"defaultManualBrokerService\":\"hotBroker\"}"; queryContextStrategy = mapper.readValue( json, - QueryContextTieredBrokerSelectorStrategy.class + ManualTieredBrokerSelectorStrategy.class ); - assertEquals(queryContextStrategy.getFallbackBrokerService(), "hotBroker"); + assertEquals(queryContextStrategy.getDefaultManualBrokerService(), "hotBroker"); } @Test public void testGetBrokerServiceName() { - final QueryContextTieredBrokerSelectorStrategy strategy = - new QueryContextTieredBrokerSelectorStrategy(null); + final ManualTieredBrokerSelectorStrategy strategy = + new ManualTieredBrokerSelectorStrategy(null); assertEquals( Optional.absent(), @@ -143,8 +143,8 @@ public void testGetBrokerServiceName() @Test public void testGetBrokerServiceName_withFallback() { - final QueryContextTieredBrokerSelectorStrategy strategy = - new QueryContextTieredBrokerSelectorStrategy(Names.BROKER_SVC_MEDIUM); + final ManualTieredBrokerSelectorStrategy strategy = + new ManualTieredBrokerSelectorStrategy(Names.BROKER_SVC_MEDIUM); assertEquals( Optional.of(Names.BROKER_SVC_MEDIUM), @@ -173,8 +173,8 @@ public void testGetBrokerServiceName_withFallback() @Test public void testGetBrokerServiceName_withInvalidFallback() { - final QueryContextTieredBrokerSelectorStrategy strategy = - new QueryContextTieredBrokerSelectorStrategy("noSuchBroker"); + final ManualTieredBrokerSelectorStrategy strategy = + new ManualTieredBrokerSelectorStrategy("noSuchBroker"); assertEquals( Optional.absent(), diff --git a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java b/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java index f1e3fb7481ff..a4d78d13eae5 100644 --- a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java +++ b/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java @@ -137,7 +137,7 @@ public String getDefaultBrokerServiceName() }, druidNodeDiscoveryProvider, Arrays.asList( - new QueryContextTieredBrokerSelectorStrategy(null), + new ManualTieredBrokerSelectorStrategy(null), new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy(0, 1) ) diff --git a/website/.spelling b/website/.spelling index 1bd8878ca531..d4b568c8c037 100644 --- a/website/.spelling +++ b/website/.spelling @@ -479,10 +479,9 @@ skipOffsetFromLatest - ../docs/design/router.md brokerService c3.2xlarge -fallbackBrokerService +defaultManualBrokerService maxPriority minPriority -queryContext runtime.properties timeBoundary - ../docs/design/segments.md @@ -1449,7 +1448,6 @@ parallelMergeSmallBatchRows populateCache populateResultLevelCache queryId -queryContext row-matchers serializeDateTimeAsLong serializeDateTimeAsLongInner From 5f2b0ae1ee52c5aa6b4e17a8c2073128d25c3ed5 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 27 Jul 2021 12:54:10 +0530 Subject: [PATCH 5/6] Update docs/design/router.md Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com> --- docs/design/router.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/design/router.md b/docs/design/router.md index a7e8045e76bd..c5a3b8dcb071 100644 --- a/docs/design/router.md +++ b/docs/design/router.md @@ -111,7 +111,7 @@ Queries with a priority set to less than minPriority are routed to the lowest pr #### manual -This strategy reads the parameter `brokerService` from the query context and routes the query accordingly. If no valid `brokerService` is specified in the query context, the field `defaultManualBrokerService` is used if set to a valid non-null value. +This strategy reads the parameter `brokerService` from the query context and routes the query to that broker service. If no valid `brokerService` is specified in the query context, the field `defaultManualBrokerService` is used to determine target broker service given the value is valid and non-null. A value is considered valid if it is present in `druid.router.tierToBrokerMap` *Example*: A strategy that routes queries to the Broker "druid:broker-hot" if no valid `brokerService` is found in the query context. @@ -216,4 +216,3 @@ druid.router.http.numMaxThreads=100 druid.server.http.numThreads=100 ``` - From 98a6be02c92d34095584a88ef387bd58118e7ab5 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 27 Jul 2021 14:43:01 +0530 Subject: [PATCH 6/6] Fix field names, change log level --- .../server/router/ManualTieredBrokerSelectorStrategy.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java b/server/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java index aa2c2985d3c5..2f1d45d2bc1d 100644 --- a/server/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java +++ b/server/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java @@ -38,7 +38,7 @@ * If the {@link #defaultManualBrokerService} is set to a valid Broker Service Name, * then all queries that do not specify a valid value for * {@link QueryContexts#BROKER_SERVICE_NAME} would be directed to the - * {@code #fallbackBrokerService}. Note that the {@code fallbackBrokerService} + * {@code #defaultManualBrokerService}. Note that the {@code defaultManualBrokerService} * can be different from the {@link TieredBrokerConfig#getDefaultBrokerServiceName()}. */ public class ManualTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy @@ -68,8 +68,8 @@ public Optional getBrokerServiceName(TieredBrokerConfig tierConfig, Quer // If the fallbackBrokerService is valid, use that return Optional.of(defaultManualBrokerService); } else { - log.debug( - "Could not find Broker Service [%s] or fallback [%s] in TieredBrokerConfig", + log.warn( + "Could not find Broker Service [%s] or default [%s] in TieredBrokerConfig", contextBrokerService, defaultManualBrokerService );