From abf7dda6e2024f06595f8347c3d5958e681015c1 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 9 Aug 2021 18:49:54 +0530 Subject: [PATCH 01/12] Move classes from druid-server to druid-services --- .../server/AsyncQueryForwardingServlet.java | 104 +++++++++++++++++- .../druid/server/http/RouterResource.java | 0 .../server/router/CoordinatorRuleManager.java | 0 ...avaScriptTieredBrokerSelectorStrategy.java | 0 .../ManualTieredBrokerSelectorStrategy.java | 0 .../PriorityTieredBrokerSelectorStrategy.java | 0 .../druid/server/router/QueryHostFinder.java | 54 ++++----- .../server/router/TieredBrokerConfig.java | 0 .../router/TieredBrokerHostSelector.java | 37 ++++++- ...ieredBrokerSelectorStrategiesProvider.java | 0 .../router/TieredBrokerSelectorStrategy.java | 5 + ...eBoundaryTieredBrokerSelectorStrategy.java | 0 .../AsyncQueryForwardingServletTest.java | 49 +++++---- .../security/SecurityResourceFilterTest.java | 0 .../router/CoordinatorRuleManagerTest.java | 0 ...criptTieredBrokerSelectorStrategyTest.java | 0 ...anualTieredBrokerSelectorStrategyTest.java | 0 .../server/router/QueryHostFinderTest.java | 0 .../router/TieredBrokerHostSelectorTest.java | 0 19 files changed, 195 insertions(+), 54 deletions(-) rename {server => services}/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java (86%) rename {server => services}/src/main/java/org/apache/druid/server/http/RouterResource.java (100%) rename {server => services}/src/main/java/org/apache/druid/server/router/CoordinatorRuleManager.java (100%) rename {server => services}/src/main/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategy.java (100%) rename {server => services}/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java (100%) rename {server => services}/src/main/java/org/apache/druid/server/router/PriorityTieredBrokerSelectorStrategy.java (100%) rename {server => services}/src/main/java/org/apache/druid/server/router/QueryHostFinder.java (83%) rename {server => services}/src/main/java/org/apache/druid/server/router/TieredBrokerConfig.java (100%) rename {server => services}/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java (90%) rename {server => services}/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategiesProvider.java (100%) rename {server => services}/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java (93%) rename {server => services}/src/main/java/org/apache/druid/server/router/TimeBoundaryTieredBrokerSelectorStrategy.java (100%) rename {server => services}/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java (94%) rename {server => services}/src/test/java/org/apache/druid/server/http/security/SecurityResourceFilterTest.java (100%) rename {server => services}/src/test/java/org/apache/druid/server/router/CoordinatorRuleManagerTest.java (100%) rename {server => services}/src/test/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategyTest.java (100%) rename {server => services}/src/test/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategyTest.java (100%) rename {server => services}/src/test/java/org/apache/druid/server/router/QueryHostFinderTest.java (100%) rename {server => services}/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java (100%) diff --git a/server/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java similarity index 86% rename from server/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java rename to services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java index 253fe1da4a30..00c6e6a9519a 100644 --- a/server/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java +++ b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java @@ -68,6 +68,7 @@ import javax.ws.rs.core.Response.Status; import java.io.IOException; import java.util.Map; +import java.util.Properties; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -96,6 +97,8 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu private final AtomicLong failedQueryCount = new AtomicLong(); private final AtomicLong interruptedQueryCount = new AtomicLong(); + private final boolean routeSqlQueries; + private static void handleException(HttpServletResponse response, ObjectMapper objectMapper, Exception exception) throws IOException { @@ -137,7 +140,8 @@ public AsyncQueryForwardingServlet( ServiceEmitter emitter, RequestLogger requestLogger, GenericQueryMetricsFactory queryMetricsFactory, - AuthenticatorMapper authenticatorMapper + AuthenticatorMapper authenticatorMapper, + Properties properties ) { this.warehouse = warehouse; @@ -151,6 +155,9 @@ public AsyncQueryForwardingServlet( this.queryMetricsFactory = queryMetricsFactory; this.authenticatorMapper = authenticatorMapper; this.protobufTranslation = new ProtobufTranslationImpl(); + this.routeSqlQueries = Boolean.parseBoolean( + properties.getProperty("druid.router.sql.enable", "false") + ); } @Override @@ -195,7 +202,8 @@ protected void service(HttpServletRequest request, HttpServletResponse response) // The Router does not have the ability to look inside SQL queries and route them intelligently, so just treat // them as a generic request. - final boolean isQueryEndpoint = requestURI.startsWith("/druid/v2") && !requestURI.startsWith("/druid/v2/sql"); + final boolean isNativeQueryEndpoint = requestURI.startsWith("/druid/v2") && !requestURI.startsWith("/druid/v2/sql"); + final boolean isSqlQueryEndpoint = requestURI.startsWith("/druid/v2/sql"); final boolean isAvaticaJson = requestURI.startsWith("/druid/v2/sql/avatica"); final boolean isAvaticaPb = requestURI.startsWith("/druid/v2/sql/avatica-protobuf"); @@ -215,7 +223,7 @@ protected void service(HttpServletRequest request, HttpServletResponse response) targetServer = hostFinder.findServerAvatica(connectionId); byte[] requestBytes = objectMapper.writeValueAsBytes(requestMap); request.setAttribute(AVATICA_QUERY_ATTRIBUTE, requestBytes); - } else if (isQueryEndpoint && HttpMethod.DELETE.is(method)) { + } else if (isNativeQueryEndpoint && HttpMethod.DELETE.is(method)) { // query cancellation request targetServer = hostFinder.pickDefaultServer(); @@ -244,7 +252,7 @@ protected void service(HttpServletRequest request, HttpServletResponse response) } interruptedQueryCount.incrementAndGet(); } - } else if (isQueryEndpoint && HttpMethod.POST.is(method)) { + } else if (isNativeQueryEndpoint && HttpMethod.POST.is(method)) { // query request try { Query inputQuery = objectMapper.readValue(request.getInputStream(), Query.class); @@ -282,6 +290,17 @@ protected void service(HttpServletRequest request, HttpServletResponse response) handleException(response, objectMapper, e); return; } + } else if (routeSqlQueries && isSqlQueryEndpoint && HttpMethod.DELETE.is(method)) { + // Cancel SQL query + targetServer = cancelSqlQuery(request); + } else if (routeSqlQueries && isSqlQueryEndpoint && HttpMethod.POST.is(method)) { + // Submit a SQL query + try { + targetServer = submitSqlQuery(request, response, objectMapper); + } catch (Exception e) { + handleException(response, objectMapper, e); + return; + } } else { targetServer = hostFinder.pickDefaultServer(); } @@ -292,6 +311,83 @@ protected void service(HttpServletRequest request, HttpServletResponse response) doService(request, response); } + private Server cancelSqlQuery(HttpServletRequest request) + { + final Server targetServer = hostFinder.pickDefaultServer(); + + for (final Server server : hostFinder.getAllServers()) { + // send query cancellation to all brokers this query may have gone to + // to keep the code simple, the proxy servlet will also send a request to the default targetServer. + if (!server.getHost().equals(targetServer.getHost())) { + // issue async requests + Response.CompleteListener completeListener = result -> { + if (result.isFailed()) { + log.warn( + result.getFailure(), + "Failed to forward SQL cancellation request to [%s]", + server.getHost() + ); + } + }; + + Request broadcastReq = broadcastClient + .newRequest(rewriteURI(request, server.getScheme(), server.getHost())) + .method(HttpMethod.DELETE) + .timeout(CANCELLATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + + copyRequestHeaders(request, broadcastReq); + broadcastReq.send(completeListener); + } + interruptedQueryCount.incrementAndGet(); + } + + return targetServer; + } + + private Server submitSqlQuery( + HttpServletRequest request, + HttpServletResponse response, + ObjectMapper objectMapper + ) throws IOException + { + try { + Server targetServer; + Object inputSqlQuery = null; + // TODO: Query inputQuery = objectMapper.readValue(request.getInputStream(), Query.class); + if (inputSqlQuery != null) { + targetServer = hostFinder.findServerSql(); + /* if (inputQuery.getId() == null) { + inputQuery = inputQuery.withId(UUID.randomUUID().toString()); + }*/ + } else { + targetServer = hostFinder.pickDefaultServer(); + } + // TODO: request.setAttribute(QUERY_ATTRIBUTE, inputQuery); + return targetServer; + } + catch (IOException e) { + log.warn(e, "Exception parsing query"); + final String errorMessage = e.getMessage() == null ? "no error message" : e.getMessage(); + requestLogger.logSqlQuery( + RequestLogLine.forSql( + null, + null, + DateTimes.nowUtc(), + request.getRemoteAddr(), + new QueryStats(ImmutableMap.of("success", false, "exception", errorMessage)) + ) + ); + response.setStatus(HttpServletResponse.SC_BAD_REQUEST); + response.setContentType(MediaType.APPLICATION_JSON); + objectMapper.writeValue( + response.getOutputStream(), + ImmutableMap.of("error", errorMessage) + ); + + return targetServer; + } + } + protected void doService( HttpServletRequest request, HttpServletResponse response diff --git a/server/src/main/java/org/apache/druid/server/http/RouterResource.java b/services/src/main/java/org/apache/druid/server/http/RouterResource.java similarity index 100% rename from server/src/main/java/org/apache/druid/server/http/RouterResource.java rename to services/src/main/java/org/apache/druid/server/http/RouterResource.java diff --git a/server/src/main/java/org/apache/druid/server/router/CoordinatorRuleManager.java b/services/src/main/java/org/apache/druid/server/router/CoordinatorRuleManager.java similarity index 100% rename from server/src/main/java/org/apache/druid/server/router/CoordinatorRuleManager.java rename to services/src/main/java/org/apache/druid/server/router/CoordinatorRuleManager.java diff --git a/server/src/main/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategy.java b/services/src/main/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategy.java similarity index 100% rename from server/src/main/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategy.java rename to services/src/main/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategy.java diff --git a/server/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java b/services/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java similarity index 100% rename from server/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java rename to services/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java diff --git a/server/src/main/java/org/apache/druid/server/router/PriorityTieredBrokerSelectorStrategy.java b/services/src/main/java/org/apache/druid/server/router/PriorityTieredBrokerSelectorStrategy.java similarity index 100% rename from server/src/main/java/org/apache/druid/server/router/PriorityTieredBrokerSelectorStrategy.java rename to services/src/main/java/org/apache/druid/server/router/PriorityTieredBrokerSelectorStrategy.java diff --git a/server/src/main/java/org/apache/druid/server/router/QueryHostFinder.java b/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java similarity index 83% rename from server/src/main/java/org/apache/druid/server/router/QueryHostFinder.java rename to services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java index 0494f9213053..3c69403ae444 100644 --- a/server/src/main/java/org/apache/druid/server/router/QueryHostFinder.java +++ b/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java @@ -74,13 +74,12 @@ public Collection getAllServers() public Server findServerAvatica(String connectionId) { Server chosenServer = avaticaConnectionBalancer.pickServer(getAllServers(), connectionId); - if (chosenServer == null) { - log.makeAlert( - "Catastrophic failure! No servers found at all! Failing request!" - ).emit(); + assertServerFound( + chosenServer, + "No server found for Avatica request with connectionId[%s]", + connectionId + ); - throw new ISE("No server found for Avatica request with connectionId[%s]", connectionId); - } log.debug( "Balancer class [%s] sending request with connectionId[%s] to server: %s", avaticaConnectionBalancer.getClass(), @@ -90,35 +89,24 @@ public Server findServerAvatica(String connectionId) return chosenServer; } + public Server findServerSql() + { + Server server = findServerInner(hostSelector.selectForSql()); + assertServerFound(server, "No server found for SQL Query [%s]", "SELECT IT"); + return server; + } + public Server pickServer(Query query) { Server server = findServer(query); - - if (server == null) { - log.makeAlert( - "Catastrophic failure! No servers found at all! Failing request!" - ).emit(); - - throw new ISE("No server found for query[%s]", query); - } - - log.debug("Selected [%s]", server.getHost()); - + assertServerFound(server, "No server found for query[%s]", query); return server; } public Server pickDefaultServer() { Server server = findDefaultServer(); - - if (server == null) { - log.makeAlert( - "Catastrophic failure! No servers found at all! Failing request!" - ).emit(); - - throw new ISE("No default server found!"); - } - + assertServerFound(server, "No default server found!"); return server; } @@ -155,4 +143,18 @@ private Server findServerInner(final Pair selected) return server; } + + private void assertServerFound(Server server, String messageFormat, Object... args) + { + if (server != null) { + log.debug("Selected [%s]", server.getHost()); + return; + } + + log.makeAlert( + "Catastrophic failure! No servers found at all! Failing request!" + ).emit(); + + throw new ISE(messageFormat, args); + } } diff --git a/server/src/main/java/org/apache/druid/server/router/TieredBrokerConfig.java b/services/src/main/java/org/apache/druid/server/router/TieredBrokerConfig.java similarity index 100% rename from server/src/main/java/org/apache/druid/server/router/TieredBrokerConfig.java rename to services/src/main/java/org/apache/druid/server/router/TieredBrokerConfig.java diff --git a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java b/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java similarity index 90% rename from server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java rename to services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java index a15aa5b292c6..6d0189682eef 100644 --- a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java +++ b/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java @@ -243,8 +243,12 @@ public Pair select(final Query query) brokerServiceName = tierConfig.getDefaultBrokerServiceName(); } - NodesHolder nodesHolder = servers.get(brokerServiceName); + return getServerPair(brokerServiceName); + } + private Pair getServerPair(String brokerServiceName) + { + NodesHolder nodesHolder = servers.get(brokerServiceName); if (nodesHolder == null) { log.error( "No nodesHolder found for brokerServiceName[%s]. Using default selector for[%s]", @@ -257,6 +261,37 @@ public Pair select(final Query query) return new Pair<>(brokerServiceName, nodesHolder.pick()); } + public Pair selectForSql() + { + synchronized (lock) { + if (!ruleManager.isStarted() || !started) { + return getDefaultLookup(); + } + } + + // Resolve brokerServiceName using Tier selector strategies + String brokerServiceName = null; + for (TieredBrokerSelectorStrategy strategy : strategies) { + final Optional optionalName = strategy.getBrokerServiceName(tierConfig); + if (optionalName.isPresent()) { + brokerServiceName = optionalName.get(); + break; + } + } + + // Use defaut if not resolved by strategies + if (brokerServiceName == null) { + log.error( + "No brokerServiceName found for SQL Query [%s]. Using default selector for [%s].", + "SELECT stuff", + tierConfig.getDefaultBrokerServiceName() + ); + brokerServiceName = tierConfig.getDefaultBrokerServiceName(); + } + + return getServerPair(brokerServiceName); + } + public Pair getDefaultLookup() { final String brokerServiceName = tierConfig.getDefaultBrokerServiceName(); diff --git a/server/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategiesProvider.java b/services/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategiesProvider.java similarity index 100% rename from server/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategiesProvider.java rename to services/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategiesProvider.java diff --git a/server/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java b/services/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java similarity index 93% rename from server/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java rename to services/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java index 06f3a98c091d..71b24556d4a5 100644 --- a/server/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java +++ b/services/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java @@ -37,4 +37,9 @@ public interface TieredBrokerSelectorStrategy { Optional getBrokerServiceName(TieredBrokerConfig config, Query query); + + default Optional getBrokerServiceName(TieredBrokerConfig config) + { + return Optional.absent(); + } } diff --git a/server/src/main/java/org/apache/druid/server/router/TimeBoundaryTieredBrokerSelectorStrategy.java b/services/src/main/java/org/apache/druid/server/router/TimeBoundaryTieredBrokerSelectorStrategy.java similarity index 100% rename from server/src/main/java/org/apache/druid/server/router/TimeBoundaryTieredBrokerSelectorStrategy.java rename to services/src/main/java/org/apache/druid/server/router/TimeBoundaryTieredBrokerSelectorStrategy.java diff --git a/server/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java similarity index 94% rename from server/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java rename to services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java index b7767be043b7..0689aa38fce5 100644 --- a/server/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java +++ b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java @@ -89,6 +89,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import java.util.zip.Deflater; @@ -262,7 +263,8 @@ public int read() new NoopServiceEmitter(), new NoopRequestLogger(), new DefaultGenericQueryMetricsFactory(), - new AuthenticatorMapper(ImmutableMap.of()) + new AuthenticatorMapper(ImmutableMap.of()), + new Properties() ) { @Override @@ -354,7 +356,8 @@ public Collection getAllServers() new NoopServiceEmitter(), new NoopRequestLogger(), new DefaultGenericQueryMetricsFactory(), - new AuthenticatorMapper(ImmutableMap.of()) + new AuthenticatorMapper(ImmutableMap.of()), + new Properties() ) { @Override @@ -477,32 +480,32 @@ public void testGetAvaticaProtobufConnectionId() final int maxNumRows = 1000; final List avaticaRequests = ImmutableList.of( - new Service.CatalogsRequest(connectionId), - new Service.SchemasRequest(connectionId, "druid", null), - new Service.TablesRequest(connectionId, "druid", "druid", null, null), - new Service.ColumnsRequest(connectionId, "druid", "druid", "someTable", null), - new Service.PrepareAndExecuteRequest( - connectionId, - statementId, - query, - maxNumRows - ), - new Service.PrepareRequest(connectionId, query, maxNumRows), - new Service.ExecuteRequest( - new Meta.StatementHandle(connectionId, statementId, null), - ImmutableList.of(), - maxNumRows - ), - new Service.CloseStatementRequest(connectionId, statementId), - new Service.CloseConnectionRequest(connectionId) + new Service.CatalogsRequest(connectionId), + new Service.SchemasRequest(connectionId, "druid", null), + new Service.TablesRequest(connectionId, "druid", "druid", null, null), + new Service.ColumnsRequest(connectionId, "druid", "druid", "someTable", null), + new Service.PrepareAndExecuteRequest( + connectionId, + statementId, + query, + maxNumRows + ), + new Service.PrepareRequest(connectionId, query, maxNumRows), + new Service.ExecuteRequest( + new Meta.StatementHandle(connectionId, statementId, null), + ImmutableList.of(), + maxNumRows + ), + new Service.CloseStatementRequest(connectionId, statementId), + new Service.CloseConnectionRequest(connectionId) ); for (Service.Request request : avaticaRequests) { Assert.assertEquals( - "failed", - connectionId, - AsyncQueryForwardingServlet.getAvaticaProtobufConnectionId(request) + "failed", + connectionId, + AsyncQueryForwardingServlet.getAvaticaProtobufConnectionId(request) ); } } diff --git a/server/src/test/java/org/apache/druid/server/http/security/SecurityResourceFilterTest.java b/services/src/test/java/org/apache/druid/server/http/security/SecurityResourceFilterTest.java similarity index 100% rename from server/src/test/java/org/apache/druid/server/http/security/SecurityResourceFilterTest.java rename to services/src/test/java/org/apache/druid/server/http/security/SecurityResourceFilterTest.java diff --git a/server/src/test/java/org/apache/druid/server/router/CoordinatorRuleManagerTest.java b/services/src/test/java/org/apache/druid/server/router/CoordinatorRuleManagerTest.java similarity index 100% rename from server/src/test/java/org/apache/druid/server/router/CoordinatorRuleManagerTest.java rename to services/src/test/java/org/apache/druid/server/router/CoordinatorRuleManagerTest.java diff --git a/server/src/test/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategyTest.java b/services/src/test/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategyTest.java similarity index 100% rename from server/src/test/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategyTest.java rename to services/src/test/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategyTest.java diff --git a/server/src/test/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategyTest.java b/services/src/test/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategyTest.java similarity index 100% rename from server/src/test/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategyTest.java rename to services/src/test/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategyTest.java diff --git a/server/src/test/java/org/apache/druid/server/router/QueryHostFinderTest.java b/services/src/test/java/org/apache/druid/server/router/QueryHostFinderTest.java similarity index 100% rename from server/src/test/java/org/apache/druid/server/router/QueryHostFinderTest.java rename to services/src/test/java/org/apache/druid/server/router/QueryHostFinderTest.java diff --git a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java b/services/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java similarity index 100% rename from server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java rename to services/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java From bfbb749b8537ac71c1606ed8ec15999f3d78ece5 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 10 Aug 2021 10:09:31 +0530 Subject: [PATCH 02/12] Add property druid.router.sql.enable for SQL routing --- .../org/apache/druid/query/QueryContexts.java | 4 +- .../apache/druid/query/QueryContextsTest.java | 27 +-- services/pom.xml | 26 +++ .../server/AsyncQueryForwardingServlet.java | 179 +++++++++--------- .../ManualTieredBrokerSelectorStrategy.java | 21 +- .../druid/server/router/QueryHostFinder.java | 12 +- .../router/TieredBrokerHostSelector.java | 14 +- .../router/TieredBrokerSelectorStrategy.java | 3 +- ...anualTieredBrokerSelectorStrategyTest.java | 52 +++++ 9 files changed, 211 insertions(+), 127 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index 4a293e56cf7a..91156314c19f 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -418,9 +418,9 @@ public static boolean allowReturnPartialResults(Query query, boolean defa return query.getContextBoolean(RETURN_PARTIAL_RESULTS_KEY, defaultValue); } - public static String getBrokerServiceName(Query query) + public static String getBrokerServiceName(Map queryContext) { - return query.getContextValue(BROKER_SERVICE_NAME); + return queryContext == null ? null : (String) queryContext.get(BROKER_SERVICE_NAME); } static long parseLong(Query query, String key, long defaultValue) diff --git a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java index 764a81aec3ff..61b35f290401 100644 --- a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java +++ b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java @@ -30,6 +30,7 @@ import org.junit.rules.ExpectedException; import java.util.HashMap; +import java.util.Map; public class QueryContextsTest { @@ -149,33 +150,21 @@ public void testGetEnableJoinLeftScanDirect() @Test public void testGetBrokerServiceName() { - Query query = new TestQuery( - new TableDataSource("test"), - new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))), - false, - new HashMap<>() - ); - - Assert.assertNull(QueryContexts.getBrokerServiceName(query)); + Map queryContext = new HashMap<>(); + Assert.assertNull(QueryContexts.getBrokerServiceName(queryContext)); - query.getContext().put(QueryContexts.BROKER_SERVICE_NAME, "hotBroker"); - Assert.assertEquals("hotBroker", QueryContexts.getBrokerServiceName(query)); + queryContext.put(QueryContexts.BROKER_SERVICE_NAME, "hotBroker"); + Assert.assertEquals("hotBroker", QueryContexts.getBrokerServiceName(queryContext)); } @Test public void testGetBrokerServiceName_withNonStringValue() { - Query query = new TestQuery( - new TableDataSource("test"), - new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))), - false, - new HashMap<>() - ); - - query.getContext().put(QueryContexts.BROKER_SERVICE_NAME, 100); + Map queryContext = new HashMap<>(); + queryContext.put(QueryContexts.BROKER_SERVICE_NAME, 100); exception.expect(ClassCastException.class); - QueryContexts.getBrokerServiceName(query); + QueryContexts.getBrokerServiceName(queryContext); } @Test diff --git a/services/pom.xml b/services/pom.xml index d37cc1d1a53a..643843deea3d 100644 --- a/services/pom.xml +++ b/services/pom.xml @@ -161,6 +161,27 @@ jaxb-api + + org.apache.druid + druid-core + ${project.parent.version} + test-jar + test + + + org.apache.druid + druid-server + ${project.parent.version} + test-jar + test + + + org.apache.druid + druid-processing + ${project.parent.version} + test-jar + test + junit junit @@ -176,6 +197,11 @@ hamcrest-core test + + org.easymock + easymock + test + diff --git a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java index 00c6e6a9519a..57a72c8ddeb7 100644 --- a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java +++ b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java @@ -52,6 +52,7 @@ import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.Authenticator; import org.apache.druid.server.security.AuthenticatorMapper; +import org.apache.druid.sql.http.SqlQuery; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; @@ -89,6 +90,7 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu private static final String SCHEME_ATTRIBUTE = "org.apache.druid.proxy.to.host.scheme"; private static final String QUERY_ATTRIBUTE = "org.apache.druid.proxy.query"; private static final String AVATICA_QUERY_ATTRIBUTE = "org.apache.druid.proxy.avaticaQuery"; + private static final String SQL_QUERY_ATTRIBUTE = "org.apache.druid.proxy.sqlQuery"; private static final String OBJECTMAPPER_ATTRIBUTE = "org.apache.druid.proxy.objectMapper"; private static final int CANCELLATION_TIMEOUT_MILLIS = 500; @@ -225,33 +227,7 @@ protected void service(HttpServletRequest request, HttpServletResponse response) request.setAttribute(AVATICA_QUERY_ATTRIBUTE, requestBytes); } else if (isNativeQueryEndpoint && HttpMethod.DELETE.is(method)) { // query cancellation request - targetServer = hostFinder.pickDefaultServer(); - - for (final Server server : hostFinder.getAllServers()) { - // send query cancellation to all brokers this query may have gone to - // to keep the code simple, the proxy servlet will also send a request to the default targetServer. - if (!server.getHost().equals(targetServer.getHost())) { - // issue async requests - Response.CompleteListener completeListener = result -> { - if (result.isFailed()) { - log.warn( - result.getFailure(), - "Failed to forward cancellation request to [%s]", - server.getHost() - ); - } - }; - - Request broadcastReq = broadcastClient - .newRequest(rewriteURI(request, server.getScheme(), server.getHost())) - .method(HttpMethod.DELETE) - .timeout(CANCELLATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); - - copyRequestHeaders(request, broadcastReq); - broadcastReq.send(completeListener); - } - interruptedQueryCount.incrementAndGet(); - } + targetServer = cancelQuery(request); } else if (isNativeQueryEndpoint && HttpMethod.POST.is(method)) { // query request try { @@ -267,23 +243,7 @@ protected void service(HttpServletRequest request, HttpServletResponse response) request.setAttribute(QUERY_ATTRIBUTE, inputQuery); } catch (IOException e) { - log.warn(e, "Exception parsing query"); - final String errorMessage = e.getMessage() == null ? "no error message" : e.getMessage(); - requestLogger.logNativeQuery( - RequestLogLine.forNative( - null, - DateTimes.nowUtc(), - request.getRemoteAddr(), - new QueryStats(ImmutableMap.of("success", false, "exception", errorMessage)) - ) - ); - response.setStatus(HttpServletResponse.SC_BAD_REQUEST); - response.setContentType(MediaType.APPLICATION_JSON); - objectMapper.writeValue( - response.getOutputStream(), - ImmutableMap.of("error", errorMessage) - ); - + handleQueryParseException(request, response, objectMapper, e, true); return; } catch (Exception e) { @@ -291,13 +251,16 @@ protected void service(HttpServletRequest request, HttpServletResponse response) return; } } else if (routeSqlQueries && isSqlQueryEndpoint && HttpMethod.DELETE.is(method)) { - // Cancel SQL query - targetServer = cancelSqlQuery(request); + targetServer = cancelQuery(request); } else if (routeSqlQueries && isSqlQueryEndpoint && HttpMethod.POST.is(method)) { - // Submit a SQL query try { - targetServer = submitSqlQuery(request, response, objectMapper); - } catch (Exception e) { + targetServer = submitSqlQuery(request, objectMapper); + } + catch (IOException e) { + handleQueryParseException(request, response, objectMapper, e, false); + return; + } + catch (Exception e) { handleException(response, objectMapper, e); return; } @@ -311,63 +274,91 @@ protected void service(HttpServletRequest request, HttpServletResponse response) doService(request, response); } - private Server cancelSqlQuery(HttpServletRequest request) + /** + * Issues async query cancellation requests to all Brokers (except the default + * targetServer). Query cancellation on the default targetServer is handled by + * the proxy servlet. + * + * @return The default targetServer to which the proxy servlet will send a + * cancellation request. + */ + private Server cancelQuery(HttpServletRequest request) { final Server targetServer = hostFinder.pickDefaultServer(); + // send query cancellation to all brokers this query may have gone to + // to keep the code simple, the proxy servlet will also send a request to the default targetServer. for (final Server server : hostFinder.getAllServers()) { - // send query cancellation to all brokers this query may have gone to - // to keep the code simple, the proxy servlet will also send a request to the default targetServer. - if (!server.getHost().equals(targetServer.getHost())) { - // issue async requests - Response.CompleteListener completeListener = result -> { - if (result.isFailed()) { - log.warn( - result.getFailure(), - "Failed to forward SQL cancellation request to [%s]", - server.getHost() - ); - } - }; + if (server.getHost().equals(targetServer.getHost())) { + continue; + } - Request broadcastReq = broadcastClient - .newRequest(rewriteURI(request, server.getScheme(), server.getHost())) - .method(HttpMethod.DELETE) - .timeout(CANCELLATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + // issue async requests + Response.CompleteListener completeListener = result -> { + if (result.isFailed()) { + log.warn( + result.getFailure(), + "Failed to forward cancellation request to [%s]", + server.getHost() + ); + } + }; - copyRequestHeaders(request, broadcastReq); - broadcastReq.send(completeListener); - } - interruptedQueryCount.incrementAndGet(); + Request broadcastReq = broadcastClient + .newRequest(rewriteURI(request, server.getScheme(), server.getHost())) + .method(HttpMethod.DELETE) + .timeout(CANCELLATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + + copyRequestHeaders(request, broadcastReq); + broadcastReq.send(completeListener); } + interruptedQueryCount.incrementAndGet(); return targetServer; } private Server submitSqlQuery( HttpServletRequest request, - HttpServletResponse response, ObjectMapper objectMapper ) throws IOException { - try { - Server targetServer; - Object inputSqlQuery = null; - // TODO: Query inputQuery = objectMapper.readValue(request.getInputStream(), Query.class); - if (inputSqlQuery != null) { - targetServer = hostFinder.findServerSql(); + Server targetServer; + SqlQuery inputSqlQuery = objectMapper.readValue(request.getInputStream(), SqlQuery.class); + if (inputSqlQuery != null) { + targetServer = hostFinder.findServerSql(inputSqlQuery); /* if (inputQuery.getId() == null) { inputQuery = inputQuery.withId(UUID.randomUUID().toString()); }*/ - } else { - targetServer = hostFinder.pickDefaultServer(); - } - // TODO: request.setAttribute(QUERY_ATTRIBUTE, inputQuery); - return targetServer; + } else { + targetServer = hostFinder.pickDefaultServer(); } - catch (IOException e) { - log.warn(e, "Exception parsing query"); - final String errorMessage = e.getMessage() == null ? "no error message" : e.getMessage(); + request.setAttribute(SQL_QUERY_ATTRIBUTE, inputSqlQuery); + return targetServer; + } + + private void handleQueryParseException( + HttpServletRequest request, + HttpServletResponse response, + ObjectMapper objectMapper, + IOException parseException, + boolean isNativeQuery + ) throws IOException + { + log.warn(parseException, "Exception parsing query"); + + // Log the error message + final String errorMessage = parseException.getMessage() == null + ? "no error message" : parseException.getMessage(); + if (isNativeQuery) { + requestLogger.logNativeQuery( + RequestLogLine.forNative( + null, + DateTimes.nowUtc(), + request.getRemoteAddr(), + new QueryStats(ImmutableMap.of("success", false, "exception", errorMessage)) + ) + ); + } else { requestLogger.logSqlQuery( RequestLogLine.forSql( null, @@ -377,15 +368,15 @@ private Server submitSqlQuery( new QueryStats(ImmutableMap.of("success", false, "exception", errorMessage)) ) ); - response.setStatus(HttpServletResponse.SC_BAD_REQUEST); - response.setContentType(MediaType.APPLICATION_JSON); - objectMapper.writeValue( - response.getOutputStream(), - ImmutableMap.of("error", errorMessage) - ); - - return targetServer; } + + // Write to the response + response.setStatus(HttpServletResponse.SC_BAD_REQUEST); + response.setContentType(MediaType.APPLICATION_JSON); + objectMapper.writeValue( + response.getOutputStream(), + ImmutableMap.of("error", errorMessage) + ); } protected void doService( diff --git a/services/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java b/services/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java index 2f1d45d2bc1d..c16ec0c035b6 100644 --- a/services/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java +++ b/services/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java @@ -27,8 +27,10 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; +import org.apache.druid.sql.http.SqlQuery; import javax.annotation.Nullable; +import java.util.Map; /** * Implementation of {@link TieredBrokerSelectorStrategy} which uses the parameter @@ -57,9 +59,26 @@ public ManualTieredBrokerSelectorStrategy( @Override public Optional getBrokerServiceName(TieredBrokerConfig tierConfig, Query query) + { + return getBrokerServiceName(tierConfig, query.getContext()); + } + + @Override + public Optional getBrokerServiceName(TieredBrokerConfig config, SqlQuery sqlQuery) + { + return getBrokerServiceName(config, sqlQuery.getContext()); + } + + /** + * Determines the Broker service name from the given query context. + */ + private Optional getBrokerServiceName( + TieredBrokerConfig tierConfig, + Map queryContext + ) { try { - final String contextBrokerService = QueryContexts.getBrokerServiceName(query); + final String contextBrokerService = QueryContexts.getBrokerServiceName(queryContext); if (isValidBrokerService(contextBrokerService, tierConfig)) { // If the broker service in the query context is valid, use that diff --git a/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java b/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java index 3c69403ae444..6a7a0785fca1 100644 --- a/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java +++ b/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java @@ -25,9 +25,9 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.Query; +import org.apache.druid.sql.http.SqlQuery; import java.util.Collection; -import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -66,9 +66,9 @@ public Server findDefaultServer() public Collection getAllServers() { - return ((Collection>) hostSelector.getAllBrokers().values()).stream() - .flatMap(Collection::stream) - .collect(Collectors.toList()); + return hostSelector.getAllBrokers().values().stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); } public Server findServerAvatica(String connectionId) @@ -89,9 +89,9 @@ public Server findServerAvatica(String connectionId) return chosenServer; } - public Server findServerSql() + public Server findServerSql(SqlQuery sqlQuery) { - Server server = findServerInner(hostSelector.selectForSql()); + Server server = findServerInner(hostSelector.selectForSql(sqlQuery)); assertServerFound(server, "No server found for SQL Query [%s]", "SELECT IT"); return server; } diff --git a/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java b/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java index 6d0189682eef..2f99e4f15f92 100644 --- a/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java +++ b/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java @@ -38,6 +38,7 @@ import org.apache.druid.query.Query; import org.apache.druid.server.coordinator.rules.LoadRule; import org.apache.druid.server.coordinator.rules.Rule; +import org.apache.druid.sql.http.SqlQuery; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -50,7 +51,7 @@ /** */ -public class TieredBrokerHostSelector +public class TieredBrokerHostSelector { private static EmittingLogger log = new EmittingLogger(TieredBrokerHostSelector.class); @@ -181,7 +182,7 @@ public String getDefaultServiceName() return tierConfig.getDefaultBrokerServiceName(); } - public Pair select(final Query query) + public Pair select(final Query query) { synchronized (lock) { if (!ruleManager.isStarted() || !started) { @@ -246,6 +247,11 @@ public Pair select(final Query query) return getServerPair(brokerServiceName); } + /** + * Finds a server for the given brokerServiceName and returns a pair containing + * the brokerServiceName and the found server. Uses the default broker service + * if no server is found for the given brokerServiceName. + */ private Pair getServerPair(String brokerServiceName) { NodesHolder nodesHolder = servers.get(brokerServiceName); @@ -261,7 +267,7 @@ private Pair getServerPair(String brokerServiceName) return new Pair<>(brokerServiceName, nodesHolder.pick()); } - public Pair selectForSql() + public Pair selectForSql(SqlQuery sqlQuery) { synchronized (lock) { if (!ruleManager.isStarted() || !started) { @@ -272,7 +278,7 @@ public Pair selectForSql() // Resolve brokerServiceName using Tier selector strategies String brokerServiceName = null; for (TieredBrokerSelectorStrategy strategy : strategies) { - final Optional optionalName = strategy.getBrokerServiceName(tierConfig); + final Optional optionalName = strategy.getBrokerServiceName(tierConfig, sqlQuery); if (optionalName.isPresent()) { brokerServiceName = optionalName.get(); break; diff --git a/services/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java b/services/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java index 71b24556d4a5..42f4af79d134 100644 --- a/services/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java +++ b/services/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.google.common.base.Optional; import org.apache.druid.query.Query; +import org.apache.druid.sql.http.SqlQuery; /** */ @@ -38,7 +39,7 @@ public interface TieredBrokerSelectorStrategy { Optional getBrokerServiceName(TieredBrokerConfig config, Query query); - default Optional getBrokerServiceName(TieredBrokerConfig config) + default Optional getBrokerServiceName(TieredBrokerConfig config, SqlQuery sqlQuery) { return Optional.absent(); } diff --git a/services/src/test/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategyTest.java b/services/src/test/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategyTest.java index d5b85eeb399a..68855fc0d9aa 100644 --- a/services/src/test/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategyTest.java +++ b/services/src/test/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategyTest.java @@ -28,11 +28,13 @@ import org.apache.druid.query.QueryContexts; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.sql.http.SqlQuery; import org.junit.Before; import org.junit.Test; import java.util.Collections; import java.util.LinkedHashMap; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -200,6 +202,56 @@ public void testGetBrokerServiceName_withInvalidFallback() ); } + @Test + public void testGetBrokerServiceName_forSql() + { + final ManualTieredBrokerSelectorStrategy strategy = + new ManualTieredBrokerSelectorStrategy(null); + + assertEquals( + Optional.absent(), + strategy.getBrokerServiceName(tieredBrokerConfig, createSqlQueryWithContext(null)) + ); + assertEquals( + Optional.absent(), + strategy.getBrokerServiceName( + tieredBrokerConfig, + createSqlQueryWithContext( + ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.INVALID_BROKER) + ) + ) + ); + assertEquals( + Optional.of(Names.BROKER_SVC_HOT), + strategy.getBrokerServiceName( + tieredBrokerConfig, + createSqlQueryWithContext( + ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.BROKER_SVC_HOT) + ) + ) + ); + assertEquals( + Optional.of(Names.BROKER_SVC_COLD), + strategy.getBrokerServiceName( + tieredBrokerConfig, + createSqlQueryWithContext( + ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, Names.BROKER_SVC_COLD) + ) + ) + ); + } + + private SqlQuery createSqlQueryWithContext(Map queryContext) + { + return new SqlQuery( + "SELECT * FROM test", + null, + false, + queryContext, + null + ); + } + /** * Test constants. */ From fc9fc4103fc561b9e7d0e6b314868152277724e8 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 10 Aug 2021 11:35:17 +0530 Subject: [PATCH 03/12] Update docs router.md, configuration/index.md --- docs/configuration/index.md | 1 + docs/design/router.md | 10 ++++++++++ .../druid/server/router/TieredBrokerHostSelector.java | 2 +- 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 09b336d337a8..d50b4690bbb2 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -2046,6 +2046,7 @@ Supported query contexts: |`druid.router.tierToBrokerMap`|Queries for a certain tier of data are routed to their appropriate Broker. This value should be an ordered JSON map of tiers to Broker names. The priority of Brokers is based on the ordering.|{"_default_tier": ""}| |`druid.router.defaultRule`|The default rule for all datasources.|"_default"| |`druid.router.pollPeriod`|How often to poll for new rules.|PT1M| +|`druid.router.sql.enable`|Enable routing of SQL queries. Possible values are `true` and `false`. When set to `true`, the Router uses the provided strategies to determine the broker service for a given SQL query.|`false`| |`druid.router.strategies`|Please see [Router Strategies](../design/router.md#router-strategies) for details.|[{"type":"timeBoundary"},{"type":"priority"}]| |`druid.router.avatica.balancer.type`|Class to use for balancing Avatica queries across Brokers. Please see [Avatica Query Balancing](../design/router.md#avatica-query-balancing).|rendezvousHash| |`druid.router.managementProxy.enabled`|Enables the Router's [management proxy](../design/router.md#router-as-management-proxy) functionality.|false| diff --git a/docs/design/router.md b/docs/design/router.md index c5a3b8dcb071..ad4c20165d10 100644 --- a/docs/design/router.md +++ b/docs/design/router.md @@ -112,6 +112,7 @@ Queries with a priority set to less than minPriority are routed to the lowest pr #### manual This strategy reads the parameter `brokerService` from the query context and routes the query to that broker service. If no valid `brokerService` is specified in the query context, the field `defaultManualBrokerService` is used to determine target broker service given the value is valid and non-null. A value is considered valid if it is present in `druid.router.tierToBrokerMap` +This strategy can route both Native and SQL queries (when enabled). *Example*: A strategy that routes queries to the Broker "druid:broker-hot" if no valid `brokerService` is found in the query context. @@ -137,6 +138,15 @@ Allows defining arbitrary routing rules using a JavaScript function. The functio > JavaScript-based functionality is disabled by default. Please refer to the Druid [JavaScript programming guide](../development/javascript.md) for guidelines about using Druid's JavaScript functionality, including instructions on how to enable it. +### Routing of SQL queries + +To enable routing of SQL queries, set `druid.router.sql.enable` to `true` (`false` by default). The broker service for a +given SQL query is resolved using only the provided Router strategies. If not resolved using any of the strategies, the +Router uses the `defaultBrokerServiceName`. This behaviour is slightly different from native queries where the Router +first tries to resolve the broker service using strategies, then load rules and finally using the `defaultBrokerServiceName` +if still not resolved. + +Routing of native queries is always enabled. ### Avatica query balancing diff --git a/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java b/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java index 2f99e4f15f92..20b0625beed0 100644 --- a/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java +++ b/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java @@ -289,7 +289,7 @@ public Pair selectForSql(SqlQuery sqlQuery) if (brokerServiceName == null) { log.error( "No brokerServiceName found for SQL Query [%s]. Using default selector for [%s].", - "SELECT stuff", + sqlQuery.getQuery(), tierConfig.getDefaultBrokerServiceName() ); brokerServiceName = tierConfig.getDefaultBrokerServiceName(); From 70c30fbcfa5eabbf1febc22914710a53f81f00fe Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 10 Aug 2021 11:53:09 +0530 Subject: [PATCH 04/12] Refactor AsyncQueryForwardingServlet --- .../server/AsyncQueryForwardingServlet.java | 38 +++++++------------ 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java index 57a72c8ddeb7..2bad35804db2 100644 --- a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java +++ b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java @@ -227,7 +227,8 @@ protected void service(HttpServletRequest request, HttpServletResponse response) request.setAttribute(AVATICA_QUERY_ATTRIBUTE, requestBytes); } else if (isNativeQueryEndpoint && HttpMethod.DELETE.is(method)) { // query cancellation request - targetServer = cancelQuery(request); + targetServer = hostFinder.pickDefaultServer(); + broadcastQueryCancelRequest(request, targetServer); } else if (isNativeQueryEndpoint && HttpMethod.POST.is(method)) { // query request try { @@ -251,10 +252,11 @@ protected void service(HttpServletRequest request, HttpServletResponse response) return; } } else if (routeSqlQueries && isSqlQueryEndpoint && HttpMethod.DELETE.is(method)) { - targetServer = cancelQuery(request); + targetServer = hostFinder.pickDefaultServer(); + broadcastQueryCancelRequest(request, targetServer); } else if (routeSqlQueries && isSqlQueryEndpoint && HttpMethod.POST.is(method)) { try { - targetServer = submitSqlQuery(request, objectMapper); + targetServer = getTargetServerForSql(request, objectMapper); } catch (IOException e) { handleQueryParseException(request, response, objectMapper, e, false); @@ -275,17 +277,12 @@ protected void service(HttpServletRequest request, HttpServletResponse response) } /** - * Issues async query cancellation requests to all Brokers (except the default - * targetServer). Query cancellation on the default targetServer is handled by - * the proxy servlet. - * - * @return The default targetServer to which the proxy servlet will send a - * cancellation request. + * Issues async query cancellation requests to all Brokers (except the given + * targetServer). Query cancellation on the targetServer is handled by the + * proxy servlet. */ - private Server cancelQuery(HttpServletRequest request) + private void broadcastQueryCancelRequest(HttpServletRequest request, Server targetServer) { - final Server targetServer = hostFinder.pickDefaultServer(); - // send query cancellation to all brokers this query may have gone to // to keep the code simple, the proxy servlet will also send a request to the default targetServer. for (final Server server : hostFinder.getAllServers()) { @@ -314,26 +311,19 @@ private Server cancelQuery(HttpServletRequest request) } interruptedQueryCount.incrementAndGet(); - return targetServer; } - private Server submitSqlQuery( + private Server getTargetServerForSql( HttpServletRequest request, ObjectMapper objectMapper ) throws IOException { - Server targetServer; SqlQuery inputSqlQuery = objectMapper.readValue(request.getInputStream(), SqlQuery.class); - if (inputSqlQuery != null) { - targetServer = hostFinder.findServerSql(inputSqlQuery); - /* if (inputQuery.getId() == null) { - inputQuery = inputQuery.withId(UUID.randomUUID().toString()); - }*/ - } else { - targetServer = hostFinder.pickDefaultServer(); - } request.setAttribute(SQL_QUERY_ATTRIBUTE, inputSqlQuery); - return targetServer; + + return inputSqlQuery != null + ? hostFinder.findServerSql(inputSqlQuery) + : hostFinder.pickDefaultServer(); } private void handleQueryParseException( From d607069fb6023cb60099a2f8bd11f98b10687423 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 10 Aug 2021 12:43:21 +0530 Subject: [PATCH 05/12] Set content of proxy request for SQL. Add tests --- .../server/AsyncQueryForwardingServlet.java | 26 +++++++----- .../router/TieredBrokerHostSelectorTest.java | 40 +++++++++++++++++++ 2 files changed, 57 insertions(+), 9 deletions(-) diff --git a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java index 2bad35804db2..c4f433b0c54b 100644 --- a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java +++ b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java @@ -394,16 +394,11 @@ protected void sendProxyRequest( } final Query query = (Query) clientRequest.getAttribute(QUERY_ATTRIBUTE); + final SqlQuery sqlQuery = (SqlQuery) clientRequest.getAttribute(SQL_QUERY_ATTRIBUTE); if (query != null) { - final ObjectMapper objectMapper = (ObjectMapper) clientRequest.getAttribute(OBJECTMAPPER_ATTRIBUTE); - try { - byte[] bytes = objectMapper.writeValueAsBytes(query); - proxyRequest.content(new BytesContentProvider(bytes)); - proxyRequest.getHeaders().put(HttpHeader.CONTENT_LENGTH, String.valueOf(bytes.length)); - } - catch (JsonProcessingException e) { - throw new RuntimeException(e); - } + setProxyRequestContent(proxyRequest, clientRequest, query); + } else if (sqlQuery != null) { + setProxyRequestContent(proxyRequest, clientRequest, sqlQuery); } // Since we can't see the request object on the remote side, we can't check whether the remote side actually @@ -435,6 +430,19 @@ protected void sendProxyRequest( ); } + private void setProxyRequestContent(Request proxyRequest, HttpServletRequest clientRequest, Object content) + { + final ObjectMapper objectMapper = (ObjectMapper) clientRequest.getAttribute(OBJECTMAPPER_ATTRIBUTE); + try { + byte[] bytes = objectMapper.writeValueAsBytes(content); + proxyRequest.content(new BytesContentProvider(bytes)); + proxyRequest.getHeaders().put(HttpHeader.CONTENT_LENGTH, String.valueOf(bytes.length)); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + @Override protected Response.Listener newProxyResponseListener(HttpServletRequest request, HttpServletResponse response) { diff --git a/services/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java b/services/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java index a4d78d13eae5..bf1bfed5525d 100644 --- a/services/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java +++ b/services/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java @@ -44,6 +44,7 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordinator.rules.IntervalLoadRule; import org.apache.druid.server.coordinator.rules.Rule; +import org.apache.druid.sql.http.SqlQuery; import org.easymock.EasyMock; import org.junit.After; import org.junit.Assert; @@ -56,6 +57,7 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; /** */ @@ -333,6 +335,33 @@ public void testSelectBasedOnQueryContext() ); } + @Test + public void testSelectForSql() + { + Assert.assertEquals( + brokerSelector.getDefaultServiceName(), + brokerSelector.selectForSql( + createSqlQueryWithContext(null) + ).lhs + ); + Assert.assertEquals( + "hotBroker", + brokerSelector.selectForSql( + createSqlQueryWithContext( + ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, "hotBroker") + ) + ).lhs + ); + Assert.assertEquals( + "coldBroker", + brokerSelector.selectForSql( + createSqlQueryWithContext( + ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, "coldBroker") + ) + ).lhs + ); + } + @Test public void testGetAllBrokers() { @@ -356,6 +385,17 @@ public List apply(@Nullable List servers) ); } + private SqlQuery createSqlQueryWithContext(Map queryContext) + { + return new SqlQuery( + "SELECT * FROM test", + null, + false, + queryContext, + null + ); + } + private static class TestRuleManager extends CoordinatorRuleManager { public TestRuleManager( From 3d9f7edd075727c3c0e069ae815b1f99c2cf560c Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 11 Aug 2021 10:44:02 +0530 Subject: [PATCH 06/12] Fix deps and docs --- docs/design/router.md | 2 +- server/pom.xml | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/docs/design/router.md b/docs/design/router.md index ad4c20165d10..f2cb36c4e806 100644 --- a/docs/design/router.md +++ b/docs/design/router.md @@ -142,7 +142,7 @@ Allows defining arbitrary routing rules using a JavaScript function. The functio To enable routing of SQL queries, set `druid.router.sql.enable` to `true` (`false` by default). The broker service for a given SQL query is resolved using only the provided Router strategies. If not resolved using any of the strategies, the -Router uses the `defaultBrokerServiceName`. This behaviour is slightly different from native queries where the Router +Router uses the `defaultBrokerServiceName`. This behavior is slightly different from native queries where the Router first tries to resolve the broker service using strategies, then load rules and finally using the `defaultBrokerServiceName` if still not resolved. diff --git a/server/pom.xml b/server/pom.xml index a9569a38aef3..677866fad568 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -323,10 +323,6 @@ com.fasterxml.jackson.module jackson-module-guice - - org.apache.calcite.avatica - avatica-core - From a34d7c61fd7b2079de845a541d2d21e473fe499d Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 11 Aug 2021 20:28:28 +0530 Subject: [PATCH 07/12] Log query details when debugging is enabled --- .../org/apache/druid/query/QueryContexts.java | 5 ++++ .../apache/druid/query/QueryContextsTest.java | 2 ++ .../server/AsyncQueryForwardingServlet.java | 30 +++++++------------ .../router/TieredBrokerHostSelector.java | 16 ++++++---- 4 files changed, 28 insertions(+), 25 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index 91156314c19f..dd58dfc2e5a1 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -329,6 +329,11 @@ public static boolean isDebug(Query query) return parseBoolean(query, ENABLE_DEBUG, DEFAULT_ENABLE_DEBUG); } + public static boolean isDebug(Map queryContext) + { + return parseBoolean(queryContext, ENABLE_DEBUG, DEFAULT_ENABLE_DEBUG); + } + public static Query withMaxScatterGatherBytes(Query query, long maxScatterGatherBytesLimit) { Object obj = query.getContextValue(MAX_SCATTER_GATHER_BYTES_KEY); diff --git a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java index 61b35f290401..3b693aa799ea 100644 --- a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java +++ b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java @@ -177,6 +177,7 @@ public void testDefaultEnableQueryDebugging() ImmutableMap.of() ); Assert.assertFalse(QueryContexts.isDebug(query)); + Assert.assertFalse(QueryContexts.isDebug(query.getContext())); } @Test @@ -189,5 +190,6 @@ public void testEnableQueryDebuggingSetToTrue() ImmutableMap.of(QueryContexts.ENABLE_DEBUG, true) ); Assert.assertTrue(QueryContexts.isDebug(query)); + Assert.assertTrue(QueryContexts.isDebug(query.getContext())); } } diff --git a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java index c4f433b0c54b..eeedeaeea709 100644 --- a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java +++ b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import com.google.inject.Provider; +import com.google.inject.name.Named; import org.apache.calcite.avatica.remote.ProtobufTranslation; import org.apache.calcite.avatica.remote.ProtobufTranslationImpl; import org.apache.calcite.avatica.remote.Service; @@ -93,14 +94,15 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu private static final String SQL_QUERY_ATTRIBUTE = "org.apache.druid.proxy.sqlQuery"; private static final String OBJECTMAPPER_ATTRIBUTE = "org.apache.druid.proxy.objectMapper"; + private static final String PROPERTY_SQL_ENABLE = "druid.router.sql.enable"; + private static final String PROPERTY_SQL_ENABLE_DEFAULT = "false"; + private static final int CANCELLATION_TIMEOUT_MILLIS = 500; private final AtomicLong successfulQueryCount = new AtomicLong(); private final AtomicLong failedQueryCount = new AtomicLong(); private final AtomicLong interruptedQueryCount = new AtomicLong(); - private final boolean routeSqlQueries; - private static void handleException(HttpServletResponse response, ObjectMapper objectMapper, Exception exception) throws IOException { @@ -129,6 +131,8 @@ private static void handleException(HttpServletResponse response, ObjectMapper o private final AuthenticatorMapper authenticatorMapper; private final ProtobufTranslation protobufTranslation; + private final boolean routeSqlQueries; + private HttpClient broadcastClient; @Inject @@ -158,7 +162,7 @@ public AsyncQueryForwardingServlet( this.authenticatorMapper = authenticatorMapper; this.protobufTranslation = new ProtobufTranslationImpl(); this.routeSqlQueries = Boolean.parseBoolean( - properties.getProperty("druid.router.sql.enable", "false") + properties.getProperty(PROPERTY_SQL_ENABLE, PROPERTY_SQL_ENABLE_DEFAULT) ); } @@ -251,12 +255,11 @@ protected void service(HttpServletRequest request, HttpServletResponse response) handleException(response, objectMapper, e); return; } - } else if (routeSqlQueries && isSqlQueryEndpoint && HttpMethod.DELETE.is(method)) { - targetServer = hostFinder.pickDefaultServer(); - broadcastQueryCancelRequest(request, targetServer); } else if (routeSqlQueries && isSqlQueryEndpoint && HttpMethod.POST.is(method)) { try { - targetServer = getTargetServerForSql(request, objectMapper); + SqlQuery inputSqlQuery = objectMapper.readValue(request.getInputStream(), SqlQuery.class); + request.setAttribute(SQL_QUERY_ATTRIBUTE, inputSqlQuery); + targetServer = hostFinder.findServerSql(inputSqlQuery); } catch (IOException e) { handleQueryParseException(request, response, objectMapper, e, false); @@ -313,19 +316,6 @@ private void broadcastQueryCancelRequest(HttpServletRequest request, Server targ interruptedQueryCount.incrementAndGet(); } - private Server getTargetServerForSql( - HttpServletRequest request, - ObjectMapper objectMapper - ) throws IOException - { - SqlQuery inputSqlQuery = objectMapper.readValue(request.getInputStream(), SqlQuery.class); - request.setAttribute(SQL_QUERY_ATTRIBUTE, inputSqlQuery); - - return inputSqlQuery != null - ? hostFinder.findServerSql(inputSqlQuery) - : hostFinder.pickDefaultServer(); - } - private void handleQueryParseException( HttpServletRequest request, HttpServletResponse response, diff --git a/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java b/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java index 20b0625beed0..fadcf1347ed7 100644 --- a/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java +++ b/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java @@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; import org.apache.druid.server.coordinator.rules.LoadRule; import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.sql.http.SqlQuery; @@ -287,12 +288,17 @@ public Pair selectForSql(SqlQuery sqlQuery) // Use defaut if not resolved by strategies if (brokerServiceName == null) { - log.error( - "No brokerServiceName found for SQL Query [%s]. Using default selector for [%s].", - sqlQuery.getQuery(), - tierConfig.getDefaultBrokerServiceName() - ); brokerServiceName = tierConfig.getDefaultBrokerServiceName(); + + // Log if query debugging is enabled + if (QueryContexts.isDebug(sqlQuery.getContext())) { + log.info( + "No brokerServiceName found for SQL Query [%s], Context [%s]. Using default selector for [%s].", + sqlQuery.getQuery(), + sqlQuery.getContext(), + tierConfig.getDefaultBrokerServiceName() + ); + } } return getServerPair(brokerServiceName); From f358020b0083015f41a0dea6d082625c1c6113ca Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 11 Aug 2021 21:26:03 +0530 Subject: [PATCH 08/12] Add test for SqlQuery in AsyncQueryForwardingServletTest --- .../AsyncQueryForwardingServletTest.java | 42 +++++++++++++++++-- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java index 0689aa38fce5..4a4bfe96fb0c 100644 --- a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java +++ b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java @@ -64,6 +64,8 @@ import org.apache.druid.server.security.AuthenticatorMapper; import org.apache.druid.server.security.Authorizer; import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.sql.http.ResultFormat; +import org.apache.druid.sql.http.SqlQuery; import org.easymock.EasyMock; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.server.Handler; @@ -83,6 +85,8 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URI; import java.net.URL; @@ -191,10 +195,23 @@ public void testDeleteBroadcast() throws Exception latch.await(); } + @Test + public void testSqlQueryProxy() throws Exception + { + final SqlQuery query = new SqlQuery("SELECT * FROM foo", ResultFormat.ARRAY, false, null, null); + final QueryHostFinder hostFinder = EasyMock.createMock(QueryHostFinder.class); + EasyMock.expect(hostFinder.findServerSql(query)) + .andReturn(new TestServer("http", "1.2.3.4", 9999)).once(); + EasyMock.replay(hostFinder); + + Properties properties = new Properties(); + properties.setProperty("druid.router.sql.enable", "true"); + verifyServletCallsForQuery(query, true, hostFinder, properties); + } + @Test public void testQueryProxy() throws Exception { - final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() .dataSource("foo") .intervals("2000/P1D") @@ -206,6 +223,20 @@ public void testQueryProxy() throws Exception EasyMock.expect(hostFinder.pickServer(query)).andReturn(new TestServer("http", "1.2.3.4", 9999)).once(); EasyMock.replay(hostFinder); + verifyServletCallsForQuery(query, false, hostFinder, new Properties()); + } + + /** + * Verifies that the Servlet calls the right methods the right number of times. + */ + private void verifyServletCallsForQuery( + Object query, + boolean isSql, + QueryHostFinder hostFinder, + Properties properties + ) throws Exception + { + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); final HttpServletRequest requestMock = EasyMock.createMock(HttpServletRequest.class); final ByteArrayInputStream inputStream = new ByteArrayInputStream(jsonMapper.writeValueAsBytes(query)); final ServletInputStream servletInputStream = new ServletInputStream() @@ -243,10 +274,13 @@ public int read() EasyMock.expect(requestMock.getContentType()).andReturn("application/json").times(2); requestMock.setAttribute("org.apache.druid.proxy.objectMapper", jsonMapper); EasyMock.expectLastCall(); - EasyMock.expect(requestMock.getRequestURI()).andReturn("/druid/v2/"); + EasyMock.expect(requestMock.getRequestURI()).andReturn(isSql ? "/druid/v2/sql" : "/druid/v2/"); EasyMock.expect(requestMock.getMethod()).andReturn("POST"); EasyMock.expect(requestMock.getInputStream()).andReturn(servletInputStream); - requestMock.setAttribute("org.apache.druid.proxy.query", query); + requestMock.setAttribute( + isSql ? "org.apache.druid.proxy.sqlQuery" : "org.apache.druid.proxy.query", + query + ); requestMock.setAttribute("org.apache.druid.proxy.to.host", "1.2.3.4:9999"); requestMock.setAttribute("org.apache.druid.proxy.to.host.scheme", "http"); EasyMock.expectLastCall(); @@ -264,7 +298,7 @@ public int read() new NoopRequestLogger(), new DefaultGenericQueryMetricsFactory(), new AuthenticatorMapper(ImmutableMap.of()), - new Properties() + properties ) { @Override From a420c5da07c33914870394ca45ad572a8f1a203f Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 12 Aug 2021 07:55:01 +0530 Subject: [PATCH 09/12] Fix dependencies and checkstyle --- services/pom.xml | 48 +++++++++++++++++++ .../server/AsyncQueryForwardingServlet.java | 1 - .../AsyncQueryForwardingServletTest.java | 2 - 3 files changed, 48 insertions(+), 3 deletions(-) diff --git a/services/pom.xml b/services/pom.xml index 643843deea3d..1b80915dead0 100644 --- a/services/pom.xml +++ b/services/pom.xml @@ -80,6 +80,18 @@ commons-io commons-io + + org.eclipse.jetty + jetty-client + + + org.eclipse.jetty + jetty-http + + + org.eclipse.jetty + jetty-proxy + org.eclipse.jetty jetty-server @@ -88,6 +100,10 @@ org.apache.curator curator-framework + + org.apache.calcite.avatica + avatica-core + joda-time joda-time @@ -96,14 +112,38 @@ com.google.inject guice + + com.fasterxml.jackson.core + jackson-annotations + com.fasterxml.jackson.core jackson-databind + + com.fasterxml.jackson.jaxrs + jackson-jaxrs-smile-provider + com.opencsv opencsv + + javax.validation + validation-api + + + com.google.errorprone + error_prone_annotations + + + commons-lang + commons-lang + + + javax.ws.rs + jsr311-api + org.eclipse.jetty jetty-servlet @@ -112,6 +152,10 @@ org.eclipse.jetty jetty-rewrite + + com.sun.jersey + jersey-server + com.google.inject.extensions guice-multibindings @@ -124,6 +168,10 @@ org.eclipse.jetty jetty-util + + io.netty + netty + io.netty netty-common diff --git a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java index eeedeaeea709..0a2da6293a91 100644 --- a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java +++ b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java @@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import com.google.inject.Provider; -import com.google.inject.name.Named; import org.apache.calcite.avatica.remote.ProtobufTranslation; import org.apache.calcite.avatica.remote.ProtobufTranslationImpl; import org.apache.calcite.avatica.remote.Service; diff --git a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java index 4a4bfe96fb0c..9a922893cfd9 100644 --- a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java +++ b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java @@ -85,8 +85,6 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URI; import java.net.URL; From 2341de88a581f59d2014a1d255a8e61126f3fe38 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 12 Aug 2021 12:50:51 +0530 Subject: [PATCH 10/12] Add test dependency on protobuf-java --- extensions-core/hdfs-storage/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/extensions-core/hdfs-storage/pom.xml b/extensions-core/hdfs-storage/pom.xml index d16712d8eaea..47a9ec74c5a3 100644 --- a/extensions-core/hdfs-storage/pom.xml +++ b/extensions-core/hdfs-storage/pom.xml @@ -377,6 +377,11 @@ junit test + + com.google.protobuf + protobuf-java + test + org.apache.druid druid-server From 805ffb77ab1d1bf8785fb3e60f23400151537976 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 12 Aug 2021 16:36:08 +0530 Subject: [PATCH 11/12] Add javadoc --- docs/design/router.md | 3 +++ .../router/TieredBrokerSelectorStrategy.java | 17 +++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/docs/design/router.md b/docs/design/router.md index f2cb36c4e806..06d1e1fcf909 100644 --- a/docs/design/router.md +++ b/docs/design/router.md @@ -148,6 +148,9 @@ if still not resolved. Routing of native queries is always enabled. +Setting `druid.router.sql.enable` does not affect Avatica JDBC requests. They are routed based on connection ID as +explained in the next section. + ### Avatica query balancing All Avatica JDBC requests with a given connection ID must be routed to the same Broker, since Druid Brokers do not share connection state with each other. diff --git a/services/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java b/services/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java index 42f4af79d134..aee4ef88c9a5 100644 --- a/services/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java +++ b/services/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java @@ -37,8 +37,25 @@ public interface TieredBrokerSelectorStrategy { + + /** + * Tries to determine the name of the Broker service to which the given native + * query should be routed. + * + * @param config Config containing tier to broker service map + * @param query Native (JSON) query to be routed + * @return An empty Optional if the service name could not be determined. + */ Optional getBrokerServiceName(TieredBrokerConfig config, Query query); + /** + * Tries to determine the name of the Broker service to which the given SqlQuery + * should be routed. The default implementation returns an empty Optional. + * + * @param config Config containing tier to broker service map + * @param sqlQuery SQL query to be routed + * @return An empty Optional if the service name could not be determined. + */ default Optional getBrokerServiceName(TieredBrokerConfig config, SqlQuery sqlQuery) { return Optional.absent(); From 187a59064c8eacbbaf481ca87533b3321dd5e11a Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 13 Aug 2021 16:33:05 +0530 Subject: [PATCH 12/12] Remove condition on ruleManager from TieredBrokerHostSelector.selectForSql() --- .../apache/druid/server/router/TieredBrokerHostSelector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java b/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java index fadcf1347ed7..ae03665153ac 100644 --- a/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java +++ b/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java @@ -271,7 +271,7 @@ private Pair getServerPair(String brokerServiceName) public Pair selectForSql(SqlQuery sqlQuery) { synchronized (lock) { - if (!ruleManager.isStarted() || !started) { + if (!started) { return getDefaultLookup(); } }