Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/content/configuration/coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ The coordinator node uses several of the global configs in [Configuration](../co
|`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0|
|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. Use `diskNormalized` to distribute segments among nodes so that the disks fill up uniformly and use `random` to randomly pick nodes to distribute segments.|`cost`|
|`druid.coordinator.loadqueuepeon.repeatDelay`|The start and repeat delay for the loadqueuepeon , which manages the load and drop of segments.|PT0.050S (50 ms)|
|`druid.coordinator.asOverlord.enabled`|Boolean value for whether this coordinator node should act like an overlord as well. This configuration allows users to simplify a druid cluster by not having to deploy any standalone overlord nodes. If set to true, then be sure to set `druid.coordinator.asOverlord.overlordService` also. See next.|false|
|`druid.coordinator.asOverlord.overlordService`| Required, if `druid.coordinator.asOverlord.enabled` is `true`. This must be same value as `druid.service` on standalone Overlord nodes and `druid.selectors.indexing.serviceName` on Middle Managers.|NULL|

### Metadata Retrieval

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.server.DruidNode;
import io.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import io.druid.server.initialization.IndexerZkConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
Expand Down Expand Up @@ -73,18 +74,23 @@ public TaskMaster(
final TaskLockbox taskLockbox,
final TaskStorage taskStorage,
final TaskActionClientFactory taskActionClientFactory,
@Self final DruidNode node,
@Self final DruidNode selfNode,
final IndexerZkConfig zkPaths,
final TaskRunnerFactory runnerFactory,
final CuratorFramework curator,
final ServiceAnnouncer serviceAnnouncer,
final CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig,
final ServiceEmitter emitter,
final SupervisorManager supervisorManager,
final OverlordHelperManager overlordHelperManager
)
{
this.supervisorManager = supervisorManager;
this.taskActionClientFactory = taskActionClientFactory;

final DruidNode node = coordinatorOverlordServiceConfig.getOverlordService() == null ? selfNode :
selfNode.withService(coordinatorOverlordServiceConfig.getOverlordService());

this.leaderSelector = new LeaderSelector(
curator,
zkPaths.getLeaderLatchPath(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public OverlordRedirectInfo(TaskMaster taskMaster)
}

@Override
public boolean doLocal()
public boolean doLocal(String requestURI)
{
return taskMaster.isLeading();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void testDoLocal()
{
EasyMock.expect(taskMaster.isLeading()).andReturn(true).anyTimes();
EasyMock.replay(taskMaster);
Assert.assertTrue(redirectInfo.doLocal());
Assert.assertTrue(redirectInfo.doLocal(null));
EasyMock.verify(taskMaster);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.server.DruidNode;
import io.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter;
Expand Down Expand Up @@ -183,6 +184,7 @@ public void announce(DruidNode node)
announcementLatch.countDown();
}
},
new CoordinatorOverlordServiceConfig(null, null),
serviceEmitter,
supervisorManager,
EasyMock.createNiceMock(OverlordHelperManager.class)
Expand Down
5 changes: 5 additions & 0 deletions server/src/main/java/io/druid/server/DruidNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ public int getPort()
return port;
}

public DruidNode withService(String service)
{
return new DruidNode(service, host, port);
}

/**
* Returns host and port together as something that can be used as part of a URI.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.server.coordinator;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;

/**
*/
public class CoordinatorOverlordServiceConfig
{
@JsonProperty
private final boolean enabled;

@JsonProperty
private final String overlordService;

public CoordinatorOverlordServiceConfig(
@JsonProperty("enabled") Boolean enabled,
@JsonProperty("overlordService") String overlordService
)
{
this.enabled = enabled == null ? false : enabled.booleanValue();
this.overlordService = overlordService;

Preconditions.checkArgument((this.enabled && this.overlordService != null) || !this.enabled,
"coordinator is enabled to be overlord but overlordService is not specified");
}

public boolean isEnabled()
{
return enabled;
}

public String getOverlordService()
{
return overlordService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public CoordinatorRedirectInfo(DruidCoordinator coordinator) {
}

@Override
public boolean doLocal()
public boolean doLocal(String requestURI)
{
return coordinator.isLeader();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain)
throw new ServletException("non-HTTP request or response");
}

if (redirectInfo.doLocal()) {
if (redirectInfo.doLocal(request.getRequestURI())) {
chain.doFilter(request, response);
} else {
URL url = redirectInfo.getRedirectURL(request.getQueryString(), request.getRequestURI());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*/
public interface RedirectInfo
{
public boolean doLocal();
public boolean doLocal(String requestURI);

public URL getRedirectURL(String queryString, String requestURI);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void testDoLocal()
{
EasyMock.expect(druidCoordinator.isLeader()).andReturn(true).anyTimes();
EasyMock.replay(druidCoordinator);
Assert.assertTrue(coordinatorRedirectInfo.doLocal());
Assert.assertTrue(coordinatorRedirectInfo.doLocal(null));
EasyMock.verify(druidCoordinator);
}

Expand Down
30 changes: 26 additions & 4 deletions services/src/main/java/io/druid/cli/CliCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Module;
Expand Down Expand Up @@ -79,6 +78,7 @@
import org.apache.curator.framework.CuratorFramework;
import org.eclipse.jetty.server.Server;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
Expand All @@ -94,6 +94,7 @@ public class CliCoordinator extends ServerRunnable
private static final Logger log = new Logger(CliCoordinator.class);

private Properties properties;
private boolean beOverlord;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleIngestion?


public CliCoordinator()
{
Expand All @@ -104,12 +105,19 @@ public CliCoordinator()
public void configure(Properties properties)
{
this.properties = properties;
beOverlord = isOverlord(properties);

if (beOverlord) {
log.info("Coordinator is configured to act as Overlord as well.");
}
}

@Override
protected List<? extends Module> getModules()
{
return ImmutableList.<Module>of(
List<Module> modules = new ArrayList<>();

modules.add(
new Module()
{
@Override
Expand All @@ -131,7 +139,11 @@ public void configure(Binder binder)
JsonConfigProvider.bind(binder, "druid.coordinator.balancer", BalancerStrategyFactory.class);

binder.bind(RedirectFilter.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(CoordinatorRedirectInfo.class).in(LazySingleton.class);
if (beOverlord) {
binder.bind(RedirectInfo.class).to(CoordinatorOverlordRedirectInfo.class).in(LazySingleton.class);
} else {
binder.bind(RedirectInfo.class).to(CoordinatorRedirectInfo.class).in(LazySingleton.class);
}

binder.bind(MetadataSegmentManager.class)
.toProvider(MetadataSegmentManagerProvider.class)
Expand Down Expand Up @@ -192,7 +204,6 @@ public void configure(Binder binder)
Predicates.equalTo("true"),
DruidCoordinatorSegmentKiller.class
);

}

@Provides
Expand All @@ -211,5 +222,16 @@ public LoadQueueTaskMaster getLoadQueueTaskMaster(
}
}
);

if (beOverlord) {
modules.addAll(new CliOverlord().getModules(false));
}

return modules;
}

public static boolean isOverlord(Properties properties)
{
return Boolean.valueOf(properties.getProperty("druid.coordinator.asOverlord.enabled")).booleanValue();
}
}
31 changes: 22 additions & 9 deletions services/src/main/java/io/druid/cli/CliOverlord.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.server.audit.AuditManagerProvider;
import io.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import io.druid.server.http.RedirectFilter;
import io.druid.server.http.RedirectInfo;
import io.druid.server.initialization.jetty.JettyServerInitUtils;
Expand Down Expand Up @@ -113,18 +114,26 @@ public CliOverlord()

@Override
protected List<? extends Module> getModules()
{
return getModules(true);
}

protected List<? extends Module> getModules(final boolean standalone)
{
return ImmutableList.<Module>of(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bindConstant()
.annotatedWith(Names.named("serviceName"))
.to(IndexingServiceSelectorConfig.DEFAULT_SERVICE_NAME);
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8090);

if (standalone) {
binder.bindConstant()
.annotatedWith(Names.named("serviceName"))
.to(IndexingServiceSelectorConfig.DEFAULT_SERVICE_NAME);
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8090);
}

JsonConfigProvider.bind(binder, "druid.coordinator.asOverlord", CoordinatorOverlordServiceConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);

Expand Down Expand Up @@ -160,14 +169,18 @@ public void configure(Binder binder)
.toProvider(AuditManagerProvider.class)
.in(ManageLifecycle.class);

binder.bind(RedirectFilter.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(OverlordRedirectInfo.class).in(LazySingleton.class);
if (standalone) {
binder.bind(RedirectFilter.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(OverlordRedirectInfo.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).toInstance(new OverlordJettyServerInitializer());
}

binder.bind(JettyServerInitializer.class).toInstance(new OverlordJettyServerInitializer());
Jerseys.addResource(binder, OverlordResource.class);
Jerseys.addResource(binder, SupervisorResource.class);

LifecycleModule.register(binder, Server.class);
if (standalone) {
LifecycleModule.register(binder, Server.class);
}
}

private void configureTaskStorage(Binder binder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,20 @@
import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.util.resource.ResourceCollection;

import java.util.Properties;

/**
*/
class CoordinatorJettyServerInitializer implements JettyServerInitializer
{
private final DruidCoordinatorConfig config;
private final boolean beOverlord;

@Inject
CoordinatorJettyServerInitializer(DruidCoordinatorConfig config)
CoordinatorJettyServerInitializer(DruidCoordinatorConfig config, Properties properties)
{
this.config = config;
this.beOverlord = CliCoordinator.isOverlord(properties);
}

@Override
Expand All @@ -59,10 +63,19 @@ public void initialize(Server server, Injector injector)

root.addServlet(holderPwd, "/");
if(config.getConsoleStatic() == null) {
ResourceCollection staticResources = new ResourceCollection(
Resource.newClassPathResource("io/druid/console"),
Resource.newClassPathResource("static")
);
ResourceCollection staticResources;
if (beOverlord) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have u tested this? are u sure both consoles work?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, i tested this. with this flag,

coordinator console - http://host:port/
overlord console - http://host:port/console.html

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you also verify that http://coordinator-host:port/#/indexing-service works ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it works

staticResources = new ResourceCollection(
Resource.newClassPathResource("io/druid/console"),
Resource.newClassPathResource("static"),
Resource.newClassPathResource("indexer_static")
);
} else {
staticResources = new ResourceCollection(
Resource.newClassPathResource("io/druid/console"),
Resource.newClassPathResource("static")
);
}
root.setBaseResource(staticResources);
} else {
// used for console development
Expand All @@ -81,10 +94,15 @@ public void initialize(Server server, Injector injector)
// Can't use '/*' here because of Guice and Jetty static content conflicts
root.addFilter(GuiceFilter.class, "/info/*", null);
root.addFilter(GuiceFilter.class, "/druid/coordinator/*", null);
if (beOverlord) {
root.addFilter(GuiceFilter.class, "/druid/indexer/*", null);
}
// this will be removed in the next major release
root.addFilter(GuiceFilter.class, "/coordinator/*", null);

root.addServlet(new ServletHolder(injector.getInstance(OverlordProxyServlet.class)), "/druid/indexer/*");
if (!beOverlord) {
root.addServlet(new ServletHolder(injector.getInstance(OverlordProxyServlet.class)), "/druid/indexer/*");
}

HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{JettyServerInitUtils.getJettyRequestLogHandler(), root});
Expand Down
Loading