From 8f2fc595ee367efc7561644a49e04e1ac59100ba Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 22 Jul 2014 10:12:02 -0700 Subject: [PATCH 1/5] allow router to override selection based on config --- .../PriorityTieredBrokerSelectorStrategy.java | 46 +++++++++++++++ .../server/router/TieredBrokerConfig.java | 9 +++ .../router/TieredBrokerHostSelector.java | 19 +++--- ...ieredBrokerSelectorStrategiesProvider.java | 59 +++++++++++++++++++ .../router/TieredBrokerSelectorStrategy.java | 19 ++++++ ...eBoundaryTieredBrokerSelectorStrategy.java | 46 +++++++++++++++ .../router/TieredBrokerHostSelectorTest.java | 27 ++++++++- .../src/main/java/io/druid/cli/CliRouter.java | 7 +++ 8 files changed, 223 insertions(+), 9 deletions(-) create mode 100644 server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java create mode 100644 server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategiesProvider.java create mode 100644 server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategy.java create mode 100644 server/src/main/java/io/druid/server/router/TimeBoundaryTieredBrokerSelectorStrategy.java diff --git a/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java new file mode 100644 index 000000000000..4fbcd78a6027 --- /dev/null +++ b/server/src/main/java/io/druid/server/router/PriorityTieredBrokerSelectorStrategy.java @@ -0,0 +1,46 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.router; + +import com.google.common.base.Optional; +import com.google.common.collect.Iterables; +import io.druid.query.Query; + +/** + */ +public class PriorityTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy +{ + @Override + public Optional getBrokerServiceName(TieredBrokerConfig tierConfig, Query query) + { + final int priority = query.getContextPriority(0); + + if (priority < 0) { + return Optional.of( + Iterables.getFirst( + tierConfig.getTierToBrokerMap().values(), + tierConfig.getDefaultBrokerServiceName() + ) + ); + } + + return Optional.absent(); + } +} diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java b/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java index 67ff109f2176..395dd81e2e67 100644 --- a/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java +++ b/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java @@ -54,6 +54,10 @@ public class TieredBrokerConfig @NotNull private Period pollPeriod = new Period("PT1M"); + @JsonProperty + @NotNull + private String strategies = "[{\"type\":\"timeBoundary\"},{\"type\":\"priority\"}]"; + // tier, public LinkedHashMap getTierToBrokerMap() { @@ -88,4 +92,9 @@ public Period getPollPeriod() { return pollPeriod; } + + public String getStrategies() + { + return strategies; + } } diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java index 8ebe2e55050c..a28e6ffc2ffc 100644 --- a/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java +++ b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java @@ -19,6 +19,7 @@ package io.druid.server.router; +import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.inject.Inject; @@ -30,7 +31,6 @@ import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.query.Query; -import io.druid.query.timeboundary.TimeBoundaryQuery; import io.druid.server.coordinator.rules.LoadRule; import io.druid.server.coordinator.rules.Rule; import org.joda.time.DateTime; @@ -50,6 +50,7 @@ public class TieredBrokerHostSelector implements HostSelector private final TieredBrokerConfig tierConfig; private final ServerDiscoveryFactory serverDiscoveryFactory; private final ConcurrentHashMap selectorMap = new ConcurrentHashMap<>(); + private final List strategies; private final Object lock = new Object(); @@ -59,12 +60,14 @@ public class TieredBrokerHostSelector implements HostSelector public TieredBrokerHostSelector( CoordinatorRuleManager ruleManager, TieredBrokerConfig tierConfig, - ServerDiscoveryFactory serverDiscoveryFactory + ServerDiscoveryFactory serverDiscoveryFactory, + List strategies ) { this.ruleManager = ruleManager; this.tierConfig = tierConfig; this.serverDiscoveryFactory = serverDiscoveryFactory; + this.strategies = strategies; } @LifecycleStart @@ -128,12 +131,12 @@ public Pair select(final Query query) String brokerServiceName = null; - // Somewhat janky way of always selecting highest priority broker for this type of query - if (query instanceof TimeBoundaryQuery) { - brokerServiceName = Iterables.getFirst( - tierConfig.getTierToBrokerMap().values(), - tierConfig.getDefaultBrokerServiceName() - ); + for (TieredBrokerSelectorStrategy strategy : strategies) { + final Optional optionalName = strategy.getBrokerServiceName(tierConfig, query); + if (optionalName.isPresent()) { + brokerServiceName = optionalName.get(); + break; + } } if (brokerServiceName == null) { diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategiesProvider.java b/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategiesProvider.java new file mode 100644 index 000000000000..3300261b7d4d --- /dev/null +++ b/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategiesProvider.java @@ -0,0 +1,59 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.router; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.util.Lists; +import com.google.common.base.Throwables; +import com.google.inject.Inject; +import com.google.inject.Provider; + +import java.util.List; + +/** + */ +public class TieredBrokerSelectorStrategiesProvider implements Provider> +{ + private final List strategies = Lists.newArrayList(); + + @Inject + public TieredBrokerSelectorStrategiesProvider(ObjectMapper jsonMapper, TieredBrokerConfig config) + { + try { + this.strategies.addAll( + (List) jsonMapper.readValue( + config.getStrategies(), new TypeReference>() + { + } + ) + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public List get() + { + return strategies; + } +} diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategy.java b/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategy.java new file mode 100644 index 000000000000..bccf4d0d9426 --- /dev/null +++ b/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategy.java @@ -0,0 +1,19 @@ +package io.druid.server.router; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.base.Optional; +import io.druid.query.Query; + +/** + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "timeBoundary", value = TimeBoundaryTieredBrokerSelectorStrategy.class), + @JsonSubTypes.Type(name = "priority", value = PriorityTieredBrokerSelectorStrategy.class) +}) + +public interface TieredBrokerSelectorStrategy +{ + public Optional getBrokerServiceName(TieredBrokerConfig config, Query query); +} diff --git a/server/src/main/java/io/druid/server/router/TimeBoundaryTieredBrokerSelectorStrategy.java b/server/src/main/java/io/druid/server/router/TimeBoundaryTieredBrokerSelectorStrategy.java new file mode 100644 index 000000000000..135e87c14113 --- /dev/null +++ b/server/src/main/java/io/druid/server/router/TimeBoundaryTieredBrokerSelectorStrategy.java @@ -0,0 +1,46 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server.router; + +import com.google.common.base.Optional; +import com.google.common.collect.Iterables; +import io.druid.query.Query; +import io.druid.query.timeboundary.TimeBoundaryQuery; + +/** + */ +public class TimeBoundaryTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy +{ + @Override + public Optional getBrokerServiceName(TieredBrokerConfig tierConfig, Query query) + { + // Somewhat janky way of always selecting highest priority broker for this type of query + if (query instanceof TimeBoundaryQuery) { + return Optional.of( + Iterables.getFirst( + tierConfig.getTierToBrokerMap().values(), + tierConfig.getDefaultBrokerServiceName() + ) + ); + } + + return Optional.absent(); + } +} diff --git a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java index 6d058d200fa0..3d1d0cd0bb04 100644 --- a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java +++ b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java @@ -84,7 +84,8 @@ public String getDefaultBrokerServiceName() return "hotBroker"; } }, - factory + factory, + Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy()) ); EasyMock.expect(factory.createSelector(EasyMock.anyObject())).andReturn(selector).atLeastOnce(); EasyMock.replay(factory); @@ -196,6 +197,30 @@ public void testSelectMultiInterval2() throws Exception Assert.assertEquals("coldBroker", brokerName); } + @Test + public void testPrioritySelect() throws Exception + { + String brokerName = (String) brokerSelector.select( + Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .aggregators(Arrays.asList(new CountAggregatorFactory("count"))) + .intervals( + new MultipleIntervalSegmentSpec( + Arrays.asList( + new Interval("2011-08-31/2011-09-01"), + new Interval("2012-08-31/2012-09-01"), + new Interval("2013-08-31/2013-09-01") + ) + ) + ) + .context(ImmutableMap.of("priority", -1)) + .build() + ).lhs; + + Assert.assertEquals("hotBroker", brokerName); + } + + private static class TestRuleManager extends CoordinatorRuleManager { public TestRuleManager( diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java index 681ae78fa272..c9a2f64fb7b4 100644 --- a/services/src/main/java/io/druid/cli/CliRouter.java +++ b/services/src/main/java/io/druid/cli/CliRouter.java @@ -23,6 +23,7 @@ import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; +import com.google.inject.TypeLiteral; import com.metamx.common.logger.Logger; import io.airlift.command.Command; import io.druid.client.RoutingDruidClient; @@ -42,6 +43,8 @@ import io.druid.server.router.Router; import io.druid.server.router.TieredBrokerConfig; import io.druid.server.router.TieredBrokerHostSelector; +import io.druid.server.router.TieredBrokerSelectorStrategiesProvider; +import io.druid.server.router.TieredBrokerSelectorStrategy; import org.eclipse.jetty.server.Server; import java.util.List; @@ -79,6 +82,10 @@ public void configure(Binder binder) binder.bind(TieredBrokerHostSelector.class).in(ManageLifecycle.class); binder.bind(QueryHostFinder.class).in(LazySingleton.class); binder.bind(RoutingDruidClient.class).in(LazySingleton.class); + binder.bind(new TypeLiteral>(){}) + .toProvider(TieredBrokerSelectorStrategiesProvider.class) + .in(LazySingleton.class); + binder.bind(JettyServerInitializer.class).to(RouterJettyServerInitializer.class).in(LazySingleton.class); From 09e2f13ffcc82cd1fd550a0e34b48d8513c690c9 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 22 Jul 2014 10:12:47 -0700 Subject: [PATCH 2/5] adding missing header --- .../router/TieredBrokerSelectorStrategy.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategy.java b/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategy.java index bccf4d0d9426..40a7714d8708 100644 --- a/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategy.java +++ b/server/src/main/java/io/druid/server/router/TieredBrokerSelectorStrategy.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package io.druid.server.router; import com.fasterxml.jackson.annotation.JsonSubTypes; From 00f086665cb723248c246f9c793d5e26356b713b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 22 Jul 2014 16:55:21 -0700 Subject: [PATCH 3/5] default async timeout to server idle timeout --- .../java/io/druid/server/AsyncQueryForwardingServlet.java | 6 ++++++ .../java/io/druid/cli/RouterJettyServerInitializer.java | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index 9888f0f8a30e..77354b81bf90 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -37,6 +37,7 @@ import io.druid.guice.annotations.Smile; import io.druid.query.DataSourceUtil; import io.druid.query.Query; +import io.druid.server.initialization.ServerConfig; import io.druid.server.log.RequestLogger; import io.druid.server.router.QueryHostFinder; import org.jboss.netty.buffer.ChannelBuffer; @@ -66,6 +67,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class); private static final Joiner COMMA_JOIN = Joiner.on(","); + private final ServerConfig config; private final ObjectMapper jsonMapper; private final ObjectMapper smileMapper; private final QueryHostFinder hostFinder; @@ -74,6 +76,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet private final RequestLogger requestLogger; public AsyncQueryForwardingServlet( + ServerConfig config, @Json ObjectMapper jsonMapper, @Smile ObjectMapper smileMapper, QueryHostFinder hostFinder, @@ -82,6 +85,7 @@ public AsyncQueryForwardingServlet( RequestLogger requestLogger ) { + this.config = config; this.jsonMapper = jsonMapper; this.smileMapper = smileMapper; this.hostFinder = hostFinder; @@ -95,6 +99,8 @@ protected void doGet(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException { final AsyncContext asyncContext = req.startAsync(req, res); + // default async timeout to be same as maxIdleTime for now + asyncContext.setTimeout(config.getMaxIdleTime().toStandardDuration().getMillis()); asyncContext.start( new Runnable() { diff --git a/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java index 54c3a7f69e41..25c8d2c4bfbb 100644 --- a/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java +++ b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java @@ -29,6 +29,7 @@ import io.druid.guice.annotations.Smile; import io.druid.server.AsyncQueryForwardingServlet; import io.druid.server.initialization.JettyServerInitializer; +import io.druid.server.initialization.ServerConfig; import io.druid.server.log.RequestLogger; import io.druid.server.router.QueryHostFinder; import org.eclipse.jetty.server.Handler; @@ -38,12 +39,14 @@ import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.servlets.AsyncGzipFilter; import org.eclipse.jetty.servlets.GzipFilter; /** */ public class RouterJettyServerInitializer implements JettyServerInitializer { + private final ServerConfig config; private final ObjectMapper jsonMapper; private final ObjectMapper smileMapper; private final QueryHostFinder hostFinder; @@ -53,6 +56,7 @@ public class RouterJettyServerInitializer implements JettyServerInitializer @Inject public RouterJettyServerInitializer( + ServerConfig config, @Json ObjectMapper jsonMapper, @Smile ObjectMapper smileMapper, QueryHostFinder hostFinder, @@ -61,6 +65,7 @@ public RouterJettyServerInitializer( RequestLogger requestLogger ) { + this.config = config; this.jsonMapper = jsonMapper; this.smileMapper = smileMapper; this.hostFinder = hostFinder; @@ -76,6 +81,7 @@ public void initialize(Server server, Injector injector) queries.addServlet( new ServletHolder( new AsyncQueryForwardingServlet( + config, jsonMapper, smileMapper, hostFinder, From e76561e400cfd16b8a3fae375fc8dea1b79445fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 22 Jul 2014 16:55:43 -0700 Subject: [PATCH 4/5] use async gzip filter for async servlet --- .../main/java/io/druid/cli/RouterJettyServerInitializer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java index 25c8d2c4bfbb..ebff67d2a84f 100644 --- a/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java +++ b/services/src/main/java/io/druid/cli/RouterJettyServerInitializer.java @@ -91,7 +91,7 @@ public void initialize(Server server, Injector injector) ) ), "/druid/v2/*" ); - queries.addFilter(GzipFilter.class, "/druid/v2/*", null); + queries.addFilter(AsyncGzipFilter.class, "/druid/v2/*", null); queries.addFilter(GuiceFilter.class, "/status/*", null); final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); From 03efbb0e023d5ddd5c7a9856a16c0ba372c4d028 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 22 Jul 2014 16:56:34 -0700 Subject: [PATCH 5/5] default query timeout to server idle timeout --- .../main/java/io/druid/server/QueryResource.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 4787b17465a2..59ebf5b94417 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -41,6 +41,7 @@ import io.druid.query.Query; import io.druid.query.QueryInterruptedException; import io.druid.query.QuerySegmentWalker; +import io.druid.server.initialization.ServerConfig; import io.druid.server.log.RequestLogger; import org.joda.time.DateTime; @@ -70,6 +71,7 @@ public class QueryResource public static final String APPLICATION_SMILE = "application/smile"; public static final String APPLICATION_JSON = "application/json"; + private final ServerConfig config; private final ObjectMapper jsonMapper; private final ObjectMapper smileMapper; private final QuerySegmentWalker texasRanger; @@ -79,6 +81,7 @@ public class QueryResource @Inject public QueryResource( + ServerConfig config, @Json ObjectMapper jsonMapper, @Smile ObjectMapper smileMapper, QuerySegmentWalker texasRanger, @@ -87,6 +90,7 @@ public QueryResource( QueryManager queryManager ) { + this.config = config; this.jsonMapper = jsonMapper.copy(); this.jsonMapper.getFactory().configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false); @@ -135,6 +139,15 @@ public Response doPost( queryId = UUID.randomUUID().toString(); query = query.withId(queryId); } + if (query.getContextValue("timeout") == null) { + query = query.withOverriddenContext( + ImmutableMap.of( + "timeout", + config.getMaxIdleTime().toStandardDuration().getMillis() + ) + ); + } + if (log.isDebugEnabled()) { log.debug("Got query [%s]", query);