From 6c498b7d4ac8035acca36a07b39c977e094e4bf8 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Mon, 28 Sep 2015 16:38:36 -0500 Subject: [PATCH] separate ingestion and query thread pool --- .../content/configuration/indexing-service.md | 8 ++ .../indexing/overlord/ForkingTaskRunner.java | 25 +++- .../druid/indexing/overlord/PortFinder.java | 12 ++ .../config/ForkingTaskRunnerConfig.java | 7 ++ .../guice/annotations/RemoteChatHandler.java | 34 ++++++ .../ServiceAnnouncingChatHandlerProvider.java | 4 +- .../jetty/ChatHandlerServerModule.java | 78 +++++++++++++ .../jetty/JettyServerModule.java | 68 +++++------ .../discovery/ServiceAnnouncerTest.java | 108 ++++++++++++++++++ .../src/main/java/io/druid/cli/CliPeon.java | 4 +- .../main/java/io/druid/cli/CliRealtime.java | 4 +- .../java/io/druid/guice/RealtimeModule.java | 2 +- 12 files changed, 316 insertions(+), 38 deletions(-) create mode 100644 server/src/main/java/io/druid/guice/annotations/RemoteChatHandler.java create mode 100644 server/src/main/java/io/druid/server/initialization/jetty/ChatHandlerServerModule.java create mode 100644 server/src/test/java/io/druid/curator/discovery/ServiceAnnouncerTest.java diff --git a/docs/content/configuration/indexing-service.md b/docs/content/configuration/indexing-service.md index 68d6612f41de..288398fc0239 100644 --- a/docs/content/configuration/indexing-service.md +++ b/docs/content/configuration/indexing-service.md @@ -251,6 +251,7 @@ Middle managers pass their configurations down to their child peons. 
The middle |`druid.indexer.runner.javaOpts`|-X Java options to run the peon in its own JVM.|""| |`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in Zookeeper.|524288| |`druid.indexer.runner.startPort`|The port that peons begin running on.|8100| +|`druid.indexer.runner.separateIngestionEndpoint`|Use a separate server, and consequently a separate Jetty thread pool, for ingesting events.|false| |`druid.worker.ip`|The IP of the worker.|localhost| |`druid.worker.version`|Version identifier for the middle manager.|0| |`druid.worker.capacity`|Maximum number of tasks the middle manager can accept.|Number of available processors - 1| @@ -273,6 +274,13 @@ Additional peon configs include: |`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|50000| |`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.3.0| +If `druid.indexer.runner.separateIngestionEndpoint` is set to true, then the following configurations are available for the ingestion server at the peon: + +|Property|Description|Default| +|--------|-----------|-------| +|`druid.indexer.server.chathandler.http.numThreads`|Number of threads for HTTP requests.|Math.max(10, (Number of available processors * 17) / 16 + 2) + 30| +|`druid.indexer.server.chathandler.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m| + If the peon is running in remote mode, there must be an overlord up and running.
Peons in remote mode can set the following configurations: |Property|Description|Default| diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index 3cb40a14d288..7d863892d98a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -37,6 +37,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.common.ISE; +import com.metamx.common.Pair; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; import io.druid.guice.annotations.Self; @@ -124,7 +125,18 @@ public TaskStatus call() final File attemptDir = new File(taskDir, attemptUUID); final ProcessHolder processHolder; - final int childPort = portFinder.findUnusedPort(); + final int childPort; + final int childChatHandlerPort; + + if (config.isSeparateIngestionEndpoint()) { + Pair portPair = portFinder.findTwoConsecutiveUnusedPorts(); + childPort = portPair.lhs; + childChatHandlerPort = portPair.rhs; + } else { + childPort = portFinder.findUnusedPort(); + childChatHandlerPort = -1; + } + try { final Closer closer = Closer.create(); try { @@ -233,6 +245,14 @@ public TaskStatus call() command.add(String.format("-Ddruid.host=%s", childHost)); command.add(String.format("-Ddruid.port=%d", childPort)); + if(config.isSeparateIngestionEndpoint()) { + command.add(String.format("-Ddruid.indexer.task.chathandler.service=%s", "placeholder/serviceName")); + // Actual serviceName will be passed by the EventReceiverFirehose when it registers itself with ChatHandlerProvider + // Thus, "placeholder/serviceName" will be ignored + command.add(String.format("-Ddruid.indexer.task.chathandler.host=%s", childHost)); + command.add(String.format("-Ddruid.indexer.task.chathandler.port=%d", 
childChatHandlerPort)); + } + command.add("io.druid.cli.Main"); command.add("internal"); command.add("peon"); @@ -301,6 +321,9 @@ public TaskStatus call() } } portFinder.markPortUnused(childPort); + if(childChatHandlerPort > 0) { + portFinder.markPortUnused(childChatHandlerPort); + } log.info("Removing temporary directory: %s", attemptDir); FileUtils.deleteDirectory(attemptDir); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java b/indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java index be3c1d7412d4..3f0cb5954675 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java @@ -19,6 +19,7 @@ import com.google.common.collect.Sets; import com.metamx.common.ISE; +import com.metamx.common.Pair; import java.io.IOException; import java.net.BindException; @@ -74,6 +75,17 @@ public synchronized int findUnusedPort() return port; } + public synchronized Pair findTwoConsecutiveUnusedPorts() + { + int firstPort = chooseNext(startPort); + while (!canBind(firstPort) || !canBind(firstPort + 1)) { + firstPort = chooseNext(firstPort + 1); + } + usedPorts.add(firstPort); + usedPorts.add(firstPort + 1); + return new Pair<>(firstPort, firstPort + 1); + } + public synchronized void markPortUnused(int port) { usedPorts.remove(port); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java index da0db3262b44..82b89c18893f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java @@ -65,6 +65,13 @@ public class ForkingTaskRunnerConfig "hadoop" ); + @JsonProperty + private boolean separateIngestionEndpoint = false; + + public boolean 
isSeparateIngestionEndpoint() { + return separateIngestionEndpoint; + } + public String getJavaCommand() { return javaCommand; diff --git a/server/src/main/java/io/druid/guice/annotations/RemoteChatHandler.java b/server/src/main/java/io/druid/guice/annotations/RemoteChatHandler.java new file mode 100644 index 000000000000..4b048c485b53 --- /dev/null +++ b/server/src/main/java/io/druid/guice/annotations/RemoteChatHandler.java @@ -0,0 +1,34 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package io.druid.guice.annotations; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@BindingAnnotation +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface RemoteChatHandler +{ +} diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.java b/server/src/main/java/io/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.java index 1f4e973c59b7..92d2bb8be12c 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.java @@ -23,8 +23,8 @@ import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import io.druid.curator.discovery.ServiceAnnouncer; -import io.druid.guice.annotations.Self; import io.druid.server.DruidNode; +import io.druid.guice.annotations.RemoteChatHandler; import java.util.concurrent.ConcurrentMap; @@ -43,7 +43,7 @@ public class ServiceAnnouncingChatHandlerProvider implements ChatHandlerProvider @Inject public ServiceAnnouncingChatHandlerProvider( - @Self DruidNode node, + @RemoteChatHandler DruidNode node, ServiceAnnouncer serviceAnnouncer ) { diff --git a/server/src/main/java/io/druid/server/initialization/jetty/ChatHandlerServerModule.java b/server/src/main/java/io/druid/server/initialization/jetty/ChatHandlerServerModule.java new file mode 100644 index 000000000000..8a42f8068737 --- /dev/null +++ b/server/src/main/java/io/druid/server/initialization/jetty/ChatHandlerServerModule.java @@ -0,0 +1,78 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. 
See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.server.initialization.jetty; + +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.common.logger.Logger; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LazySingleton; +import io.druid.guice.LifecycleModule; +import io.druid.guice.annotations.RemoteChatHandler; +import io.druid.guice.annotations.Self; +import io.druid.server.DruidNode; +import io.druid.server.initialization.ServerConfig; +import org.eclipse.jetty.server.Server; + +import java.util.Properties; + +/** + */ +public class ChatHandlerServerModule implements Module +{ + private static final Logger log = new Logger(ChatHandlerServerModule.class); + + @Inject + private Properties properties; + + @Override + public void configure(Binder binder) + { + /** If "druid.indexer.task.chathandler.port" property is set then we assume that a + * separate Jetty Server with it's own {@link ServerConfig} is required for ingestion apart from the query server + * otherwise we bind {@link DruidNode} annotated with {@link RemoteChatHandler} to {@literal @}{@link Self} {@link DruidNode} + 
* so that same Jetty Server is used for querying as well as ingestion + */ + if (properties.containsKey("druid.indexer.task.chathandler.port")) { + log.info("Spawning separate ingestion server at port [%s]", properties.get("druid.indexer.task.chathandler.port")); + JsonConfigProvider.bind(binder, "druid.indexer.task.chathandler", DruidNode.class, RemoteChatHandler.class); + JsonConfigProvider.bind(binder, "druid.indexer.server.chathandler.http", ServerConfig.class, RemoteChatHandler.class); + LifecycleModule.register(binder, Server.class, RemoteChatHandler.class); + } else { + binder.bind(DruidNode.class).annotatedWith(RemoteChatHandler.class).to(Key.get(DruidNode.class, Self.class)); + binder.bind(ServerConfig.class).annotatedWith(RemoteChatHandler.class).to(Key.get(ServerConfig.class)); + } + } + + @Provides + @LazySingleton + @RemoteChatHandler + public Server getServer(Injector injector, Lifecycle lifecycle, @RemoteChatHandler DruidNode node, @RemoteChatHandler ServerConfig config) + { + final Server server = JettyServerModule.makeJettyServer(node, config); + JettyServerModule.initializeServer(injector, lifecycle, server); + return server; + } +} diff --git a/server/src/main/java/io/druid/server/initialization/jetty/JettyServerModule.java b/server/src/main/java/io/druid/server/initialization/jetty/JettyServerModule.java index 45bd177251b0..775c1e5e952e 100644 --- a/server/src/main/java/io/druid/server/initialization/jetty/JettyServerModule.java +++ b/server/src/main/java/io/druid/server/initialization/jetty/JettyServerModule.java @@ -109,38 +109,8 @@ protected ResourceConfig getDefaultResourceConfig( @LazySingleton public Server getServer(Injector injector, Lifecycle lifecycle, @Self DruidNode node, ServerConfig config) { - JettyServerInitializer initializer = injector.getInstance(JettyServerInitializer.class); - final Server server = makeJettyServer(node, config); - try { - initializer.initialize(server, injector); - } - catch (ConfigurationException e) { - 
throw new ProvisionException(Iterables.getFirst(e.getErrorMessages(), null).getMessage()); - } - - - lifecycle.addHandler( - new Lifecycle.Handler() - { - @Override - public void start() throws Exception - { - server.start(); - } - - @Override - public void stop() - { - try { - server.stop(); - } - catch (Exception e) { - log.warn(e, "Unable to stop Jetty server."); - } - } - } - ); + initializeServer(injector, lifecycle, server); return server; } @@ -153,7 +123,7 @@ public JacksonJsonProvider getJacksonJsonProvider(@Json ObjectMapper objectMappe return provider; } - private static Server makeJettyServer(@Self DruidNode node, ServerConfig config) + static Server makeJettyServer(DruidNode node, ServerConfig config) { final QueuedThreadPool threadPool = new QueuedThreadPool(); threadPool.setMinThreads(config.getNumThreads()); @@ -177,4 +147,38 @@ private static Server makeJettyServer(@Self DruidNode node, ServerConfig config) return server; } + + static void initializeServer(Injector injector, Lifecycle lifecycle, final Server server) + { + JettyServerInitializer initializer = injector.getInstance(JettyServerInitializer.class); + try { + initializer.initialize(server, injector); + } + catch (ConfigurationException e) { + throw new ProvisionException(Iterables.getFirst(e.getErrorMessages(), null).getMessage()); + } + + lifecycle.addHandler( + new Lifecycle.Handler() + { + @Override + public void start() throws Exception + { + server.start(); + } + + @Override + public void stop() + { + try { + server.stop(); + } + catch (Exception e) { + log.warn(e, "Unable to stop Jetty server."); + } + } + } + ); + } + } diff --git a/server/src/test/java/io/druid/curator/discovery/ServiceAnnouncerTest.java b/server/src/test/java/io/druid/curator/discovery/ServiceAnnouncerTest.java new file mode 100644 index 000000000000..bce613e0157b --- /dev/null +++ b/server/src/test/java/io/druid/curator/discovery/ServiceAnnouncerTest.java @@ -0,0 +1,108 @@ +/* + * Druid - a distributed column 
store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.curator.discovery; + +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; +import com.metamx.common.ISE; +import io.druid.curator.CuratorTestBase; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; +import org.apache.curator.x.discovery.ServiceInstance; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +public class ServiceAnnouncerTest extends CuratorTestBase +{ + @Before + public void setUp() throws Exception + { + setupServerAndCurator(); + } + + @Test + public void testServiceAnnouncement() throws Exception + { + curator.start(); + List serviceNames = ImmutableList.of( + "druid/overlord", + "druid/coordinator", + "druid/firehose/tranquility_test-50-0000-0000" + ); + final ServiceDiscovery serviceDiscovery = createAndAnnounceServices(serviceNames); + Assert.assertTrue( + Iterators.all( + serviceNames.iterator(), + new Predicate() + { + @Override + public boolean apply(String input) + { + try { + return serviceDiscovery.queryForInstances(input.replaceAll("/", ":")).size() == 1; + } + catch (Exception e) { + throw new ISE( + "Something went wrong while finding instance with name [%s] in Service 
Discovery", + input + ); + } + } + } + ) + ); + } + + @Test (expected = IllegalArgumentException.class) + public void testServiceAnnouncementFail() throws Exception + { + curator.start(); + createAndAnnounceServices(ImmutableList.of("placeholder/\u0001")); + } + + private ServiceDiscovery createAndAnnounceServices(List serviceNames) throws Exception + { + int port = 1000; + ServiceDiscovery serviceDiscovery = + ServiceDiscoveryBuilder.builder(Void.class) + .basePath("/test") + .client(curator) + .build(); + for (String serviceName: serviceNames) { + String serviceNameToUse = CuratorServiceUtils.makeCanonicalServiceName(serviceName); + ServiceInstance instance = ServiceInstance.builder() + .name(serviceNameToUse) + .address("localhost") + .port(port++) + .build(); + serviceDiscovery.registerService(instance); + } + return serviceDiscovery; + } + + @After + public void tearDown() + { + tearDownServerAndCurator(); + } +} diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index 7c4053907361..7c38e4311fee 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -70,6 +70,7 @@ import io.druid.segment.realtime.firehose.NoopChatHandlerProvider; import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider; import io.druid.server.QueryResource; +import io.druid.server.initialization.jetty.ChatHandlerServerModule; import io.druid.server.initialization.jetty.JettyServerInitializer; import org.eclipse.jetty.server.Server; @@ -198,7 +199,8 @@ private void configureTaskActionClient(Binder binder) } }, - new IndexingServiceFirehoseModule() + new IndexingServiceFirehoseModule(), + new ChatHandlerServerModule() ); } diff --git a/services/src/main/java/io/druid/cli/CliRealtime.java b/services/src/main/java/io/druid/cli/CliRealtime.java index d95cf079a69a..0bf2fd844b67 100644 --- a/services/src/main/java/io/druid/cli/CliRealtime.java +++ 
b/services/src/main/java/io/druid/cli/CliRealtime.java @@ -24,6 +24,7 @@ import com.metamx.common.logger.Logger; import io.airlift.airline.Command; import io.druid.guice.RealtimeModule; +import io.druid.server.initialization.jetty.ChatHandlerServerModule; import java.util.List; @@ -54,7 +55,8 @@ public void configure(Binder binder) binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/realtime"); binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8084); } - } + }, + new ChatHandlerServerModule() ); } } diff --git a/services/src/main/java/io/druid/guice/RealtimeModule.java b/services/src/main/java/io/druid/guice/RealtimeModule.java index 784aa7958be2..86dece8791d0 100644 --- a/services/src/main/java/io/druid/guice/RealtimeModule.java +++ b/services/src/main/java/io/druid/guice/RealtimeModule.java @@ -35,7 +35,6 @@ import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider; import io.druid.server.QueryResource; import io.druid.server.initialization.jetty.JettyServerInitializer; - import org.eclipse.jetty.server.Server; import java.util.List; @@ -44,6 +43,7 @@ */ public class RealtimeModule implements Module { + @Override public void configure(Binder binder) {