Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.server.DruidNode;
import io.druid.server.coordination.ServerAnnouncer;
import io.druid.server.initialization.IndexerZkConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
Expand Down Expand Up @@ -75,7 +76,8 @@ public TaskMaster(
final TaskRunnerFactory runnerFactory,
final CuratorFramework curator,
final ServiceAnnouncer serviceAnnouncer,
final ServiceEmitter emitter
final ServiceEmitter emitter,
final ServerAnnouncer serverAnnouncer
)
{
this.taskActionClientFactory = taskActionClientFactory;
Expand Down Expand Up @@ -121,12 +123,14 @@ public void takeLeadership(CuratorFramework client) throws Exception
public void start() throws Exception
{
serviceAnnouncer.announce(node);
serverAnnouncer.announceLeadership();
}

@Override
public void stop()
{
serviceAnnouncer.unannounce(node);
serverAnnouncer.unannounceLeadership();
}
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ public class RealtimeIndexTaskTest
0,
"historical",
"dummy_tier",
0
0,
"service",
"hostText", -1
);
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final ServiceEmitter emitter = new ServiceEmitter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.server.DruidNode;
import io.druid.server.coordination.ServerAnnouncer;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter;
Expand Down Expand Up @@ -170,7 +171,8 @@ public void announce(DruidNode node)
announcementLatch.countDown();
}
},
serviceEmitter
serviceEmitter,
EasyMock.createMock(ServerAnnouncer.class)
);
EmittingLogger.registerEmitter(serviceEmitter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.metamx.common.logger.Logger;
import io.druid.client.selector.Server;
import io.druid.curator.discovery.ExternalServerDiscoverySelector;
import io.druid.curator.discovery.ServerDiscoverySelector;

import java.util.concurrent.Callable;
Expand All @@ -30,7 +31,7 @@ public class ServerDiscoveryUtil

private static final Logger LOG = new Logger(ServerDiscoveryUtil.class);

public static boolean isInstanceReady(ServerDiscoverySelector serviceProvider)
public static boolean isInstanceReady(ExternalServerDiscoverySelector serviceProvider)
{
try {
Server instance = serviceProvider.pick();
Expand All @@ -46,7 +47,7 @@ public static boolean isInstanceReady(ServerDiscoverySelector serviceProvider)
return true;
}

public static void waitUntilInstanceReady(final ServerDiscoverySelector serviceProvider, String instanceType)
public static void waitUntilInstanceReady(final ExternalServerDiscoverySelector serviceProvider, String instanceType)
{
RetryUtil.retryUntilTrue(
new Callable<Boolean>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import io.druid.curator.discovery.ExternalServerDiscoverySelector;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.annotations.Global;
Expand Down Expand Up @@ -183,7 +184,7 @@ private String setShutOffTime(String taskAsString, DateTime time)
public void postEvents() throws Exception
{
DateTimeZone zone = DateTimeZone.forID("UTC");
final ServerDiscoverySelector eventReceiverSelector = factory.createSelector(EVENT_RECEIVER_SERVICE_NAME);
final ExternalServerDiscoverySelector eventReceiverSelector = factory.createExternalSelector(EVENT_RECEIVER_SERVICE_NAME);
eventReceiverSelector.start();
BufferedReader reader = null;
InputStreamReader isr = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import io.druid.curator.discovery.ExternalServerDiscoverySelector;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.annotations.Global;
Expand Down Expand Up @@ -148,7 +149,7 @@ private String withServiceName(String taskAsString, String serviceName)

public void postEvents(int id) throws Exception
{
final ServerDiscoverySelector eventReceiverSelector = factory.createSelector(EVENT_RECEIVER_SERVICE_PREFIX + id);
final ExternalServerDiscoverySelector eventReceiverSelector = factory.createExternalSelector(EVENT_RECEIVER_SERVICE_PREFIX + id);
eventReceiverSelector.start();
try {
ServerDiscoveryUtil.waitUntilInstanceReady(eventReceiverSelector, "Event Receiver");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public BatchServerInventoryView(
{
super(
log,
zkPaths.getAnnouncementsPath(),
zkPaths.getCapabilityPathFor("segmentServer"),
zkPaths.getLiveSegmentsPath(),
curator,
jsonMapper,
Expand Down
12 changes: 9 additions & 3 deletions server/src/main/java/io/druid/client/DruidServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ public DruidServer(
config.getMaxSize(),
type,
config.getTier(),
DEFAULT_PRIORITY
DEFAULT_PRIORITY,
node.getServiceName(),
node.getHost(),
node.getPort()
);
}

Expand All @@ -76,10 +79,13 @@ public DruidServer(
@JsonProperty("maxSize") long maxSize,
@JsonProperty("type") String type,
@JsonProperty("tier") String tier,
@JsonProperty("priority") int priority
@JsonProperty("priority") int priority,
@JsonProperty("service") String service,
@JsonProperty("hostText") String hostText,
@JsonProperty("port") int port
)
{
this.metadata = new DruidServerMetadata(name, host, maxSize, type, tier, priority);
this.metadata = new DruidServerMetadata(name, host, maxSize, type, tier, priority, service, hostText, port);

this.dataSources = new ConcurrentHashMap<String, DruidDataSource>();
this.segments = new ConcurrentHashMap<String, DataSegment>();
Expand Down
72 changes: 72 additions & 0 deletions server/src/main/java/io/druid/client/DruidServerDiscovery.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.client;

import io.druid.server.coordination.DruidServerMetadata;

import java.util.List;

/**
* This interface is only used internally for discovering Druid servers in the cluster, its implementation should not
* involve any external announcement or discovery, and should never change the state of a Druid cluster.
*/
public interface DruidServerDiscovery
{

/**
* Find all the Druid servers that have the specified type
*
* @param type the type of Druid servers we want to find
* @return a list of server metadata from matched Druid servers
* @throws Exception
*/
List<DruidServerMetadata> getServersForType(String type);

/**
* Find the Druid server that is the leader of the specified type
*
* @param type the type of the leader
* @return the leader's server metadata
* @throws Exception
*/
DruidServerMetadata getLeaderForType(String type);

/**
* Find all the Druid servers that have the specified type and service name
*
* @param type the type of Druid servers we want to find
* @param service the service name of Druid servers we want to find. It is specified in druid.service runtime properties
* @return a list of server metadata from matched Druid servers
* @throws Exception
*/
List<DruidServerMetadata> getServersForTypeWithService(String type, String service);


/**
* Discuss:
* Discovery of a descriptor which returns a list of nodes which it describes
* (example: "can load deep-storage segments"). But any particular node may have any number of descriptors.
*
* @param capability
* @return
* @throws Exception
*/
List<DruidServerMetadata> getServersWithCapability(String capability);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public SingleServerInventoryView(
{
super(
log,
zkPaths.getAnnouncementsPath(),
zkPaths.getCapabilityPathFor("segmentServer"),
zkPaths.getServedSegmentsPath(),
curator,
jsonMapper,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.client;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Lists;
import com.google.api.client.util.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;

import java.util.ArrayList;
import java.util.List;

/**
*/
public class ZookeeperDruidServerDiscovery implements DruidServerDiscovery
{

private final CuratorFramework curator;
private final ObjectMapper objectMapper;
private final ZkPathsConfig zkPathsConfig;

@Inject
public ZookeeperDruidServerDiscovery(CuratorFramework curator, ObjectMapper objectMapper, ZkPathsConfig zkPathsConfig)
{
this.curator = curator;
this.objectMapper = objectMapper;
this.zkPathsConfig = zkPathsConfig;
}

@Override
public List<DruidServerMetadata> getServersForType(String type)
{
return getNodesUnderPath(zkPathsConfig.getAnnouncementPathForType(type));
}

@Override
public DruidServerMetadata getLeaderForType(String type)
{
final List<DruidServerMetadata> leader = getNodesUnderPath(zkPathsConfig.getLeadershipPathForType(type));
if (leader.size() > 1) {
throw new ISE("There should only be 1 Coordinator leader in the cluster, got [%s]", leader);
}
return leader.size() == 0 ? null : leader.get(0);
}

@Override
public List<DruidServerMetadata> getServersForTypeWithService(final String type, final String service)
{
final List<DruidServerMetadata> retVal = new ArrayList<>();
for (DruidServerMetadata server : getServersForType(type)) {
if (server.getService().equals(service)) {
retVal.add(server);
}
}
return retVal;
}

@Override
public List<DruidServerMetadata> getServersWithCapability(String capability)
{
return getNodesUnderPath(zkPathsConfig.getCapabilityPathFor(capability));
}

private List<DruidServerMetadata> getNodesUnderPath(String announcementPath)
{
final ImmutableList.Builder<DruidServerMetadata> retVal = ImmutableList.builder();

try {
final List<String> children = curator.getChildren().forPath(announcementPath);
for (String hostName : children) {

retVal.add(objectMapper.readValue(
curator.getData()
.decompressed()
.forPath(ZKPaths.makePath(announcementPath, hostName)),
DruidServerMetadata.class
));
}
}
catch (KeeperException.NoNodeException e) {
return Lists.newArrayList();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return retVal.build();
}
}
Loading