diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java index a9792200d958..c7654226e9aa 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java @@ -35,6 +35,7 @@ import io.druid.indexing.overlord.autoscaling.ScalingStats; import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.server.DruidNode; +import io.druid.server.coordination.ServerAnnouncer; import io.druid.server.initialization.IndexerZkConfig; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.leader.LeaderSelector; @@ -75,7 +76,8 @@ public TaskMaster( final TaskRunnerFactory runnerFactory, final CuratorFramework curator, final ServiceAnnouncer serviceAnnouncer, - final ServiceEmitter emitter + final ServiceEmitter emitter, + final ServerAnnouncer serverAnnouncer ) { this.taskActionClientFactory = taskActionClientFactory; @@ -121,12 +123,14 @@ public void takeLeadership(CuratorFramework client) throws Exception public void start() throws Exception { serviceAnnouncer.announce(node); + serverAnnouncer.announceLeadership(); } @Override public void stop() { serviceAnnouncer.unannounce(node); + serverAnnouncer.unannounceLeadership(); } } ); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 37914a8df42a..f3c08644f684 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -132,7 +132,9 @@ public class RealtimeIndexTaskTest 0, "historical", "dummy_tier", - 0 + 0, + "service", + "hostText", -1 ); private static final ObjectMapper jsonMapper = new 
DefaultObjectMapper(); private static final ServiceEmitter emitter = new ServiceEmitter( diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java index 24a6e2bdc7d0..a832b031372d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java @@ -49,6 +49,7 @@ import io.druid.indexing.overlord.autoscaling.ScalingStats; import io.druid.indexing.overlord.config.TaskQueueConfig; import io.druid.server.DruidNode; +import io.druid.server.coordination.ServerAnnouncer; import io.druid.server.initialization.IndexerZkConfig; import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.metrics.NoopServiceEmitter; @@ -170,7 +171,8 @@ public void announce(DruidNode node) announcementLatch.countDown(); } }, - serviceEmitter + serviceEmitter, + EasyMock.createMock(ServerAnnouncer.class) ); EmittingLogger.registerEmitter(serviceEmitter); } diff --git a/integration-tests/src/main/java/io/druid/testing/utils/ServerDiscoveryUtil.java b/integration-tests/src/main/java/io/druid/testing/utils/ServerDiscoveryUtil.java index eec05ffea396..28f6978f5264 100644 --- a/integration-tests/src/main/java/io/druid/testing/utils/ServerDiscoveryUtil.java +++ b/integration-tests/src/main/java/io/druid/testing/utils/ServerDiscoveryUtil.java @@ -21,6 +21,7 @@ import com.metamx.common.logger.Logger; import io.druid.client.selector.Server; +import io.druid.curator.discovery.ExternalServerDiscoverySelector; import io.druid.curator.discovery.ServerDiscoverySelector; import java.util.concurrent.Callable; @@ -30,7 +31,7 @@ public class ServerDiscoveryUtil private static final Logger LOG = new Logger(ServerDiscoveryUtil.class); - public static boolean isInstanceReady(ServerDiscoverySelector serviceProvider) + public static boolean 
isInstanceReady(ExternalServerDiscoverySelector serviceProvider) { try { Server instance = serviceProvider.pick(); @@ -46,7 +47,7 @@ public static boolean isInstanceReady(ServerDiscoverySelector serviceProvider) return true; } - public static void waitUntilInstanceReady(final ServerDiscoverySelector serviceProvider, String instanceType) + public static void waitUntilInstanceReady(final ExternalServerDiscoverySelector serviceProvider, String instanceType) { RetryUtil.retryUntilTrue( new Callable() diff --git a/integration-tests/src/test/java/io/druid/tests/indexer/ITRealtimeIndexTaskTest.java b/integration-tests/src/test/java/io/druid/tests/indexer/ITRealtimeIndexTaskTest.java index 605c0350bafe..53920ed66e60 100644 --- a/integration-tests/src/test/java/io/druid/tests/indexer/ITRealtimeIndexTaskTest.java +++ b/integration-tests/src/test/java/io/druid/tests/indexer/ITRealtimeIndexTaskTest.java @@ -25,6 +25,7 @@ import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import com.metamx.http.client.HttpClient; +import io.druid.curator.discovery.ExternalServerDiscoverySelector; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.guice.annotations.Global; @@ -183,7 +184,7 @@ private String setShutOffTime(String taskAsString, DateTime time) public void postEvents() throws Exception { DateTimeZone zone = DateTimeZone.forID("UTC"); - final ServerDiscoverySelector eventReceiverSelector = factory.createSelector(EVENT_RECEIVER_SERVICE_NAME); + final ExternalServerDiscoverySelector eventReceiverSelector = factory.createExternalSelector(EVENT_RECEIVER_SERVICE_NAME); eventReceiverSelector.start(); BufferedReader reader = null; InputStreamReader isr = null; diff --git a/integration-tests/src/test/java/io/druid/tests/indexer/ITUnionQueryTest.java b/integration-tests/src/test/java/io/druid/tests/indexer/ITUnionQueryTest.java index 6857b884b764..8681433eb807 100644 --- 
a/integration-tests/src/test/java/io/druid/tests/indexer/ITUnionQueryTest.java +++ b/integration-tests/src/test/java/io/druid/tests/indexer/ITUnionQueryTest.java @@ -24,6 +24,7 @@ import com.google.inject.Inject; import com.metamx.common.logger.Logger; import com.metamx.http.client.HttpClient; +import io.druid.curator.discovery.ExternalServerDiscoverySelector; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.guice.annotations.Global; @@ -148,7 +149,7 @@ private String withServiceName(String taskAsString, String serviceName) public void postEvents(int id) throws Exception { - final ServerDiscoverySelector eventReceiverSelector = factory.createSelector(EVENT_RECEIVER_SERVICE_PREFIX + id); + final ExternalServerDiscoverySelector eventReceiverSelector = factory.createExternalSelector(EVENT_RECEIVER_SERVICE_PREFIX + id); eventReceiverSelector.start(); try { ServerDiscoveryUtil.waitUntilInstanceReady(eventReceiverSelector, "Event Receiver"); diff --git a/server/src/main/java/io/druid/client/BatchServerInventoryView.java b/server/src/main/java/io/druid/client/BatchServerInventoryView.java index 35fc92325dc2..94cf89114d72 100644 --- a/server/src/main/java/io/druid/client/BatchServerInventoryView.java +++ b/server/src/main/java/io/druid/client/BatchServerInventoryView.java @@ -52,7 +52,7 @@ public BatchServerInventoryView( { super( log, - zkPaths.getAnnouncementsPath(), + zkPaths.getCapabilityPathFor("segmentServer"), zkPaths.getLiveSegmentsPath(), curator, jsonMapper, diff --git a/server/src/main/java/io/druid/client/DruidServer.java b/server/src/main/java/io/druid/client/DruidServer.java index 9d5564b42540..99b9ce5b9423 100644 --- a/server/src/main/java/io/druid/client/DruidServer.java +++ b/server/src/main/java/io/druid/client/DruidServer.java @@ -65,7 +65,10 @@ public DruidServer( config.getMaxSize(), type, config.getTier(), - DEFAULT_PRIORITY + DEFAULT_PRIORITY, + node.getServiceName(), 
+ node.getHost(), + node.getPort() ); } @@ -76,10 +79,13 @@ public DruidServer( @JsonProperty("maxSize") long maxSize, @JsonProperty("type") String type, @JsonProperty("tier") String tier, - @JsonProperty("priority") int priority + @JsonProperty("priority") int priority, + @JsonProperty("service") String service, + @JsonProperty("hostText") String hostText, + @JsonProperty("port") int port ) { - this.metadata = new DruidServerMetadata(name, host, maxSize, type, tier, priority); + this.metadata = new DruidServerMetadata(name, host, maxSize, type, tier, priority, service, hostText, port); this.dataSources = new ConcurrentHashMap(); this.segments = new ConcurrentHashMap(); diff --git a/server/src/main/java/io/druid/client/DruidServerDiscovery.java b/server/src/main/java/io/druid/client/DruidServerDiscovery.java new file mode 100644 index 000000000000..0c55976212d6 --- /dev/null +++ b/server/src/main/java/io/druid/client/DruidServerDiscovery.java @@ -0,0 +1,72 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.client; + +import io.druid.server.coordination.DruidServerMetadata; + +import java.util.List; + +/** + * This interface is only used internally for discovering Druid servers in the cluster, its implementation should not + * involve any external announcement or discovery, and should never change the state of a Druid cluster. + */ +public interface DruidServerDiscovery +{ + + /** + * Find all the Druid servers that have the specified type + * + * @param type the type of Druid servers we want to find + * @return a list of server metadata from matched Druid servers + * @throws Exception + */ + List getServersForType(String type); + + /** + * Find the Druid server that is the leader of the specified type + * + * @param type the type of the leader + * @return the leader's server metadata + * @throws Exception + */ + DruidServerMetadata getLeaderForType(String type); + + /** + * Find all the Druid servers that have the specified type and service name + * + * @param type the type of Druid servers we want to find + * @param service the service name of Druid servers we want to find. It is specified in druid.service runtime properties + * @return a list of server metadata from matched Druid servers + * @throws Exception + */ + List getServersForTypeWithService(String type, String service); + + + /** + * Discuss: + * Discovery of a descriptor which returns a list of nodes which it describes + * (example: "can load deep-storage segments"). But any particular node may have any number of descriptors. 
+ * + * @param capability + * @return + * @throws Exception + */ + List getServersWithCapability(String capability); +} diff --git a/server/src/main/java/io/druid/client/SingleServerInventoryView.java b/server/src/main/java/io/druid/client/SingleServerInventoryView.java index 3a51cf0dd968..b24b87371646 100644 --- a/server/src/main/java/io/druid/client/SingleServerInventoryView.java +++ b/server/src/main/java/io/druid/client/SingleServerInventoryView.java @@ -44,7 +44,7 @@ public SingleServerInventoryView( { super( log, - zkPaths.getAnnouncementsPath(), + zkPaths.getCapabilityPathFor("segmentServer"), zkPaths.getServedSegmentsPath(), curator, jsonMapper, diff --git a/server/src/main/java/io/druid/client/ZookeeperDruidServerDiscovery.java b/server/src/main/java/io/druid/client/ZookeeperDruidServerDiscovery.java new file mode 100644 index 000000000000..9e39d65233ff --- /dev/null +++ b/server/src/main/java/io/druid/client/ZookeeperDruidServerDiscovery.java @@ -0,0 +1,112 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.util.Lists; +import com.google.api.client.util.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; +import com.metamx.common.ISE; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.initialization.ZkPathsConfig; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.ZKPaths; +import org.apache.zookeeper.KeeperException; + +import java.util.ArrayList; +import java.util.List; + +/** + */ +public class ZookeeperDruidServerDiscovery implements DruidServerDiscovery +{ + + private final CuratorFramework curator; + private final ObjectMapper objectMapper; + private final ZkPathsConfig zkPathsConfig; + + @Inject + public ZookeeperDruidServerDiscovery(CuratorFramework curator, ObjectMapper objectMapper, ZkPathsConfig zkPathsConfig) + { + this.curator = curator; + this.objectMapper = objectMapper; + this.zkPathsConfig = zkPathsConfig; + } + + @Override + public List getServersForType(String type) + { + return getNodesUnderPath(zkPathsConfig.getAnnouncementPathForType(type)); + } + + @Override + public DruidServerMetadata getLeaderForType(String type) + { + final List leader = getNodesUnderPath(zkPathsConfig.getLeadershipPathForType(type)); + if (leader.size() > 1) { + throw new ISE("There should only be 1 Coordinator leader in the cluster, got [%s]", leader); + } + return leader.size() == 0 ? 
null : leader.get(0); + } + + @Override + public List getServersForTypeWithService(final String type, final String service) + { + final List retVal = new ArrayList<>(); + for (DruidServerMetadata server : getServersForType(type)) { + if (server.getService().equals(service)) { + retVal.add(server); + } + } + return retVal; + } + + @Override + public List getServersWithCapability(String capability) + { + return getNodesUnderPath(zkPathsConfig.getCapabilityPathFor(capability)); + } + + private List getNodesUnderPath(String announcementPath) + { + final ImmutableList.Builder retVal = ImmutableList.builder(); + + try { + final List children = curator.getChildren().forPath(announcementPath); + for (String hostName : children) { + + retVal.add(objectMapper.readValue( + curator.getData() + .decompressed() + .forPath(ZKPaths.makePath(announcementPath, hostName)), + DruidServerMetadata.class + )); + } + } + catch (KeeperException.NoNodeException e) { + return Lists.newArrayList(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + return retVal.build(); + } +} diff --git a/server/src/main/java/io/druid/curator/CuratorConfig.java b/server/src/main/java/io/druid/curator/CuratorConfig.java index 7ef233eabbf1..c664763268ee 100644 --- a/server/src/main/java/io/druid/curator/CuratorConfig.java +++ b/server/src/main/java/io/druid/curator/CuratorConfig.java @@ -31,6 +31,9 @@ public class CuratorConfig @JsonProperty("host") private String zkHosts = "localhost"; + @JsonProperty("externalHost") + private String externalZkHosts = "localhost"; + @JsonProperty("sessionTimeoutMs") @Min(0) private int zkSessionTimeoutMs = 30000; @@ -46,40 +49,23 @@ public String getZkHosts() return zkHosts; } - public void setZkHosts(String zkHosts) - { - this.zkHosts = zkHosts; - } - public Integer getZkSessionTimeoutMs() { return zkSessionTimeoutMs; } - public void setZkSessionTimeoutMs(Integer zkSessionTimeoutMs) - { - this.zkSessionTimeoutMs = zkSessionTimeoutMs; - } - public boolean 
getEnableCompression() { return enableCompression; } - public void setEnableCompression(Boolean enableCompression) - { - Preconditions.checkNotNull(enableCompression, "enableCompression"); - this.enableCompression = enableCompression; - } - public boolean getEnableAcl() { return enableAcl; } - public void setEnableAcl(Boolean enableAcl) + public String getExternalZkHosts() { - Preconditions.checkNotNull(enableAcl, "enableAcl"); - this.enableAcl = enableAcl; + return externalZkHosts; } } diff --git a/server/src/main/java/io/druid/curator/CuratorModule.java b/server/src/main/java/io/druid/curator/CuratorModule.java index 9de4239ece24..05fc3650a58a 100644 --- a/server/src/main/java/io/druid/curator/CuratorModule.java +++ b/server/src/main/java/io/druid/curator/CuratorModule.java @@ -20,6 +20,7 @@ package io.druid.curator; import com.google.inject.Binder; +import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.Provides; import com.metamx.common.lifecycle.Lifecycle; @@ -28,6 +29,8 @@ import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; +import io.druid.guice.annotations.External; +import io.druid.guice.annotations.Internal; import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -54,9 +57,12 @@ public void configure(Binder binder) binder, "druid.zk.service", CuratorConfig.class ); + + binder.bind(CuratorFramework.class).to(Key.get(CuratorFramework.class, Internal.class)); } @Provides + @Internal @LazySingleton public CuratorFramework makeCurator(CuratorConfig config, Lifecycle lifecycle) throws IOException { @@ -91,6 +97,42 @@ public void stop() return framework; } + @Provides + @External + @LazySingleton + public CuratorFramework makeExternalCurator(CuratorConfig config, Lifecycle lifecycle) throws IOException + { + final CuratorFramework framework = + CuratorFrameworkFactory.builder() + 
.connectString(config.getExternalZkHosts()) + .sessionTimeoutMs(config.getZkSessionTimeoutMs()) + .retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30)) + .compressionProvider(new PotentiallyGzippedCompressionProvider(config.getEnableCompression())) + .aclProvider(config.getEnableAcl() ? new SecuredACLProvider() : new DefaultACLProvider()) + .build(); + + lifecycle.addHandler( + new Lifecycle.Handler() + { + @Override + public void start() throws Exception + { + log.info("Starting Curator"); + framework.start(); + } + + @Override + public void stop() + { + log.info("Stopping Curator"); + framework.close(); + } + } + ); + + return framework; + } + class SecuredACLProvider implements ACLProvider { @Override diff --git a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java index 43d81067328b..95d6e32e303e 100644 --- a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java @@ -31,11 +31,15 @@ import com.google.inject.name.Named; import com.google.inject.name.Names; import com.metamx.common.lifecycle.Lifecycle; +import io.druid.client.DruidServerDiscovery; +import io.druid.client.ZookeeperDruidServerDiscovery; import io.druid.guice.DruidBinders; import io.druid.guice.JsonConfigProvider; import io.druid.guice.KeyHolder; import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; +import io.druid.guice.annotations.External; +import io.druid.guice.annotations.Internal; import io.druid.server.DruidNode; import io.druid.server.initialization.CuratorDiscoveryConfig; import org.apache.curator.framework.CuratorFramework; @@ -70,7 +74,7 @@ */ public class DiscoveryModule implements Module { - private static final String NAME = "DiscoveryModule:internal"; + private static final String NAME = "DiscoveryModule:external"; /** * Requests that the un-annotated DruidNode instance be 
injected and published as part of the lifecycle. @@ -140,10 +144,11 @@ public void configure(Binder binder) // Build the binder so that it will at a minimum inject an empty set. DruidBinders.discoveryAnnouncementBinder(binder); - binder.bind(ServiceAnnouncer.class) .to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME))) .in(LazySingleton.class); + + binder.bind(DruidServerDiscovery.class).to(ZookeeperDruidServerDiscovery.class); } @Provides @@ -195,7 +200,7 @@ public void stop() @Provides @LazySingleton public ServiceDiscovery getServiceDiscovery( - CuratorFramework curator, + @External CuratorFramework curator, CuratorDiscoveryConfig config, Lifecycle lifecycle ) throws Exception @@ -238,10 +243,11 @@ public void stop() @Provides @LazySingleton public ServerDiscoveryFactory getServerDiscoveryFactory( - ServiceDiscovery serviceDiscovery + ServiceDiscovery serviceDiscovery, + DruidServerDiscovery serverDiscovery ) { - return new ServerDiscoveryFactory(serviceDiscovery); + return new ServerDiscoveryFactory(serverDiscovery, serviceDiscovery); } private static class NoopServiceDiscovery implements ServiceDiscovery diff --git a/server/src/main/java/io/druid/curator/discovery/ExternalServerDiscoverySelector.java b/server/src/main/java/io/druid/curator/discovery/ExternalServerDiscoverySelector.java new file mode 100644 index 000000000000..bf47acfff6c8 --- /dev/null +++ b/server/src/main/java/io/druid/curator/discovery/ExternalServerDiscoverySelector.java @@ -0,0 +1,126 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.curator.discovery; + +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import com.google.common.net.HostAndPort; +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.common.logger.Logger; +import io.druid.client.selector.DiscoverySelector; +import io.druid.client.selector.Server; +import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.curator.x.discovery.ServiceProvider; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; + +/** + */ +public class ExternalServerDiscoverySelector implements DiscoverySelector +{ + private static final Logger log = new Logger(ServerDiscoverySelector.class); + + private final ServiceProvider serviceProvider; + + public ExternalServerDiscoverySelector(ServiceProvider serviceProvider) + { + this.serviceProvider = serviceProvider; + } + + private static final Function TO_SERVER = new Function() + { + @Override + public Server apply(final ServiceInstance instance) + { + return new Server() + { + @Override + public String getHost() + { + return HostAndPort.fromParts(getAddress(), getPort()).toString(); + } + + @Override + public String getAddress() + { + return instance.getAddress(); + } + + @Override + public int getPort() + { + return instance.getPort(); + } + + @Override + public String getScheme() + { + return "http"; + } + }; + } + }; + + @Override + public Server pick() + { + final ServiceInstance instance; + try { + instance = 
serviceProvider.getInstance(); + } + catch (Exception e) { + log.info(e, "Exception getting instance"); + return null; + } + + if (instance == null) { + log.error("No server instance found"); + return null; + } + + return TO_SERVER.apply(instance); + } + + public Collection getAll() + { + try { + return Collections2.transform(serviceProvider.getAllInstances(), TO_SERVER); + } + catch (Exception e) { + log.info(e, "Unable to get all instances"); + return Collections.emptyList(); + } + } + + @LifecycleStart + public void start() throws Exception + { + serviceProvider.start(); + } + + @LifecycleStop + public void stop() throws IOException + { + serviceProvider.close(); + } +} diff --git a/server/src/main/java/io/druid/curator/discovery/ServerDiscoveryFactory.java b/server/src/main/java/io/druid/curator/discovery/ServerDiscoveryFactory.java index 3ddaa2c02300..2116fdfec49a 100644 --- a/server/src/main/java/io/druid/curator/discovery/ServerDiscoveryFactory.java +++ b/server/src/main/java/io/druid/curator/discovery/ServerDiscoveryFactory.java @@ -19,39 +19,51 @@ package io.druid.curator.discovery; +import com.google.common.base.Function; import com.google.inject.Inject; +import io.druid.client.DruidServerDiscovery; +import io.druid.server.coordination.DruidServerMetadata; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.ServiceProvider; import java.io.IOException; import java.util.Collection; +import java.util.List; /** */ public class ServerDiscoveryFactory { - private final ServiceDiscovery serviceDiscovery; + + private final DruidServerDiscovery serverDiscovery; + private final ServiceDiscovery externlServiceDiscovery; @Inject - public ServerDiscoveryFactory( - ServiceDiscovery serviceDiscovery + public ServerDiscoveryFactory(DruidServerDiscovery serverDiscovery, ServiceDiscovery externalServiceDiscovery) + { + this.serverDiscovery = serverDiscovery; + 
this.externlServiceDiscovery = externalServiceDiscovery; + } + + public ServerDiscoverySelector createSelector( + Function> selectFunction ) { - this.serviceDiscovery = serviceDiscovery; + return new ServerDiscoverySelector(serverDiscovery, selectFunction); } - public ServerDiscoverySelector createSelector(String serviceName) + public ExternalServerDiscoverySelector createExternalSelector(String serviceName) { if (serviceName == null) { - return new ServerDiscoverySelector(new NoopServiceProvider()); + return new ExternalServerDiscoverySelector(new NoopServiceProvider()); } - final ServiceProvider serviceProvider = serviceDiscovery + final ServiceProvider serviceProvider = externlServiceDiscovery .serviceProviderBuilder() .serviceName(CuratorServiceUtils.makeCanonicalServiceName(serviceName)) .build(); - return new ServerDiscoverySelector(serviceProvider); + return new ExternalServerDiscoverySelector(serviceProvider); } private static class NoopServiceProvider implements ServiceProvider @@ -75,7 +87,8 @@ public Collection> getAllInstances() throws Exception } @Override - public void noteError(ServiceInstance tServiceInstance) { + public void noteError(ServiceInstance tServiceInstance) + { // do nothing } diff --git a/server/src/main/java/io/druid/curator/discovery/ServerDiscoverySelector.java b/server/src/main/java/io/druid/curator/discovery/ServerDiscoverySelector.java index dfd7df251b4b..f5ec2a65b442 100644 --- a/server/src/main/java/io/druid/curator/discovery/ServerDiscoverySelector.java +++ b/server/src/main/java/io/druid/curator/discovery/ServerDiscoverySelector.java @@ -22,48 +22,53 @@ import com.google.common.base.Function; import com.google.common.collect.Collections2; import com.google.common.net.HostAndPort; -import com.metamx.common.lifecycle.LifecycleStart; -import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; +import io.druid.client.DruidServerDiscovery; import io.druid.client.selector.DiscoverySelector; import 
io.druid.client.selector.Server; -import org.apache.curator.x.discovery.ServiceInstance; -import org.apache.curator.x.discovery.ServiceProvider; +import io.druid.server.coordination.DruidServerMetadata; -import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; /** */ public class ServerDiscoverySelector implements DiscoverySelector { private static final Logger log = new Logger(ServerDiscoverySelector.class); - - private final ServiceProvider serviceProvider; - - public ServerDiscoverySelector(ServiceProvider serviceProvider) + private final DruidServerDiscovery druidServerDiscovery; + private final Function> selectFunction; + private final AtomicInteger roundRobinIdx; + + public ServerDiscoverySelector( + DruidServerDiscovery druidServerDiscovery, + Function> selectFunction + ) { - this.serviceProvider = serviceProvider; + this.roundRobinIdx = new AtomicInteger(0); + this.druidServerDiscovery = druidServerDiscovery; + this.selectFunction = selectFunction == null ? 
new NoopSelectFunction() : selectFunction; } - private static final Function TO_SERVER = new Function() + private static final Function TO_SERVER = new Function() { @Override - public Server apply(final ServiceInstance instance) + public Server apply(final DruidServerMetadata instance) { return new Server() { @Override public String getHost() { - return HostAndPort.fromParts(getAddress(), getPort()).toString(); + return HostAndPort.fromParts(instance.getHostText(), instance.getPort()).toString(); } @Override public String getAddress() { - return instance.getAddress(); + return instance.getHostText(); } @Override @@ -84,9 +89,13 @@ public String getScheme() @Override public Server pick() { - final ServiceInstance instance; + final DruidServerMetadata instance; try { - instance = serviceProvider.getInstance(); + final List candidates = selectFunction.apply(druidServerDiscovery); + if (candidates == null || candidates.isEmpty()) { + return null; + } + instance = roundRobinPick(candidates); } catch (Exception e) { log.info(e, "Exception getting instance"); @@ -101,10 +110,15 @@ public Server pick() return TO_SERVER.apply(instance); } + private DruidServerMetadata roundRobinPick(List candidates) + { + return candidates.get(roundRobinIdx.getAndIncrement() % candidates.size()); + } + public Collection getAll() { try { - return Collections2.transform(serviceProvider.getAllInstances(), TO_SERVER); + return Collections2.transform(selectFunction.apply(druidServerDiscovery), TO_SERVER); } catch (Exception e) { log.info(e, "Unable to get all instances"); @@ -112,15 +126,12 @@ public Collection getAll() } } - @LifecycleStart - public void start() throws Exception - { - serviceProvider.start(); - } - - @LifecycleStop - public void stop() throws IOException + private static class NoopSelectFunction implements Function> { - serviceProvider.close(); + @Override + public List apply(DruidServerDiscovery input) + { + return Collections.emptyList(); + } } } diff --git 
a/server/src/main/java/io/druid/guice/AnnouncerModule.java b/server/src/main/java/io/druid/guice/AnnouncerModule.java index 8d1152ea7cad..77c02644fcdf 100644 --- a/server/src/main/java/io/druid/guice/AnnouncerModule.java +++ b/server/src/main/java/io/druid/guice/AnnouncerModule.java @@ -22,11 +22,15 @@ import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; +import com.google.inject.multibindings.Multibinder; import io.druid.concurrent.Execs; import io.druid.curator.announcement.Announcer; +import io.druid.curator.discovery.ServiceAnnouncer; import io.druid.server.coordination.BatchDataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentAnnouncerProvider; +import io.druid.server.coordination.ZooKeeperServerAnnouncer; +import io.druid.server.coordination.ServerAnnouncer; import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig; import org.apache.curator.framework.CuratorFramework; @@ -41,6 +45,10 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.announcer", DataSegmentAnnouncerProvider.class); binder.bind(DataSegmentAnnouncer.class).toProvider(DataSegmentAnnouncerProvider.class); binder.bind(BatchDataSegmentAnnouncer.class).in(ManageLifecycleLast.class); + binder.bind(ServerAnnouncer.class).to(ZooKeeperServerAnnouncer.class); + binder.bind(ZooKeeperServerAnnouncer.class).in(ManageLifecycleLast.class); + Multibinder.newSetBinder(binder, String.class, Capability.class); + LifecycleModule.register(binder, ServerAnnouncer.class); } @Provides diff --git a/server/src/main/java/io/druid/guice/Capability.java b/server/src/main/java/io/druid/guice/Capability.java new file mode 100644 index 000000000000..898a0defc44a --- /dev/null +++ b/server/src/main/java/io/druid/guice/Capability.java @@ -0,0 +1,36 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.guice; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + */ +@BindingAnnotation +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface Capability +{ +} diff --git a/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java b/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java index 1d2e1a85b0c3..1f3ba84bdb17 100644 --- a/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java +++ b/server/src/main/java/io/druid/guice/CoordinatorDiscoveryModule.java @@ -19,13 +19,20 @@ package io.druid.guice; +import com.google.common.base.Function; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; +import io.druid.client.DruidServerDiscovery; import io.druid.client.coordinator.Coordinator; import io.druid.client.coordinator.CoordinatorSelectorConfig; import io.druid.curator.discovery.ServerDiscoveryFactory; import 
io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.server.coordination.DruidServerMetadata; + +import java.util.List; /** */ @@ -34,17 +41,29 @@ public class CoordinatorDiscoveryModule implements Module @Override public void configure(Binder binder) { - JsonConfigProvider.bind(binder, "druid.selectors.coordinator", CoordinatorSelectorConfig.class); } @Provides @Coordinator - @ManageLifecycle public ServerDiscoverySelector getServiceProvider( CoordinatorSelectorConfig config, ServerDiscoveryFactory serverDiscoveryFactory ) { - return serverDiscoveryFactory.createSelector(config.getServiceName()); + return serverDiscoveryFactory.createSelector(new Function>() + { + @Override + public List apply(DruidServerDiscovery discovery) + { + final DruidServerMetadata leader; + try { + leader = discovery.getLeaderForType("coordinator"); + return leader == null ? null : Lists.newArrayList(leader); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + }); } } diff --git a/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java b/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java index 167e003f2365..a7e71967bbbf 100644 --- a/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java +++ b/server/src/main/java/io/druid/guice/IndexingServiceDiscoveryModule.java @@ -19,13 +19,20 @@ package io.druid.guice; +import com.google.common.base.Function; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; +import io.druid.client.DruidServerDiscovery; import io.druid.client.indexing.IndexingService; import io.druid.client.indexing.IndexingServiceSelectorConfig; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.server.coordination.DruidServerMetadata; + +import java.util.List; /** */ @@ -34,17 
+41,29 @@ public class IndexingServiceDiscoveryModule implements Module @Override public void configure(Binder binder) { - JsonConfigProvider.bind(binder, "druid.selectors.indexing", IndexingServiceSelectorConfig.class); } @Provides @IndexingService - @ManageLifecycle public ServerDiscoverySelector getServiceProvider( IndexingServiceSelectorConfig config, ServerDiscoveryFactory serverDiscoveryFactory ) { - return serverDiscoveryFactory.createSelector(config.getServiceName()); + return serverDiscoveryFactory.createSelector(new Function>() + { + @Override + public List apply(DruidServerDiscovery discovery) + { + final DruidServerMetadata leader; + try { + leader = discovery.getLeaderForType("overlord"); + return leader == null ? null : Lists.newArrayList(leader); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + }); } } diff --git a/server/src/main/java/io/druid/guice/StorageNodeModule.java b/server/src/main/java/io/druid/guice/StorageNodeModule.java index 4d9d43b9d0da..8f1c00e476c9 100644 --- a/server/src/main/java/io/druid/guice/StorageNodeModule.java +++ b/server/src/main/java/io/druid/guice/StorageNodeModule.java @@ -71,7 +71,10 @@ public DruidServerMetadata getMetadata(@Self DruidNode node, @Nullable NodeTypeC config.getMaxSize(), nodeType.getNodeType(), config.getTier(), - config.getPriority() + config.getPriority(), + node.getServiceName(), + node.getHost(), + node.getPort() ); } } diff --git a/server/src/main/java/io/druid/guice/annotations/External.java b/server/src/main/java/io/druid/guice/annotations/External.java new file mode 100644 index 000000000000..1290d7e4f563 --- /dev/null +++ b/server/src/main/java/io/druid/guice/annotations/External.java @@ -0,0 +1,36 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.guice.annotations; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + */ +@BindingAnnotation +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface External +{ +} diff --git a/server/src/main/java/io/druid/guice/annotations/Internal.java b/server/src/main/java/io/druid/guice/annotations/Internal.java new file mode 100644 index 000000000000..e7c9e8efba59 --- /dev/null +++ b/server/src/main/java/io/druid/guice/annotations/Internal.java @@ -0,0 +1,36 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.guice.annotations; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + */ +@BindingAnnotation +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface Internal +{ +} diff --git a/server/src/main/java/io/druid/server/coordination/AbstractDataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/AbstractDataSegmentAnnouncer.java index c2c76f6bbcd7..c97321b0e5be 100644 --- a/server/src/main/java/io/druid/server/coordination/AbstractDataSegmentAnnouncer.java +++ b/server/src/main/java/io/druid/server/coordination/AbstractDataSegmentAnnouncer.java @@ -19,44 +19,17 @@ package io.druid.server.coordination; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Throwables; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; -import com.metamx.common.logger.Logger; -import io.druid.curator.announcement.Announcer; -import io.druid.server.initialization.ZkPathsConfig; -import org.apache.curator.utils.ZKPaths; /** */ public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnouncer { - private static final Logger log = new Logger(AbstractDataSegmentAnnouncer.class); - - private final DruidServerMetadata server; - private 
final ZkPathsConfig config; - private final Announcer announcer; - private final ObjectMapper jsonMapper; - private final Object lock = new Object(); private volatile boolean started = false; - protected AbstractDataSegmentAnnouncer( - DruidServerMetadata server, - ZkPathsConfig config, - Announcer announcer, - ObjectMapper jsonMapper - ) - { - this.server = server; - this.config = config; - this.announcer = announcer; - this.jsonMapper = jsonMapper; - } - @LifecycleStart public void start() { @@ -65,15 +38,6 @@ public void start() return; } - try { - final String path = makeAnnouncementPath(); - log.info("Announcing self[%s] at [%s]", server, path); - announcer.announce(path, jsonMapper.writeValueAsBytes(server), false); - } - catch (JsonProcessingException e) { - throw Throwables.propagate(e); - } - started = true; } } @@ -86,15 +50,8 @@ public void stop() return; } - log.info("Stopping %s with config[%s]", getClass(), config); - announcer.unannounce(makeAnnouncementPath()); - started = false; } } - private String makeAnnouncementPath() - { - return ZKPaths.makePath(config.getAnnouncementsPath(), server.getName()); - } } diff --git a/server/src/main/java/io/druid/server/coordination/AbstractServerAnnouncer.java b/server/src/main/java/io/druid/server/coordination/AbstractServerAnnouncer.java new file mode 100644 index 000000000000..0bc4f767b30c --- /dev/null +++ b/server/src/main/java/io/druid/server/coordination/AbstractServerAnnouncer.java @@ -0,0 +1,39 @@ +package io.druid.server.coordination; + +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; + +/** + */ +public abstract class AbstractServerAnnouncer implements ServerAnnouncer +{ + private volatile boolean started = false; + + private final Object lock = new Object(); + + @LifecycleStart + public void start() + { + synchronized (lock) { + if (started) { + return; + } + + announceSelf(); + started = true; + } + } + + @LifecycleStop + public void stop() + { + 
synchronized (lock) { + if (!started) { + return; + } + + unannounceSelf(); + started = false; + } + } +} diff --git a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java index 503649a5073d..fb0553571908 100644 --- a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java +++ b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java @@ -69,7 +69,6 @@ public BatchDataSegmentAnnouncer( ObjectMapper jsonMapper ) { - super(server, zkPaths, announcer, jsonMapper); this.config = config; this.announcer = announcer; this.jsonMapper = jsonMapper; diff --git a/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java b/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java index 9094d1009980..54b6a994f178 100644 --- a/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java +++ b/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java @@ -32,6 +32,9 @@ public class DruidServerMetadata private final String tier; private final String type; private final int priority; + private final String service; + private final String hostText; + private final int port; @JsonCreator public DruidServerMetadata( @@ -40,7 +43,10 @@ public DruidServerMetadata( @JsonProperty("maxSize") long maxSize, @JsonProperty("type") String type, @JsonProperty("tier") String tier, - @JsonProperty("priority") int priority + @JsonProperty("priority") int priority, + @JsonProperty("service") String service, + @JsonProperty("hostText") String hostText, + @JsonProperty("port") int port ) { this.name = name; @@ -49,6 +55,9 @@ public DruidServerMetadata( this.tier = tier; this.type = type; this.priority = priority; + this.service = service; + this.hostText = hostText; + this.port = port; } @JsonProperty @@ -87,9 +96,27 @@ public int getPriority() return priority; } + @JsonProperty + 
public String getService() + { + return service; + } + + @JsonProperty + public String getHostText() + { + return hostText; + } + + @JsonProperty + public int getPort() + { + return port; + } + public boolean isAssignable() { - return getType().equalsIgnoreCase("historical") || getType().equalsIgnoreCase("bridge"); + return type.equalsIgnoreCase("historical"); } @Override @@ -102,28 +129,34 @@ public boolean equals(Object o) return false; } - DruidServerMetadata metadata = (DruidServerMetadata) o; + DruidServerMetadata that = (DruidServerMetadata) o; - if (maxSize != metadata.maxSize) { + if (maxSize != that.maxSize) { + return false; + } + if (priority != that.priority) { + return false; + } + if (port != that.port) { return false; } - if (priority != metadata.priority) { + if (name != null ? !name.equals(that.name) : that.name != null) { return false; } - if (host != null ? !host.equals(metadata.host) : metadata.host != null) { + if (host != null ? !host.equals(that.host) : that.host != null) { return false; } - if (name != null ? !name.equals(metadata.name) : metadata.name != null) { + if (tier != null ? !tier.equals(that.tier) : that.tier != null) { return false; } - if (tier != null ? !tier.equals(metadata.tier) : metadata.tier != null) { + if (type != null ? !type.equals(that.type) : that.type != null) { return false; } - if (type != null ? !type.equals(metadata.type) : metadata.type != null) { + if (service != null ? !service.equals(that.service) : that.service != null) { return false; } + return hostText != null ? hostText.equals(that.hostText) : that.hostText == null; - return true; } @Override @@ -135,6 +168,9 @@ public int hashCode() result = 31 * result + (tier != null ? tier.hashCode() : 0); result = 31 * result + (type != null ? type.hashCode() : 0); result = 31 * result + priority; + result = 31 * result + (service != null ? service.hashCode() : 0); + result = 31 * result + (hostText != null ? 
hostText.hashCode() : 0); + result = 31 * result + port; return result; } @@ -147,7 +183,10 @@ public String toString() ", maxSize=" + maxSize + ", tier='" + tier + '\'' + ", type='" + type + '\'' + - ", priority='" + priority + '\'' + + ", priority=" + priority + + ", service='" + service + '\'' + + ", hostText='" + hostText + '\'' + + ", port=" + port + '}'; } } diff --git a/server/src/main/java/io/druid/server/coordination/ServerAnnouncer.java b/server/src/main/java/io/druid/server/coordination/ServerAnnouncer.java new file mode 100644 index 000000000000..a4f3667b113f --- /dev/null +++ b/server/src/main/java/io/druid/server/coordination/ServerAnnouncer.java @@ -0,0 +1,36 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordination; + +/** + * ServerAnnouncer is bound to each Druid server. When the lifecycle starts, ServerAnnouncer will make an announcement + * to indicate the existence of the Druid server to which it belongs. 
+ */ +public interface ServerAnnouncer +{ + + void announceSelf(); + + void unannounceSelf(); + + void announceLeadership(); + + void unannounceLeadership(); +} diff --git a/server/src/main/java/io/druid/server/coordination/ZooKeeperServerAnnouncer.java b/server/src/main/java/io/druid/server/coordination/ZooKeeperServerAnnouncer.java new file mode 100644 index 000000000000..adcd2b2fbd00 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordination/ZooKeeperServerAnnouncer.java @@ -0,0 +1,110 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.server.coordination; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.repackaged.com.google.common.base.Throwables; +import com.google.inject.Inject; +import com.metamx.common.logger.Logger; +import io.druid.curator.announcement.Announcer; +import io.druid.guice.Capability; +import io.druid.server.initialization.ZkPathsConfig; + +import java.util.Set; + +/** + */ +public class ZooKeeperServerAnnouncer extends AbstractServerAnnouncer +{ + + private static final Logger log = new Logger(ZooKeeperServerAnnouncer.class); + + private final DruidServerMetadata me; + private final ObjectMapper objectMapper; + private final ZkPathsConfig zkPathsConfig; + private final Announcer announcer; + private final Set capabilities; + private final String selfAnnouncementPath; + private final String leadershipPath; + + @Inject + public ZooKeeperServerAnnouncer( + DruidServerMetadata me, + ObjectMapper mapper, + ZkPathsConfig zkPathsConfig, + Announcer announcer, + @Capability Set capabilities + ) + { + this.me = me; + this.objectMapper = mapper; + this.zkPathsConfig = zkPathsConfig; + this.announcer = announcer; + this.capabilities = capabilities; + this.selfAnnouncementPath = zkPathsConfig.getAnnouncementPathForServer(me); + this.leadershipPath = zkPathsConfig.getLeadershipPathForServer(me); + } + + @Override + public void announceLeadership() + { + log.info("Announcing self [%s] as leader at [%s]", me, selfAnnouncementPath); + try { + announcer.announce(leadershipPath, objectMapper.writeValueAsBytes(me), false); + } + catch (JsonProcessingException e) { + throw Throwables.propagate(e); + } + } + + @Override + public void unannounceLeadership() + { + log.info("Unannouncing self [%s] as leader at [%s]", me, selfAnnouncementPath); + announcer.unannounce(leadershipPath); + } + + @Override + public void announceSelf() + { + + log.info("Announcing self [%s] at [%s]", 
me, selfAnnouncementPath); + try { + final byte[] meta = objectMapper.writeValueAsBytes(me); + announcer.announce(selfAnnouncementPath, meta, false); + for (String cap : capabilities) { + announcer.announce(zkPathsConfig.getCapabilityPathForServerWithCapability(me, cap), meta, false); + } + } + catch (JsonProcessingException e) { + throw Throwables.propagate(e); + } + } + + @Override + public void unannounceSelf() + { + announcer.unannounce(selfAnnouncementPath); + for (String cap : capabilities) { + announcer.unannounce(zkPathsConfig.getCapabilityPathFor(cap)); + } + } +} diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 748a8522a6b4..478705d18bff 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -57,6 +57,7 @@ import io.druid.metadata.MetadataSegmentManager; import io.druid.segment.IndexIO; import io.druid.server.DruidNode; +import io.druid.server.coordination.ServerAnnouncer; import io.druid.server.coordinator.helper.DruidCoordinatorBalancer; import io.druid.server.coordinator.helper.DruidCoordinatorCleanupOvershadowed; import io.druid.server.coordinator.helper.DruidCoordinatorCleanupUnneeded; @@ -111,6 +112,7 @@ public Interval apply(DataSegment segment) .reverse(); private static final EmittingLogger log = new EmittingLogger(DruidCoordinator.class); + private final Object lock = new Object(); private final DruidCoordinatorConfig config; private final ZkPathsConfig zkPaths; @@ -127,6 +129,8 @@ public Interval apply(DataSegment segment) private final AtomicReference leaderLatch; private final ServiceAnnouncer serviceAnnouncer; private final DruidNode self; + private final ServerAnnouncer serverAnnouncer; + private volatile boolean started = false; private volatile int leaderCounter = 0; private volatile boolean leader = false; @@ -147,7 
+151,8 @@ public DruidCoordinator( IndexingServiceClient indexingServiceClient, LoadQueueTaskMaster taskMaster, ServiceAnnouncer serviceAnnouncer, - @Self DruidNode self + @Self DruidNode self, + ServerAnnouncer serverAnnouncer ) { this( @@ -164,7 +169,8 @@ public DruidCoordinator( taskMaster, serviceAnnouncer, self, - Maps.newConcurrentMap() + Maps.newConcurrentMap(), + serverAnnouncer ); } @@ -182,7 +188,8 @@ public DruidCoordinator( LoadQueueTaskMaster taskMaster, ServiceAnnouncer serviceAnnouncer, DruidNode self, - ConcurrentMap loadQueuePeonMap + ConcurrentMap loadQueuePeonMap, + ServerAnnouncer serverAnnouncer ) { this.config = config; @@ -198,6 +205,7 @@ public DruidCoordinator( this.taskMaster = taskMaster; this.serviceAnnouncer = serviceAnnouncer; this.self = self; + this.serverAnnouncer = serverAnnouncer; this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d"); @@ -546,6 +554,7 @@ private void becomeLeader() metadataRuleManager.start(); serverInventoryView.start(); serviceAnnouncer.announce(self); + serverAnnouncer.announceLeadership(); final int startingLeaderCounter = leaderCounter; final List> coordinatorRunnables = Lists.newArrayList(); @@ -631,6 +640,7 @@ private void stopBeingLeader() loadManagementPeons.clear(); serviceAnnouncer.unannounce(self); + serverAnnouncer.unannounceLeadership(); serverInventoryView.stop(); metadataRuleManager.stop(); metadataSegmentManager.stop(); diff --git a/server/src/main/java/io/druid/server/http/ClusterInfoResource.java b/server/src/main/java/io/druid/server/http/ClusterInfoResource.java new file mode 100644 index 000000000000..7b736149405a --- /dev/null +++ b/server/src/main/java/io/druid/server/http/ClusterInfoResource.java @@ -0,0 +1,181 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.http; + +import com.google.api.client.util.Maps; +import com.google.common.collect.ImmutableMap; +import io.druid.client.DruidServerDiscovery; +import io.druid.server.coordination.DruidServerMetadata; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.util.List; +import java.util.Map; + +/** + */ +@Path("/druid/coordinator/v1/nodes") +public class ClusterInfoResource +{ + + private static final String HISTORICAL = "historical"; + private static final String COORDINATOR = "coordinator"; + private static final String OVERLORD = "overlord"; + private static final String BROKER = "broker"; + private static final String ROUTER = "router"; + private static final String REALTIME = "realtime"; + + private final DruidServerDiscovery discovery; + + @Inject + public ClusterInfoResource(DruidServerDiscovery discovery) + { + this.discovery = discovery; + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + public Response getClusterInfo() + { + final Map> clusterInfo = Maps.newHashMap(); + try { + clusterInfo.put(COORDINATOR, discovery.getServersForType(COORDINATOR)); + clusterInfo.put(OVERLORD, discovery.getServersForType(OVERLORD)); + clusterInfo.put(HISTORICAL, discovery.getServersForType(HISTORICAL)); + 
clusterInfo.put(BROKER, discovery.getServersForType(BROKER)); + clusterInfo.put(ROUTER, discovery.getServersForType(ROUTER)); + clusterInfo.put(REALTIME, discovery.getServersForType(REALTIME)); + } + catch (Exception e) { + return Response.status(Response.Status.BAD_REQUEST) + .entity(ImmutableMap.of("error", e.getMessage())) + .build(); + } + return Response.ok(clusterInfo).build(); + } + + @GET + @Path("/historical") + @Produces(MediaType.APPLICATION_JSON) + public Response getHistoricalInfo() + { + final Map> clusterInfo = Maps.newHashMap(); + try { + clusterInfo.put(HISTORICAL, discovery.getServersForType(HISTORICAL)); + } + catch (Exception e) { + return Response.status(Response.Status.BAD_REQUEST) + .entity(ImmutableMap.of("error", e.getMessage())) + .build(); + } + return Response.ok(clusterInfo).build(); + } + + @GET + @Path("/overlord") + @Produces(MediaType.APPLICATION_JSON) + public Response getOverlordInfo() + { + final Map> clusterInfo = Maps.newHashMap(); + try { + clusterInfo.put(OVERLORD, discovery.getServersForType(OVERLORD)); + } + catch (Exception e) { + return Response.status(Response.Status.BAD_REQUEST) + .entity(ImmutableMap.of("error", e.getMessage())) + .build(); + } + return Response.ok(clusterInfo).build(); + } + + @GET + @Path("/broker") + @Produces(MediaType.APPLICATION_JSON) + public Response getBrokerInfo() + { + final Map> clusterInfo = Maps.newHashMap(); + try { + clusterInfo.put(BROKER, discovery.getServersForType(BROKER)); + } + catch (Exception e) { + return Response.status(Response.Status.BAD_REQUEST) + .entity(ImmutableMap.of("error", e.getMessage())) + .build(); + } + return Response.ok(clusterInfo).build(); + } + + @GET + @Path("/coordinator") + @Produces(MediaType.APPLICATION_JSON) + public Response getCoordinatorInfo() + { + final Map> clusterInfo = Maps.newHashMap(); + try { + clusterInfo.put(COORDINATOR, discovery.getServersForType(COORDINATOR)); + } + catch (Exception e) { + return 
Response.status(Response.Status.BAD_REQUEST) + .entity(ImmutableMap.of("error", e.getMessage())) + .build(); + } + return Response.ok(clusterInfo).build(); + } + + @GET + @Path("/router") + @Produces(MediaType.APPLICATION_JSON) + public Response getRouterInfo() + { + final Map> clusterInfo = Maps.newHashMap(); + try { + clusterInfo.put(ROUTER, discovery.getServersForType(ROUTER)); + } + catch (Exception e) { + return Response.status(Response.Status.BAD_REQUEST) + .entity(ImmutableMap.of("error", e.getMessage())) + .build(); + } + return Response.ok(clusterInfo).build(); + } + + @GET + @Path("/realtime") + @Produces(MediaType.APPLICATION_JSON) + public Response getRealtimeInfo() + { + final Map> clusterInfo = Maps.newHashMap(); + try { + clusterInfo.put(REALTIME, discovery.getServersForType(REALTIME)); + } + catch (Exception e) { + return Response.status(Response.Status.BAD_REQUEST) + .entity(ImmutableMap.of("error", e.getMessage())) + .build(); + } + return Response.ok(clusterInfo).build(); + } + + +} diff --git a/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java b/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java index ed6bf8505546..6c7386144d28 100644 --- a/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java +++ b/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java @@ -20,34 +20,35 @@ package io.druid.server.initialization; import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.server.coordination.DruidServerMetadata; import org.apache.curator.utils.ZKPaths; public class ZkPathsConfig { @JsonProperty - private - String base = "druid"; + private String base = "druid"; + @JsonProperty - private - String propertiesPath; + private String propertiesPath; + @JsonProperty - private - String announcementsPath; - @JsonProperty @Deprecated - private - String servedSegmentsPath; + private String announcementsPath; + @JsonProperty - private - String liveSegmentsPath; + @Deprecated 
+ private String servedSegmentsPath; + @JsonProperty - private - String coordinatorPath; + private String liveSegmentsPath; + + @JsonProperty + private String coordinatorPath; + @JsonProperty - private - String loadQueuePath; + private String loadQueuePath; + @JsonProperty - private - String connectorPath; + private String connectorPath; public String getBase() { @@ -59,15 +60,51 @@ public String getPropertiesPath() return (null == propertiesPath) ? defaultPath("properties") : propertiesPath; } + @Deprecated public String getAnnouncementsPath() { return (null == announcementsPath) ? defaultPath("announcements") : announcementsPath; } + private String getAnnouncementsPathBase() + { + return (null == announcementsPath) ? defaultPath("announcements") : announcementsPath; + } + + public String getLeadershipPathForType(String type) + { + return ZKPaths.makePath(ZKPaths.makePath(getAnnouncementsPathBase(), "leadership"), type); + } + + public String getLeadershipPathForServer(DruidServerMetadata server) + { + return ZKPaths.makePath(getLeadershipPathForType(server.getType()), server.getName()); + } + + public String getAnnouncementPathForType(String type) + { + return ZKPaths.makePath(getAnnouncementsPathBase(), type); + } + + public String getAnnouncementPathForServer(DruidServerMetadata server) + { + return ZKPaths.makePath(getAnnouncementPathForType(server.getType()), server.getName()); + } + + public String getCapabilityPathFor(String capability) + { + return ZKPaths.makePath(ZKPaths.makePath(getAnnouncementsPathBase(), "capability"), capability); + } + + public String getCapabilityPathForServerWithCapability(DruidServerMetadata server, String capability) + { + return ZKPaths.makePath(getCapabilityPathFor(capability), server.getName()); + } + @Deprecated public String getServedSegmentsPath() { - return (null == servedSegmentsPath) ? defaultPath("servedSegments") : servedSegmentsPath; + return (null == servedSegmentsPath) ? 
defaultPath("servedSegments") : servedSegmentsPath; } public String getLiveSegmentsPath() @@ -77,17 +114,17 @@ public String getLiveSegmentsPath() public String getCoordinatorPath() { - return (null == coordinatorPath) ? defaultPath("coordinator") : coordinatorPath; + return (null == coordinatorPath) ? defaultPath("coordinator") : coordinatorPath; } public String getLoadQueuePath() { - return (null == loadQueuePath) ? defaultPath("loadQueue") : loadQueuePath; + return (null == loadQueuePath) ? defaultPath("loadQueue") : loadQueuePath; } public String getConnectorPath() { - return (null == connectorPath) ? defaultPath("connector") : connectorPath; + return (null == connectorPath) ? defaultPath("connector") : connectorPath; } protected String defaultPath(final String subPath) @@ -96,30 +133,26 @@ protected String defaultPath(final String subPath) } @Override - public boolean equals(Object other){ - if(null == other){ + public boolean equals(Object other) + { + if (null == other) { return false; } - if(this == other){ + if (this == other) { return true; } - if(!(other instanceof ZkPathsConfig)){ + if (!(other instanceof ZkPathsConfig)) { return false; } ZkPathsConfig otherConfig = (ZkPathsConfig) other; - if( - this.getBase().equals(otherConfig.getBase()) && - this.getAnnouncementsPath().equals(otherConfig.getAnnouncementsPath()) && - this.getConnectorPath().equals(otherConfig.getConnectorPath()) && - this.getLiveSegmentsPath().equals(otherConfig.getLiveSegmentsPath()) && - this.getCoordinatorPath().equals(otherConfig.getCoordinatorPath()) && - this.getLoadQueuePath().equals(otherConfig.getLoadQueuePath()) && - this.getPropertiesPath().equals(otherConfig.getPropertiesPath()) && - this.getServedSegmentsPath().equals(otherConfig.getServedSegmentsPath()) - ){ - return true; - } - return false; + return this.getBase().equals(otherConfig.getBase()) && + this.getAnnouncementsPathBase().equals(otherConfig.getAnnouncementsPathBase()) && + 
this.getConnectorPath().equals(otherConfig.getConnectorPath()) && + this.getLiveSegmentsPath().equals(otherConfig.getLiveSegmentsPath()) && + this.getCoordinatorPath().equals(otherConfig.getCoordinatorPath()) && + this.getLoadQueuePath().equals(otherConfig.getLoadQueuePath()) && + this.getPropertiesPath().equals(otherConfig.getPropertiesPath()) && + this.getServedSegmentsPath().equals(otherConfig.getServedSegmentsPath()); } @Override diff --git a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java index 6ab9df3a6107..2869a624e4bc 100644 --- a/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java +++ b/server/src/main/java/io/druid/server/router/CoordinatorRuleManager.java @@ -33,6 +33,7 @@ import com.metamx.http.client.Request; import com.metamx.http.client.response.FullResponseHandler; import com.metamx.http.client.response.FullResponseHolder; +import io.druid.client.coordinator.Coordinator; import io.druid.client.selector.Server; import io.druid.concurrent.Execs; import io.druid.curator.discovery.ServerDiscoverySelector; @@ -79,7 +80,7 @@ public CoordinatorRuleManager( @Global HttpClient httpClient, @Json ObjectMapper jsonMapper, Supplier config, - ServerDiscoverySelector selector + @Coordinator ServerDiscoverySelector selector ) { this.httpClient = httpClient; @@ -206,7 +207,7 @@ private String getRuleURL() throws URISyntaxException Server server = selector.pick(); if (server == null) { - log.error("No instances found for [%s]!", config.get().getCoordinatorServiceName()); + log.error("No instances found for Coordinator!"); return null; } diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java b/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java index 70689c25f4f3..93a6ffa4fc88 100644 --- a/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java +++ 
b/server/src/main/java/io/druid/server/router/TieredBrokerConfig.java @@ -52,6 +52,7 @@ public class TieredBrokerConfig @NotNull private String rulesEndpoint = "/druid/coordinator/v1/rules"; + @Deprecated @JsonProperty @NotNull private String coordinatorServiceName = DEFAULT_COORDINATOR_SERVICE_NAME; @@ -92,6 +93,7 @@ public String getRulesEndpoint() return rulesEndpoint; } + @Deprecated public String getCoordinatorServiceName() { return coordinatorServiceName; diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java index b9dd23587c46..3ae6b893d160 100644 --- a/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java +++ b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java @@ -19,23 +19,29 @@ package io.druid.server.router; +import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.inject.Inject; import com.metamx.common.Pair; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; +import io.druid.client.DruidServerDiscovery; import io.druid.client.selector.HostSelector; import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.query.Query; +import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.coordinator.rules.LoadRule; import io.druid.server.coordinator.rules.Rule; import org.joda.time.DateTime; import org.joda.time.Interval; +import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -80,9 +86,21 @@ public void start() } try { - for (Map.Entry entry : 
tierConfig.getTierToBrokerMap().entrySet()) { - ServerDiscoverySelector selector = serverDiscoveryFactory.createSelector(entry.getValue()); - selector.start(); + for (final Map.Entry entry : tierConfig.getTierToBrokerMap().entrySet()) { + final ServerDiscoverySelector selector = serverDiscoveryFactory.createSelector( + new Function>() + { + @Override + public List apply(DruidServerDiscovery discovery) + { + try { + return discovery.getServersForTypeWithService("broker", entry.getValue()); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + }); selectorMap.put(entry.getValue(), selector); } } @@ -103,15 +121,6 @@ public void stop() return; } - try { - for (ServerDiscoverySelector selector : selectorMap.values()) { - selector.stop(); - } - } - catch (Exception e) { - throw Throwables.propagate(e); - } - started = false; } } diff --git a/server/src/test/java/io/druid/client/BrokerServerViewTest.java b/server/src/test/java/io/druid/client/BrokerServerViewTest.java index fd3fb91d75ff..830f3160ea8d 100644 --- a/server/src/test/java/io/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/io/druid/client/BrokerServerViewTest.java @@ -99,7 +99,10 @@ public void testSingleServerAddedRemovedSegment() throws Exception 10000000L, "historical", "default_tier", - 0 + 0, + "service", + "hostText", + -1 ); setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper); @@ -165,7 +168,10 @@ public DruidServer apply(String input) 10000000L, "historical", "default_tier", - 0 + 0, + "service", + "hostText", + -1 ); } } diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java index 99d30caec6be..9fbb3d650aca 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java @@ -158,7 +158,7 @@ public Comparator 
getComparator() { @Override public QueryableDruidServer pick(TreeMap> prioritizedServers, DataSegment segment) { return new QueryableDruidServer( - new DruidServer("localhost", "localhost", 100, "historical", "a", 10), + new DruidServer("localhost", "localhost", 100, "historical", "a", 10, "service", "hostText", -1), EasyMock.createNiceMock(DirectDruidClient.class) ); } diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index 4f09b05cfcfc..7439bbb1cca1 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -302,11 +302,11 @@ public void setUp() throws Exception client = makeClient(MoreExecutors.sameThreadExecutor()); servers = new DruidServer[]{ - new DruidServer("test1", "test1", 10, "historical", "bye", 0), - new DruidServer("test2", "test2", 10, "historical", "bye", 0), - new DruidServer("test3", "test3", 10, "historical", "bye", 0), - new DruidServer("test4", "test4", 10, "historical", "bye", 0), - new DruidServer("test5", "test5", 10, "historical", "bye", 0) + new DruidServer("test1", "test1", 10, "historical", "bye", 0, "service", "hostText", -1), + new DruidServer("test2", "test2", 10, "historical", "bye", 0, "service", "hostText", -1), + new DruidServer("test3", "test3", 10, "historical", "bye", 0, "service", "hostText", -1), + new DruidServer("test4", "test4", 10, "historical", "bye", 0, "service", "hostText", -1), + new DruidServer("test5", "test5", 10, "historical", "bye", 0, "service", "hostText", -1) }; } diff --git a/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java b/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java index 17636c031227..93020ce4db7b 100644 --- a/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java +++ b/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java @@ -91,7 
+91,10 @@ public void testSingleServerAddedRemovedSegment() throws Exception 10000000L, "historical", "default_tier", - 0 + 0, + "service", + "hostText", + -1 ); setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper); @@ -158,7 +161,10 @@ public DruidServer apply(String input) 10000000L, "historical", "default_tier", - 0 + 0, + "service", + "hostText", + -1 ); } } diff --git a/server/src/test/java/io/druid/client/DirectDruidClientTest.java b/server/src/test/java/io/druid/client/DirectDruidClientTest.java index 413e6e25517d..f44ef5e379a9 100644 --- a/server/src/test/java/io/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/io/druid/client/DirectDruidClientTest.java @@ -133,12 +133,12 @@ public void testRun() throws Exception ); QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer( - new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0), + new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0, "service", "hostText", -1), client1 ); serverSelector.addServerAndUpdateSegment(queryableDruidServer1, serverSelector.getSegment()); QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer( - new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0), + new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0, "service", "hostText", -1), client2 ); serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment()); @@ -235,7 +235,7 @@ public void testCancel() throws Exception ); QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer( - new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0), + new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0, "service", "hostText", -1), client1 ); serverSelector.addServerAndUpdateSegment(queryableDruidServer1, serverSelector.getSegment()); diff --git 
a/server/src/test/java/io/druid/client/TestDruidServerDiscovery.java b/server/src/test/java/io/druid/client/TestDruidServerDiscovery.java new file mode 100644 index 000000000000..9b68579330bc --- /dev/null +++ b/server/src/test/java/io/druid/client/TestDruidServerDiscovery.java @@ -0,0 +1,64 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.client; + +import io.druid.server.coordination.DruidServerMetadata; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + */ +public class TestDruidServerDiscovery implements DruidServerDiscovery +{ + + private final Map> nodes; + + public TestDruidServerDiscovery(Map> nodes) + { + this.nodes = nodes; + } + + @Override + public List getServersForType(String type) + { + return nodes.get(type); + } + + @Override + public DruidServerMetadata getLeaderForType(String type) + { + return null; + } + + @Override + public List getServersForTypeWithService(String type, String service) + { + return null; + } + + @Override + public List getServersWithCapability(String capability) + { + return null; + } +} diff --git a/server/src/test/java/io/druid/client/ZookeeperDruidServerDiscoveryTest.java b/server/src/test/java/io/druid/client/ZookeeperDruidServerDiscoveryTest.java new file mode 100644 index 000000000000..2aee38d64810 --- /dev/null +++ b/server/src/test/java/io/druid/client/ZookeeperDruidServerDiscoveryTest.java @@ -0,0 +1,250 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.curator.CuratorTestBase; +import io.druid.curator.announcement.Announcer; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.coordination.ZooKeeperServerAnnouncer; +import io.druid.server.initialization.ZkPathsConfig; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + */ +public class ZookeeperDruidServerDiscoveryTest extends CuratorTestBase +{ + + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); + private static final DruidServerMetadata COORDINATOR_LEADER = new DruidServerMetadata( + "coor1", + "localhost", + 100, + "coordinator", + "1", + 0, + "coordinator", + "hostText", + -1 + ); + private static final DruidServerMetadata OVERLORD_LEADER = new DruidServerMetadata( + "over1", + "localhost", + 100, + "overlord", + "1", + 0, + "overlord", + "hostText", + -1 + ); + + private static Map> cluster; + private static Map announcers; + private static ZkPathsConfig ZK_PATHS_CONFIG = new ZkPathsConfig() + { + @Override + public String getBase() + { + return "test/druid"; + } + }; + + private ZookeeperDruidServerDiscovery discovery; + private Announcer announcer; + + @BeforeClass + public static void setupStatic() throws Exception + { + cluster = ImmutableMap.>builder() + .put( + "historical", + ImmutableSet.of( + new DruidServerMetadata("hist1", "localhost", 100, "historical", "1", 0, "historical", "hostText", -1), + new DruidServerMetadata("hist2", "localhost", 100, 
"historical", "1", 0, "historical", "hostText", -1) + ) + ) + .put( + "broker", + ImmutableSet.of( + new DruidServerMetadata("brok1", "localhost", 100, "broker", "1", 0, "broker", "hostText", -1), + new DruidServerMetadata("brok2", "localhost", 100, "broker", "1", 0, "broker", "hostText", -1) + ) + ) + .put( + "overlord", + ImmutableSet.of( + OVERLORD_LEADER, + new DruidServerMetadata("over2", "localhost", 100, "overlord", "1", 0, "overlord", "hostText", -1) + ) + ) + .put( + "coordinator", + ImmutableSet.of( + COORDINATOR_LEADER, + new DruidServerMetadata("coor2", "localhost", 100, "coordinator", "1", 0, "coordinator", "hostText", -1) + ) + ) + .put( + "router", + ImmutableSet.of( + new DruidServerMetadata("rout1", "localhost", 100, "router", "1", 0, "router", "hostText", -1), + new DruidServerMetadata("rout2", "localhost", 100, "router", "1", 0, "router", "hostText", -1) + ) + ) + .put( + "realtime", + ImmutableSet.of( + new DruidServerMetadata("real1", "localhost", 100, "realtime", "1", 0, "realtime", "hostText", -1), + new DruidServerMetadata("real2", "localhost", 100, "realtime", "1", 0, "realtime", "hostText", -1) + ) + ) + .put( + "middleManager", + ImmutableSet.of( + new DruidServerMetadata( + "mm1", + "localhost", + 100, + "middleManager", + "1", + 0, + "middleManager", + "hostText", + -1 + ), + new DruidServerMetadata( + "mm2", + "localhost", + 100, + "middleManager", + "1", + 0, + "middleManager", + "hostText", + -1 + ) + ) + ) + .build(); + announcers = new HashMap<>(); + } + + @Before + public void setUp() throws Exception + { + setupServerAndCurator(); + curator.start(); + curator.blockUntilConnected(); + announcer = new Announcer(curator, MoreExecutors.sameThreadExecutor()); + announcer.start(); + + for (Set servers : cluster.values()) { + for (DruidServerMetadata server : servers) { + final ZooKeeperServerAnnouncer serverAnnouncer = new ZooKeeperServerAnnouncer( + server, + MAPPER, + ZK_PATHS_CONFIG, + announcer, + 
((server.getType().equals("realtime") || server.getType().equals("historical")) ? + ImmutableSet.of("segmentServer") : ImmutableSet.of()) + ); + serverAnnouncer.start(); + announcers.put(server, serverAnnouncer); + } + } + + announcers.get(COORDINATOR_LEADER).announceLeadership(); + announcers.get(OVERLORD_LEADER).announceLeadership(); + discovery = new ZookeeperDruidServerDiscovery(curator, MAPPER, ZK_PATHS_CONFIG); + } + + @Test + public void testDiscovery() throws Exception + { + Assert.assertEquals(cluster.get("historical"), Sets.newHashSet(discovery.getServersForType("historical"))); + Assert.assertEquals(cluster.get("broker"), Sets.newHashSet(discovery.getServersForType("broker"))); + Assert.assertEquals(cluster.get("realtime"), Sets.newHashSet(discovery.getServersForType("realtime"))); + Assert.assertEquals(cluster.get("overlord"), Sets.newHashSet(discovery.getServersForType("overlord"))); + Assert.assertEquals(cluster.get("middleManager"), Sets.newHashSet(discovery.getServersForType("middleManager"))); + Assert.assertEquals(cluster.get("router"), Sets.newHashSet(discovery.getServersForType("router"))); + Assert.assertEquals(ImmutableList.of(), discovery.getServersForType("null")); + + Assert.assertEquals(COORDINATOR_LEADER, discovery.getLeaderForType("coordinator")); + Assert.assertEquals(OVERLORD_LEADER, discovery.getLeaderForType("overlord")); + Assert.assertEquals(null, discovery.getLeaderForType("null")); + + Assert.assertEquals( + cluster.get("historical"), + Sets.newHashSet(discovery.getServersForTypeWithService("historical", "historical")) + ); + Assert.assertEquals( + cluster.get("broker"), + Sets.newHashSet(discovery.getServersForTypeWithService("broker", "broker")) + ); + Assert.assertEquals( + cluster.get("realtime"), + Sets.newHashSet(discovery.getServersForTypeWithService("realtime", "realtime")) + ); + Assert.assertEquals( + cluster.get("overlord"), + Sets.newHashSet(discovery.getServersForTypeWithService("overlord", "overlord")) + ); + 
Assert.assertEquals( + cluster.get("middleManager"), + Sets.newHashSet(discovery.getServersForTypeWithService("middleManager", "middleManager")) + ); + Assert.assertEquals( + cluster.get("router"), + Sets.newHashSet(discovery.getServersForTypeWithService("router", "router")) + ); + Assert.assertEquals(ImmutableList.of(), discovery.getServersForType("null")); + + Assert.assertEquals( + Sets.union(cluster.get("historical"), cluster.get("realtime")), + Sets.newHashSet(discovery.getServersWithCapability("segmentServer")) + ); + } + + @After + public void tearDown() throws Exception + { + for (ZooKeeperServerAnnouncer announcer : announcers.values()) { + announcer.stop(); + } + announcer.stop(); + tearDownServerAndCurator(); + } + +} \ No newline at end of file diff --git a/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java b/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java index e9917f84c4ce..b17c10ce960c 100644 --- a/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java +++ b/server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java @@ -41,8 +41,11 @@ import io.druid.guice.GuiceInjectors; import io.druid.guice.JsonConfigProvider; import io.druid.guice.ManageLifecycle; +import io.druid.guice.NodeTypeConfig; import io.druid.initialization.Initialization; import io.druid.jackson.DefaultObjectMapper; +import io.druid.server.coordination.NoopServerAnnouncer; +import io.druid.server.coordination.ServerAnnouncer; import net.spy.memcached.BroadcastOpFactory; import net.spy.memcached.CASResponse; import net.spy.memcached.CASValue; @@ -152,9 +155,10 @@ public void configure(Binder binder) { binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/test/memcached"); binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); - binder.bind(MemcachedCacheConfig.class).toInstance(config); binder.bind(Cache.class).toProvider(MemcachedProviderWithConfig.class).in(ManageLifecycle.class); + 
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("coordinator")); + binder.bind(ServerAnnouncer.class).to(NoopServerAnnouncer.class); } } ) diff --git a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java index df30f9abeffe..ea02a3f17912 100644 --- a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java +++ b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; @@ -36,6 +37,8 @@ import io.druid.jackson.DefaultObjectMapper; import io.druid.server.coordination.BatchDataSegmentAnnouncer; import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.coordination.ServerAnnouncer; +import io.druid.server.coordination.ZooKeeperServerAnnouncer; import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; @@ -54,6 +57,7 @@ import org.junit.rules.ExpectedException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.Callable; @@ -79,6 +83,10 @@ public class BatchServerInventoryViewTest private Set testSegments; private BatchServerInventoryView batchServerInventoryView; private BatchServerInventoryView filteredBatchServerInventoryView; + private DruidServerMetadata server; + private ZooKeeperServerAnnouncer serverAnnouncer; + private ZkPathsConfig zkPathsConfig; + private final AtomicInteger inventoryUpdateCounter = new AtomicInteger(); @Rule @@ -107,15 +115,36 @@ public void setUp() 
throws Exception ); announcer.start(); + zkPathsConfig = new ZkPathsConfig() + { + @Override + public String getBase() + { + return testBasePath; + } + }; + + server = new DruidServerMetadata( + "id", + "host", + Long.MAX_VALUE, + "historical", + "tier", + 0, + "service", + "hostText", -1 + ); + + serverAnnouncer = new ZooKeeperServerAnnouncer( + server, + jsonMapper, + zkPathsConfig, + announcer, + ImmutableSet.of("segmentServer") + ); + segmentAnnouncer = new BatchDataSegmentAnnouncer( - new DruidServerMetadata( - "id", - "host", - Long.MAX_VALUE, - "type", - "tier", - 0 - ), + server, new BatchDataSegmentAnnouncerConfig() { @Override @@ -124,17 +153,12 @@ public int getSegmentsPerNode() return 50; } }, - new ZkPathsConfig() - { - @Override - public String getBase() - { - return testBasePath; - } - }, + zkPathsConfig, announcer, jsonMapper ); + + serverAnnouncer.start(); segmentAnnouncer.start(); testSegments = Sets.newConcurrentHashSet(); @@ -307,7 +331,9 @@ public BatchDataSegmentAnnouncer call() Long.MAX_VALUE, "type", "tier", - 0 + 0, + "service", + "hostText", -1 ), new BatchDataSegmentAnnouncerConfig() { diff --git a/server/src/test/java/io/druid/client/client/ImmutableSegmentLoadInfoTest.java b/server/src/test/java/io/druid/client/client/ImmutableSegmentLoadInfoTest.java index 1fba30287f59..4b347fdd93b1 100644 --- a/server/src/test/java/io/druid/client/client/ImmutableSegmentLoadInfoTest.java +++ b/server/src/test/java/io/druid/client/client/ImmutableSegmentLoadInfoTest.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; import io.druid.client.ImmutableSegmentLoadInfo; -import io.druid.client.SegmentLoadInfo; import io.druid.jackson.DefaultObjectMapper; import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; @@ -52,7 +51,7 @@ public void testSerde() throws IOException null, new NoneShardSpec(), 0, 0 - ), Sets.newHashSet(new DruidServerMetadata("a", "host", 
10, "type", "tier", 1)) + ), Sets.newHashSet(new DruidServerMetadata("a", "host", 10, "type", "tier", 1, "service", "hostText", -1)) ); ImmutableSegmentLoadInfo serde = mapper.readValue( diff --git a/server/src/test/java/io/druid/client/selector/TierSelectorStrategyTest.java b/server/src/test/java/io/druid/client/selector/TierSelectorStrategyTest.java index 3ecce840d564..f7240c7c6b7a 100644 --- a/server/src/test/java/io/druid/client/selector/TierSelectorStrategyTest.java +++ b/server/src/test/java/io/druid/client/selector/TierSelectorStrategyTest.java @@ -41,11 +41,11 @@ public void testHighestPriorityTierSelectorStrategy() { DirectDruidClient client = EasyMock.createMock(DirectDruidClient.class); QueryableDruidServer lowPriority = new QueryableDruidServer( - new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0), + new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0, "service", "hostText", -1), client ); QueryableDruidServer highPriority = new QueryableDruidServer( - new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 1), + new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 1, "service", "hostText", -1), client ); @@ -61,11 +61,11 @@ public void testLowestPriorityTierSelectorStrategy() { DirectDruidClient client = EasyMock.createMock(DirectDruidClient.class); QueryableDruidServer lowPriority = new QueryableDruidServer( - new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0), + new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0, "service", "hostText", -1), client ); QueryableDruidServer highPriority = new QueryableDruidServer( - new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 1), + new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 1, "service", "hostText", -1), client ); @@ -81,15 +81,15 @@ public void 
testCustomPriorityTierSelectorStrategy() { DirectDruidClient client = EasyMock.createMock(DirectDruidClient.class); QueryableDruidServer lowPriority = new QueryableDruidServer( - new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, -1), + new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, -1, "service", "hostText", -1), client ); QueryableDruidServer mediumPriority = new QueryableDruidServer( - new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0), + new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0, "service", "hostText", -1), client ); QueryableDruidServer highPriority = new QueryableDruidServer( - new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 1), + new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 1, "service", "hostText", -1), client ); diff --git a/server/src/test/java/io/druid/curator/CuratorTestBase.java b/server/src/test/java/io/druid/curator/CuratorTestBase.java index c0a4aa678885..282940148ef4 100644 --- a/server/src/test/java/io/druid/curator/CuratorTestBase.java +++ b/server/src/test/java/io/druid/curator/CuratorTestBase.java @@ -61,7 +61,7 @@ protected void setupServerAndCurator() throws Exception protected void setupZNodeForServer(DruidServer server, ZkPathsConfig zkPathsConfig, ObjectMapper jsonMapper) { - final String announcementsPath = zkPathsConfig.getAnnouncementsPath(); + final String announcementsPath = zkPathsConfig.getCapabilityPathFor("segmentServer"); final String inventoryPath = zkPathsConfig.getLiveSegmentsPath(); try { @@ -163,5 +163,3 @@ protected void tearDownServerAndCurator() } } - -//Build at Tue Dec 22 21:30:00 CST 2015 diff --git a/server/src/test/java/io/druid/curator/discovery/ServerDiscoverySelectorTest.java b/server/src/test/java/io/druid/curator/discovery/ServerDiscoverySelectorTest.java index 223e123eca53..26b36034d4a3 100644 --- 
a/server/src/test/java/io/druid/curator/discovery/ServerDiscoverySelectorTest.java +++ b/server/src/test/java/io/druid/curator/discovery/ServerDiscoverySelectorTest.java @@ -19,48 +19,69 @@ package io.druid.curator.discovery; +import com.google.common.base.Function; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.common.net.HostAndPort; +import com.metamx.common.ISE; +import io.druid.client.DruidServerDiscovery; import io.druid.client.selector.Server; -import org.apache.curator.x.discovery.ServiceInstance; -import org.apache.curator.x.discovery.ServiceProvider; +import io.druid.server.coordination.DruidServerMetadata; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.io.IOException; import java.net.URI; +import java.util.List; public class ServerDiscoverySelectorTest { - private ServiceProvider serviceProvider; - private ServerDiscoverySelector serverDiscoverySelector; - private ServiceInstance instance; private static final int PORT = 8080; private static final String ADDRESS = "localhost"; + private ServerDiscoverySelector serverDiscoverySelector; + private DruidServerDiscovery discovery; + @Before - public void setUp() + public void setUp() throws Exception { - serviceProvider = EasyMock.createMock(ServiceProvider.class); - instance = EasyMock.createMock(ServiceInstance.class); - serverDiscoverySelector = new ServerDiscoverySelector(serviceProvider); + discovery = EasyMock.createMock(DruidServerDiscovery.class); + serverDiscoverySelector = new ServerDiscoverySelector( + discovery, + new Function>() + { + @Override + public List apply(DruidServerDiscovery discovery) + { + try { + return Lists.newArrayList(discovery.getLeaderForType("coordinator")); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); } @Test - public void testPick() throws Exception + public void testPickCoordinator() throws Exception { - 
EasyMock.expect(serviceProvider.getInstance()).andReturn(instance).anyTimes(); - EasyMock.expect(instance.getAddress()).andReturn(ADDRESS).anyTimes(); - EasyMock.expect(instance.getPort()).andReturn(PORT).anyTimes(); - EasyMock.replay(instance, serviceProvider); + EasyMock.expect(discovery.getLeaderForType("coordinator")) + .andReturn(new DruidServerMetadata("coor", ADDRESS + ":" + PORT, 0, "coordinator", "t1", 0, "service", + ADDRESS, + PORT)) + .once(); + EasyMock.replay(discovery); Server server = serverDiscoverySelector.pick(); + EasyMock.verify(discovery); Assert.assertEquals(PORT, server.getPort()); Assert.assertEquals(ADDRESS, server.getAddress()); Assert.assertTrue(server.getHost().contains(Integer.toString(PORT))); Assert.assertTrue(server.getHost().contains(ADDRESS)); Assert.assertEquals("http", server.getScheme()); - EasyMock.verify(instance, serviceProvider); final URI uri = new URI( server.getScheme(), null, @@ -75,22 +96,23 @@ public void testPick() throws Exception Assert.assertEquals("http", uri.getScheme()); } - @Test public void testPickIPv6() throws Exception { - final String ADDRESS = "2001:0db8:0000:0000:0000:ff00:0042:8329"; - EasyMock.expect(serviceProvider.getInstance()).andReturn(instance).anyTimes(); - EasyMock.expect(instance.getAddress()).andReturn(ADDRESS).anyTimes(); - EasyMock.expect(instance.getPort()).andReturn(PORT).anyTimes(); - EasyMock.replay(instance, serviceProvider); + final String address = "2001:0db8:0000:0000:0000:ff00:0042:8329"; + EasyMock.expect(discovery.getLeaderForType("coordinator")) + .andReturn(new DruidServerMetadata("coor", address + ":" + PORT, 0, "coordinator", "t1", 0, "service", + address, + PORT)) + .once(); + EasyMock.replay(discovery); Server server = serverDiscoverySelector.pick(); Assert.assertEquals(PORT, server.getPort()); - Assert.assertEquals(ADDRESS, server.getAddress()); + Assert.assertEquals(address, server.getAddress()); Assert.assertTrue(server.getHost().contains(Integer.toString(PORT))); - 
Assert.assertTrue(server.getHost().contains(ADDRESS)); + Assert.assertTrue(server.getHost().contains(address)); Assert.assertEquals("http", server.getScheme()); - EasyMock.verify(instance, serviceProvider); + EasyMock.verify(discovery); final URI uri = new URI( server.getScheme(), null, @@ -101,26 +123,27 @@ public void testPickIPv6() throws Exception null ); Assert.assertEquals(PORT, uri.getPort()); - Assert.assertEquals(String.format("[%s]", ADDRESS), uri.getHost()); + Assert.assertEquals(String.format("[%s]", address), uri.getHost()); Assert.assertEquals("http", uri.getScheme()); } - @Test public void testPickIPv6Bracket() throws Exception { - final String ADDRESS = "[2001:0db8:0000:0000:0000:ff00:0042:8329]"; - EasyMock.expect(serviceProvider.getInstance()).andReturn(instance).anyTimes(); - EasyMock.expect(instance.getAddress()).andReturn(ADDRESS).anyTimes(); - EasyMock.expect(instance.getPort()).andReturn(PORT).anyTimes(); - EasyMock.replay(instance, serviceProvider); + final String address = "[2001:0db8:0000:0000:0000:ff00:0042:8329]"; + EasyMock.expect(discovery.getLeaderForType("coordinator")) + .andReturn(new DruidServerMetadata("coor", address + ":" + PORT, 0, "coordinator", "t1", 0, "service", + address, + PORT)) + .once(); + EasyMock.replay(discovery); Server server = serverDiscoverySelector.pick(); Assert.assertEquals(PORT, server.getPort()); - Assert.assertEquals(ADDRESS, server.getAddress()); + Assert.assertEquals(address, server.getAddress()); Assert.assertTrue(server.getHost().contains(Integer.toString(PORT))); - Assert.assertTrue(server.getHost().contains(ADDRESS)); + Assert.assertTrue(server.getHost().contains(address)); Assert.assertEquals("http", server.getScheme()); - EasyMock.verify(instance, serviceProvider); + EasyMock.verify(discovery); final URI uri = new URI( server.getScheme(), null, @@ -131,45 +154,36 @@ public void testPickIPv6Bracket() throws Exception null ); Assert.assertEquals(PORT, uri.getPort()); - Assert.assertEquals(ADDRESS, 
uri.getHost()); + Assert.assertEquals(address, uri.getHost()); Assert.assertEquals("http", uri.getScheme()); } @Test public void testPickWithNullInstance() throws Exception { - EasyMock.expect(serviceProvider.getInstance()).andReturn(null).anyTimes(); - EasyMock.replay(serviceProvider); + EasyMock.expect(discovery.getLeaderForType("coordinator")) + .andReturn(null) + .once(); + EasyMock.replay(discovery); Server server = serverDiscoverySelector.pick(); Assert.assertNull(server); - EasyMock.verify(serviceProvider); + EasyMock.verify(discovery); } @Test public void testPickWithException() throws Exception { - EasyMock.expect(serviceProvider.getInstance()).andThrow(new Exception()).anyTimes(); - EasyMock.replay(serviceProvider); + EasyMock.expect(discovery.getLeaderForType("coordinator")).andThrow(new ISE("ise")).once(); + EasyMock.replay(discovery); Server server = serverDiscoverySelector.pick(); Assert.assertNull(server); - EasyMock.verify(serviceProvider); + EasyMock.verify(discovery); } @Test - public void testStart() throws Exception + public void testName() throws Exception { - serviceProvider.start(); - EasyMock.replay(serviceProvider); - serverDiscoverySelector.start(); - EasyMock.verify(serviceProvider); + System.out.println(HostAndPort.fromString("[2001:0db8:0000:0000:0000:ff00:0042:8329]:80")); } - @Test - public void testStop() throws IOException - { - serviceProvider.close(); - EasyMock.replay(serviceProvider); - serverDiscoverySelector.stop(); - EasyMock.verify(serviceProvider); - } } diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java index c57665344644..0a08e897b8e1 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java 
@@ -23,7 +23,6 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.MoreExecutors; import io.druid.client.ImmutableSegmentLoadInfo; -import io.druid.client.SegmentLoadInfo; import io.druid.client.coordinator.CoordinatorClient; import io.druid.query.SegmentDescriptor; import io.druid.server.coordination.DruidServerMetadata; @@ -36,8 +35,6 @@ import org.junit.Test; import java.io.IOException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public class CoordinatorBasedSegmentHandoffNotifierTest @@ -337,7 +334,9 @@ private DruidServerMetadata createServerMetadata(String name, String type) 10000, type, "tier", - 1 + 1, + "service", + "hostText", -1 ); } diff --git a/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java b/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java index e2bac72ab8bb..f4cdb5312389 100644 --- a/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java +++ b/server/src/test/java/io/druid/server/AsyncQueryForwardingServletTest.java @@ -35,11 +35,14 @@ import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; +import io.druid.guice.NodeTypeConfig; import io.druid.guice.annotations.Self; import io.druid.guice.annotations.Smile; import io.druid.guice.http.DruidHttpClientConfig; import io.druid.initialization.Initialization; import io.druid.query.Query; +import io.druid.server.coordination.NoopServerAnnouncer; +import io.druid.server.coordination.ServerAnnouncer; import io.druid.server.initialization.BaseJettyTest; import io.druid.server.initialization.jetty.JettyServerInitUtils; import io.druid.server.initialization.jetty.JettyServerInitializer; @@ -102,6 +105,8 @@ public void configure(Binder binder) JsonConfigProvider.bindInstance( binder, Key.get(DruidNode.class, Self.class), new DruidNode("test", "localhost", null) ); 
+ binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("nodeType")); + binder.bind(ServerAnnouncer.class).to(NoopServerAnnouncer.class); binder.bind(JettyServerInitializer.class).to(ProxyJettyServerInit.class).in(LazySingleton.class); Jerseys.addResource(binder, SlowResource.class); Jerseys.addResource(binder, ExceptionResource.class); diff --git a/server/src/test/java/io/druid/server/ClientInfoResourceTest.java b/server/src/test/java/io/druid/server/ClientInfoResourceTest.java index 2fec13f73185..b3dfcccb5bd2 100644 --- a/server/src/test/java/io/druid/server/ClientInfoResourceTest.java +++ b/server/src/test/java/io/druid/server/ClientInfoResourceTest.java @@ -78,7 +78,7 @@ public class ClientInfoResourceTest public void setup() { VersionedIntervalTimeline timeline = new VersionedIntervalTimeline<>(Ordering.natural()); - DruidServer server = new DruidServer("name", "host", 1234, "type", "tier", 0); + DruidServer server = new DruidServer("name", "host", 1234, "type", "tier", 0, "service", "hostText", -1); addSegment(timeline, server, "1960-02-13/1961-02-14", ImmutableList.of("d5"), ImmutableList.of("m5"), "v0"); diff --git a/server/src/test/java/io/druid/server/coordination/NoopServerAnnouncer.java b/server/src/test/java/io/druid/server/coordination/NoopServerAnnouncer.java new file mode 100644 index 000000000000..80c3b0eefed1 --- /dev/null +++ b/server/src/test/java/io/druid/server/coordination/NoopServerAnnouncer.java @@ -0,0 +1,49 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordination; + +/** + */ +public class NoopServerAnnouncer implements ServerAnnouncer +{ + @Override + public void announceSelf() + { + + } + + @Override + public void unannounceSelf() + { + + } + + @Override + public void announceLeadership() + { + + } + + @Override + public void unannounceLeadership() + { + + } +} diff --git a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java index 04dcdebf3c1b..182d04170a3d 100644 --- a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java @@ -82,7 +82,9 @@ public class ZkCoordinatorTest extends CuratorTestBase 0, "dummyType", "normal", - 0 + 0, + "service", + "hostText", -1 ); private ZkCoordinator zkCoordinator; @@ -515,7 +517,9 @@ public String getBase() } ); binder.bind(DruidServerMetadata.class) - .toInstance(new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0)); + .toInstance(new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0, + "service", "hostText", -1 + )); binder.bind(DataSegmentAnnouncer.class).toInstance(announcer); binder.bind(CuratorFramework.class).toInstance(curator); binder.bind(ServerManager.class).toInstance(serverManager); diff --git a/server/src/test/java/io/druid/server/coordination/ZooKeeperServerAnnouncerTest.java b/server/src/test/java/io/druid/server/coordination/ZooKeeperServerAnnouncerTest.java new file 
mode 100644 index 000000000000..5e1ec2ab5863 --- /dev/null +++ b/server/src/test/java/io/druid/server/coordination/ZooKeeperServerAnnouncerTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordination; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.concurrent.Execs; +import io.druid.curator.CuratorTestBase; +import io.druid.curator.announcement.Announcer; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.server.initialization.ZkPathsConfig; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashSet; + +/** + */ +public class ZooKeeperServerAnnouncerTest extends CuratorTestBase +{ + + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); + + private static final DruidServerMetadata ME = new DruidServerMetadata( + "node", + "localhost:8080", + 100, + "sometype", + "default_tier", + 0, + "service", + "localhost", + 8080 + ); + + private static final ZkPathsConfig ZK_PATHS_CONFIG = new ZkPathsConfig() + { + @Override + public String getBase() + { + return "test"; + } + }; + + private ZooKeeperServerAnnouncer zkServerAnnouncer; + 
private Announcer announcer; + + @Before + public void setup() throws Exception + { + setupServerAndCurator(); + curator.start(); + curator.blockUntilConnected(); + + announcer = new Announcer(curator, Execs.singleThreaded("test-announcer-sanity-%s")); + announcer.start(); + + zkServerAnnouncer = new ZooKeeperServerAnnouncer( + ME, + MAPPER, + ZK_PATHS_CONFIG, + announcer, + new HashSet() + ); + } + + @Test + public void testAnnounceAndUnannounceLeadership() throws Exception + { + zkServerAnnouncer.start(); + final String leadershipPath = ZK_PATHS_CONFIG.getLeadershipPathForServer(ME); + Assert.assertNull(curator.checkExists().forPath(leadershipPath)); + zkServerAnnouncer.announceLeadership(); + Assert.assertEquals( + ME, + MAPPER.readValue( + curator.getData().decompressed().forPath(leadershipPath), + DruidServerMetadata.class + ) + ); + zkServerAnnouncer.unannounceLeadership(); + Assert.assertNull(curator.checkExists().forPath(leadershipPath)); + zkServerAnnouncer.stop(); + } + + @Test + public void testAnnounceAndUnannounceSelf() throws Exception + { + final String selfAnnouncementPath = ZK_PATHS_CONFIG.getAnnouncementPathForServer(ME); + Assert.assertNull(curator.checkExists().forPath(selfAnnouncementPath)); + zkServerAnnouncer.announceSelf(); + Assert.assertEquals( + ME, + MAPPER.readValue(curator.getData().decompressed().forPath(selfAnnouncementPath), DruidServerMetadata.class) + ); + zkServerAnnouncer.unannounceSelf(); + Assert.assertNull(curator.checkExists().forPath(selfAnnouncementPath)); + } + + @Test + public void testServerAnnouncerStartStop() throws Exception + { + final String selfAnnouncementPath = ZK_PATHS_CONFIG.getAnnouncementPathForServer(ME); + Assert.assertNull(curator.checkExists().forPath(selfAnnouncementPath)); + zkServerAnnouncer.start(); + Assert.assertEquals( + ME, + MAPPER.readValue(curator.getData().decompressed().forPath(selfAnnouncementPath), DruidServerMetadata.class) + ); + zkServerAnnouncer.stop(); + 
Assert.assertNull(curator.checkExists().forPath(selfAnnouncementPath)); + } + + @After + public void tearDown() throws Exception + { + announcer.stop(); + tearDownServerAndCurator(); + } +} \ No newline at end of file diff --git a/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java b/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java index 08024ed7d1ad..70cde9e4560b 100644 --- a/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java +++ b/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java @@ -98,7 +98,9 @@ public void setUp() throws Exception Long.MAX_VALUE, "type", "tier", - 0 + 0, + "service", + "hostText", -1 ), new BatchDataSegmentAnnouncerConfig() { diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index cbc59e9fda0e..7bd687f44018 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -135,7 +135,10 @@ public void testRunThreeTiersOneReplicant() throws Exception 1000, "historical", "hot", - 0 + 0, + "service", + "hostText", + -1 ).toImmutableDruidServer(), mockPeon ) @@ -151,7 +154,10 @@ public void testRunThreeTiersOneReplicant() throws Exception 1000, "historical", "normal", - 0 + 0, + "service", + "hostText", + -1 ).toImmutableDruidServer(), mockPeon ) @@ -167,7 +173,10 @@ public void testRunThreeTiersOneReplicant() throws Exception 1000, "historical", "cold", - 0 + 0, + "service", + "hostText", + -1 ).toImmutableDruidServer(), mockPeon ) @@ -234,7 +243,10 @@ public void testRunTwoTiersTwoReplicants() throws Exception 1000, "historical", "hot", - 0 + 0, + "service", + "hostText", + -1 
).toImmutableDruidServer(), mockPeon ), @@ -245,7 +257,10 @@ public void testRunTwoTiersTwoReplicants() throws Exception 1000, "historical", "hot", - 0 + 0, + "service", + "hostText", + -1 ).toImmutableDruidServer(), mockPeon ) @@ -261,7 +276,10 @@ public void testRunTwoTiersTwoReplicants() throws Exception 1000, "historical", "cold", - 0 + 0, + "service", + "hostText", + -1 ).toImmutableDruidServer(), mockPeon ) @@ -321,7 +339,10 @@ public void testRunTwoTiersWithExistingSegments() throws Exception 1000, "historical", "normal", - 0 + 0, + "service", + "hostText", + -1 ); for (DataSegment availableSegment : availableSegments) { normServer.addDataSegment(availableSegment.getIdentifier(), availableSegment); @@ -339,7 +360,10 @@ public void testRunTwoTiersWithExistingSegments() throws Exception 1000, "historical", "hot", - 0 + 0, + "service", + "hostText", + -1 ).toImmutableDruidServer(), mockPeon ) @@ -413,7 +437,10 @@ public void testRunTwoTiersTierDoesNotExist() throws Exception 1000, "historical", "normal", - 0 + 0, + "service", + "hostText", + -1 ).toImmutableDruidServer(), mockPeon ) @@ -465,7 +492,10 @@ public void testRunRuleDoesNotExist() throws Exception 1000, "historical", "normal", - 0 + 0, + "service", + "hostText", + -1 ).toImmutableDruidServer(), mockPeon ) @@ -520,7 +550,10 @@ public void testDropRemove() throws Exception 1000, "historical", "normal", - 0 + 0, + "service", + "hostText", + -1 ); for (DataSegment segment : availableSegments) { server.addDataSegment(segment.getIdentifier(), segment); @@ -583,7 +616,10 @@ public void testDropTooManyInSameTier() throws Exception 1000, "historical", "normal", - 0 + 0, + "service", + "hostText", + -1 ); server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0)); @@ -593,7 +629,10 @@ public void testDropTooManyInSameTier() throws Exception 1000, "historical", "normal", - 0 + 0, + "service", + "hostText", + -1 ); for (DataSegment segment : availableSegments) { 
server2.addDataSegment(segment.getIdentifier(), segment); @@ -663,7 +702,10 @@ public void testDropTooManyInDifferentTiers() throws Exception 1000, "historical", "hot", - 0 + 0, + "service", + "hostText", + -1 ); server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0)); DruidServer server2 = new DruidServer( @@ -672,7 +714,10 @@ public void testDropTooManyInDifferentTiers() throws Exception 1000, "historical", "normal", - 0 + 0, + "service", + "hostText", + -1 ); for (DataSegment segment : availableSegments) { server2.addDataSegment(segment.getIdentifier(), segment); @@ -745,7 +790,10 @@ public void testDontDropInDifferentTiers() throws Exception 1000, "historical", "hot", - 0 + 0, + "service", + "hostText", + -1 ); DruidServer server2 = new DruidServer( "serverNorm2", @@ -753,7 +801,10 @@ public void testDontDropInDifferentTiers() throws Exception 1000, "historical", "normal", - 0 + 0, + "service", + "hostText", + -1 ); for (DataSegment segment : availableSegments) { server2.addDataSegment(segment.getIdentifier(), segment); @@ -818,7 +869,10 @@ public void testDropServerActuallyServesSegment() throws Exception 1000, "historical", "normal", - 0 + 0, + "service", + "hostText", + -1 ); server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0)); DruidServer server2 = new DruidServer( @@ -827,7 +881,10 @@ public void testDropServerActuallyServesSegment() throws Exception 1000, "historical", "normal", - 0 + 0, + "service", + "hostText", + -1 ); server2.addDataSegment(availableSegments.get(1).getIdentifier(), availableSegments.get(1)); DruidServer server3 = new DruidServer( @@ -836,7 +893,10 @@ public void testDropServerActuallyServesSegment() throws Exception 1000, "historical", "normal", - 0 + 0, + "service", + "hostText", + -1 ); server3.addDataSegment(availableSegments.get(1).getIdentifier(), availableSegments.get(1)); server3.addDataSegment(availableSegments.get(2).getIdentifier(), 
availableSegments.get(2)); @@ -929,7 +989,10 @@ public void testReplicantThrottle() throws Exception 1000, "historical", "hot", - 0 + 0, + "service", + "hostText", + -1 ).toImmutableDruidServer(), mockPeon ), @@ -940,7 +1003,10 @@ public void testReplicantThrottle() throws Exception 1000, "historical", "hot", - 0 + 0, + "service", + "hostText", + -1 ).toImmutableDruidServer(), mockPeon ) @@ -1043,7 +1109,10 @@ public void testReplicantThrottleAcrossTiers() throws Exception 1000, "historical", "hot", - 0 + 0, + "service", + "hostText", + -1 ).toImmutableDruidServer(), mockPeon ) @@ -1059,7 +1128,10 @@ public void testReplicantThrottleAcrossTiers() throws Exception 1000, "historical", DruidServer.DEFAULT_TIER, - 0 + 0, + "service", + "hostText", + -1 ).toImmutableDruidServer(), mockPeon ) @@ -1125,7 +1197,10 @@ public void testDropReplicantThrottle() throws Exception 1000, "historical", "normal", - 0 + 0, + "service", + "hostText", + -1 ); for (DataSegment availableSegment : longerAvailableSegments) { server1.addDataSegment(availableSegment.getIdentifier(), availableSegment); @@ -1136,7 +1211,10 @@ public void testDropReplicantThrottle() throws Exception 1000, "historical", "normal", - 0 + 0, + "service", + "hostText", + -1 ); for (DataSegment availableSegment : longerAvailableSegments) { server2.addDataSegment(availableSegment.getIdentifier(), availableSegment); diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index f884c0131a3d..202d830d777a 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -43,6 +43,8 @@ import io.druid.metadata.MetadataSegmentManager; import io.druid.server.DruidNode; import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.coordination.ServerAnnouncer; +import 
io.druid.server.coordination.ZooKeeperServerAnnouncer; import io.druid.server.coordinator.rules.ForeverLoadRule; import io.druid.server.coordinator.rules.Rule; import io.druid.server.initialization.ZkPathsConfig; @@ -186,7 +188,8 @@ public void unannounce(DruidNode node) } }, druidNode, - loadManagementPeons + loadManagementPeons, + EasyMock.createMock(ServerAnnouncer.class) ); } @@ -210,7 +213,7 @@ public void testMoveSegment() throws Exception EasyMock.replay(metadataRuleManager); EasyMock.expect(druidServer.toImmutableDruidServer()).andReturn( new ImmutableDruidServer( - new DruidServerMetadata("from", null, 5L, null, null, 0), + new DruidServerMetadata("from", null, 5L, null, null, 0, "service", "hostText", -1), 1L, null, ImmutableMap.of("dummySegment", segment) @@ -221,7 +224,7 @@ public void testMoveSegment() throws Exception druidServer2 = EasyMock.createMock(DruidServer.class); EasyMock.expect(druidServer2.toImmutableDruidServer()).andReturn( new ImmutableDruidServer( - new DruidServerMetadata("to", null, 5L, null, null, 0), + new DruidServerMetadata("to", null, 5L, null, null, 0, "service", "hostText", -1), 1L, null, ImmutableMap.of("dummySegment2", segment) @@ -295,7 +298,7 @@ public void testCoordinatorRun() throws Exception{ EasyMock.replay(immutableDruidDataSource); // Setup ServerInventoryView - druidServer = new DruidServer("server1", "localhost", 5L, "historical", tier, 0); + druidServer = new DruidServer("server1", "localhost", 5L, "historical", tier, 0, "service", "hostText", -1); loadManagementPeons.put("server1", loadQueuePeon); EasyMock.expect(serverInventoryView.getInventory()).andReturn( ImmutableList.of(druidServer) diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java index d2d254ff4141..0a30801ba113 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java +++ 
b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java @@ -164,7 +164,10 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) 1000, "historical", "hot", - 0 + 0, + "service", + "hostText", + -1 ).toImmutableDruidServer(), mockPeon ) @@ -180,7 +183,10 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) 1000, "historical", DruidServer.DEFAULT_TIER, - 0 + 0, + "service", + "hostText", + -1 ).toImmutableDruidServer(), mockPeon ) @@ -256,7 +262,10 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) 1000, "historical", "hot", - 0 + 0, + "service", + "hostText", + -1 ); server1.addDataSegment(segment.getIdentifier(), segment); DruidServer server2 = new DruidServer( @@ -265,7 +274,10 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) 1000, "historical", DruidServer.DEFAULT_TIER, - 0 + 0, + "service", + "hostText", + -1 ); server2.addDataSegment(segment.getIdentifier(), segment); DruidCluster druidCluster = new DruidCluster( @@ -365,7 +377,10 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) 1000, "historical", "hot", - 0 + 0, + "service", + "hostText", + -1 ).toImmutableDruidServer(), mockPeon ) @@ -440,7 +455,10 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) 1000, "historical", "hot", - 0 + 0, + "service", + "hostText", + -1 ); DruidServer server2 = new DruidServer( "serverHo2t", @@ -448,7 +466,10 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) 1000, "historical", "hot", - 0 + 0, + "service", + "hostText", + -1 ); server1.addDataSegment(segment.getIdentifier(), segment); server2.addDataSegment(segment.getIdentifier(), segment); diff --git a/server/src/test/java/io/druid/server/http/ClusterInfoResourceTest.java b/server/src/test/java/io/druid/server/http/ClusterInfoResourceTest.java new file mode 100644 index 000000000000..c0e1c14685b6 --- /dev/null +++ 
b/server/src/test/java/io/druid/server/http/ClusterInfoResourceTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.http; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.client.TestDruidServerDiscovery; +import io.druid.curator.CuratorTestBase; +import io.druid.curator.announcement.Announcer; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.coordination.ZooKeeperServerAnnouncer; +import io.druid.server.initialization.ZkPathsConfig; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +/** + */ +public class ClusterInfoResourceTest extends CuratorTestBase +{ + + private static final ZkPathsConfig ZK_PATHS_CONFIG = new ZkPathsConfig() + { + @Override + public String getBase() + { + return "test/druid"; + } + }; + + private 
static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); + + private static Map<String, List<DruidServerMetadata>> expectedNodes; + private static Map<DruidServerMetadata, ZooKeeperServerAnnouncer> announcers; + + private ClusterInfoResource clusterInfoResource; + + @BeforeClass + public static void setupStatic() throws Exception + { + expectedNodes = ImmutableMap.<String, List<DruidServerMetadata>>builder().put( + "historical", + ImmutableList.of( + new DruidServerMetadata("hist1", "localhost", 100, "historical", "1", 0, "service", "hostText", -1), + new DruidServerMetadata("hist2", "localhost", 100, "historical", "1", 0, "service", "hostText", -1) + ) + ).put( + "broker", + ImmutableList.of( + new DruidServerMetadata("brok1", "localhost", 100, "broker", "1", 0, "service", "hostText", -1), + new DruidServerMetadata("brok2", "localhost", 100, "broker", "1", 0, "service", "hostText", -1) + ) + ).put( + "overlord", + ImmutableList.of( + new DruidServerMetadata("over1", "localhost", 100, "overlord", "1", 0, "service", "hostText", -1), + new DruidServerMetadata("over2", "localhost", 100, "overlord", "1", 0, "service", "hostText", -1) + ) + ).put( + "coordinator", + ImmutableList.of( + new DruidServerMetadata("coor1", "localhost", 100, "coordinator", "1", 0, "service", "hostText", -1), + new DruidServerMetadata("coor2", "localhost", 100, "coordinator", "1", 0, "service", "hostText", -1) + ) + ).put( + "router", + ImmutableList.of( + new DruidServerMetadata("rout1", "localhost", 100, "router", "1", 0, "service", "hostText", -1), + new DruidServerMetadata("rout2", "localhost", 100, "router", "1", 0, "service", "hostText", -1) + ) + ).put( + "realtime", + ImmutableList.of( + new DruidServerMetadata("real1", "localhost", 100, "realtime", "1", 0, "service", "hostText", -1), + new DruidServerMetadata("real2", "localhost", 100, "realtime", "1", 0, "service", "hostText", -1) + ) + ).build(); + announcers = new HashMap<>(); + } + + @Before + public void setUp() throws Exception + { + setupServerAndCurator(); + curator.start(); + curator.blockUntilConnected(); + 
clusterInfoResource = new ClusterInfoResource(new TestDruidServerDiscovery(expectedNodes)); + final Announcer announcer = new Announcer(curator, MoreExecutors.sameThreadExecutor()); + announcer.start(); + + for (List<DruidServerMetadata> servers : expectedNodes.values()) { + for (DruidServerMetadata server : servers) { + + ZooKeeperServerAnnouncer serverAnnouncer = new ZooKeeperServerAnnouncer( + server, + OBJECT_MAPPER, + ZK_PATHS_CONFIG, + announcer, + new HashSet<String>() + ); + serverAnnouncer.start(); + announcers.put(server, serverAnnouncer); + } + + } + } + + @After + public void tearDown() throws Exception + { + for (ZooKeeperServerAnnouncer announcer : announcers.values()) { + announcer.stop(); + } + tearDownServerAndCurator(); + } + + @Test + public void testGetClusterInfo() throws Exception + { + Map<String, List<DruidServerMetadata>> actualNodes = + (Map<String, List<DruidServerMetadata>>) clusterInfoResource.getClusterInfo().getEntity(); + + Assert.assertEquals(expectedNodes.keySet(), actualNodes.keySet()); + + for (String nodeType : actualNodes.keySet()) { + verifyNodes(expectedNodes.get(nodeType), actualNodes.get(nodeType)); + } + } + + private void verifyNodes(List<DruidServerMetadata> expectedNodes, List<DruidServerMetadata> actualNodes) + { + Assert.assertEquals(expectedNodes.size(), actualNodes.size()); + Assert.assertEquals(new HashSet<>(expectedNodes), new HashSet<>(actualNodes)); + } + + @Test + public void testGetHistoricalInfo() throws Exception + { + verifyNodes( + expectedNodes.get("historical"), + ((Map<String, List<DruidServerMetadata>>) clusterInfoResource.getHistoricalInfo() + .getEntity()).get("historical") + ); + } + + @Test + public void testGetOverlordInfo() throws Exception + { + verifyNodes( + expectedNodes.get("overlord"), + ((Map<String, List<DruidServerMetadata>>) clusterInfoResource.getOverlordInfo() + .getEntity()).get("overlord") + ); + } + + @Test + public void testGetBrokerInfo() throws Exception + { + verifyNodes( + expectedNodes.get("broker"), + ((Map<String, List<DruidServerMetadata>>) clusterInfoResource.getBrokerInfo() + .getEntity()).get("broker") + ); + } + + @Test + public void testGetCoordinatorInfo() throws Exception + { + verifyNodes( + 
expectedNodes.get("coordinator"), + ((Map<String, List<DruidServerMetadata>>) clusterInfoResource.getCoordinatorInfo() + .getEntity()).get("coordinator") + ); + } + + @Test + public void testGetRouterInfo() throws Exception + { + verifyNodes( + expectedNodes.get("router"), + ((Map<String, List<DruidServerMetadata>>) clusterInfoResource.getRouterInfo() + .getEntity()).get("router") + ); + } + + @Test + public void testGetRealtimeInfo() throws Exception + { + verifyNodes( + expectedNodes.get("realtime"), + ((Map<String, List<DruidServerMetadata>>) clusterInfoResource.getRealtimeInfo() + .getEntity()).get("realtime") + ); + } +} diff --git a/server/src/test/java/io/druid/server/http/DatasourcesResourceTest.java b/server/src/test/java/io/druid/server/http/DatasourcesResourceTest.java index 55c124a806b2..99cc2287c93b 100644 --- a/server/src/test/java/io/druid/server/http/DatasourcesResourceTest.java +++ b/server/src/test/java/io/druid/server/http/DatasourcesResourceTest.java @@ -25,9 +25,7 @@ import io.druid.client.CoordinatorServerView; import io.druid.client.DruidDataSource; import io.druid.client.DruidServer; -import io.druid.client.InventoryView; import io.druid.client.indexing.IndexingServiceClient; -import io.druid.metadata.MetadataSegmentManager; import io.druid.timeline.DataSegment; import org.easymock.EasyMock; import org.joda.time.Interval; @@ -272,7 +270,7 @@ public void testSimpleGetTheDataSourceManyTiers() throws Exception @Test public void testGetSegmentDataSourceIntervals() { - server = new DruidServer("who", "host", 1234, "historical", "tier1", 0); + server = new DruidServer("who", "host", 1234, "historical", "tier1", 0, "service", "hostText", -1); server.addDataSegment(dataSegmentList.get(0).getIdentifier(), dataSegmentList.get(0)); server.addDataSegment(dataSegmentList.get(1).getIdentifier(), dataSegmentList.get(1)); server.addDataSegment(dataSegmentList.get(2).getIdentifier(), dataSegmentList.get(2)); @@ -322,7 +320,7 @@ public void testGetSegmentDataSourceIntervals() @Test public void testGetSegmentDataSourceSpecificInterval() { - server = new 
DruidServer("who", "host", 1234, "historical", "tier1", 0); + server = new DruidServer("who", "host", 1234, "historical", "tier1", 0, "service", "hostText", -1); server.addDataSegment(dataSegmentList.get(0).getIdentifier(), dataSegmentList.get(0)); server.addDataSegment(dataSegmentList.get(1).getIdentifier(), dataSegmentList.get(1)); server.addDataSegment(dataSegmentList.get(2).getIdentifier(), dataSegmentList.get(2)); diff --git a/server/src/test/java/io/druid/server/http/IntervalsResourceTest.java b/server/src/test/java/io/druid/server/http/IntervalsResourceTest.java index b77842bff8dd..1d197b873b01 100644 --- a/server/src/test/java/io/druid/server/http/IntervalsResourceTest.java +++ b/server/src/test/java/io/druid/server/http/IntervalsResourceTest.java @@ -86,7 +86,7 @@ public void setUp() 5 ) ); - server = new DruidServer("who", "host", 1234, "historical", "tier1", 0); + server = new DruidServer("who", "host", 1234, "historical", "tier1", 0, "service", "hostText", -1); server.addDataSegment(dataSegmentList.get(0).getIdentifier(), dataSegmentList.get(0)); server.addDataSegment(dataSegmentList.get(1).getIdentifier(), dataSegmentList.get(1)); server.addDataSegment(dataSegmentList.get(2).getIdentifier(), dataSegmentList.get(2)); diff --git a/server/src/test/java/io/druid/server/initialization/BaseJettyTest.java b/server/src/test/java/io/druid/server/initialization/BaseJettyTest.java index 57c84b2d8ffa..f8d72abd21cc 100644 --- a/server/src/test/java/io/druid/server/initialization/BaseJettyTest.java +++ b/server/src/test/java/io/druid/server/initialization/BaseJettyTest.java @@ -36,9 +36,12 @@ import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; +import io.druid.guice.NodeTypeConfig; import io.druid.guice.annotations.Self; import io.druid.initialization.Initialization; import io.druid.server.DruidNode; +import io.druid.server.coordination.NoopServerAnnouncer; +import 
io.druid.server.coordination.ServerAnnouncer; import io.druid.server.initialization.jetty.JettyServerInitUtils; import io.druid.server.initialization.jetty.JettyServerInitializer; import io.druid.server.initialization.jetty.ServletFilterHolder; @@ -115,6 +118,8 @@ public void configure(Binder binder) JsonConfigProvider.bindInstance( binder, Key.get(DruidNode.class, Self.class), new DruidNode("test", "localhost", null) ); + binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("coordinator")); + binder.bind(ServerAnnouncer.class).to(NoopServerAnnouncer.class); binder.bind(JettyServerInitializer.class).to(JettyServerInit.class).in(LazySingleton.class); Multibinder multibinder = Multibinder.newSetBinder(binder, ServletFilterHolder.class); diff --git a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java index 52d3808e9b1a..b79b8a0421dc 100644 --- a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java +++ b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java @@ -20,6 +20,7 @@ package io.druid.server.router; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.metamx.http.client.HttpClient; @@ -84,13 +85,8 @@ public String getDefaultBrokerServiceName() factory, Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy(0, 1)) ); - EasyMock.expect(factory.createSelector(EasyMock.anyObject())).andReturn(selector).atLeastOnce(); + EasyMock.expect(factory.createSelector(EasyMock.anyObject())).andReturn(selector).atLeastOnce(); EasyMock.replay(factory); - - selector.start(); - EasyMock.expectLastCall().atLeastOnce(); - selector.stop(); - EasyMock.expectLastCall().atLeastOnce(); EasyMock.replay(selector); brokerSelector.start(); diff 
--git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java index 8af4bd40df20..66414866284d 100644 --- a/services/src/main/java/io/druid/cli/CliBroker.java +++ b/services/src/main/java/io/druid/cli/CliBroker.java @@ -38,7 +38,7 @@ import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; -import io.druid.guice.ManageLifecycle; +import io.druid.guice.NodeTypeConfig; import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChestWarehouse; @@ -84,6 +84,7 @@ public void configure(Binder binder) TieredBrokerConfig.DEFAULT_BROKER_SERVICE_NAME ); binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8082); + binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("broker")); binder.bind(QueryToolChestWarehouse.class).to(MapQueryToolChestWarehouse.class); diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java index 85dc77f47580..33378a966369 100644 --- a/services/src/main/java/io/druid/cli/CliCoordinator.java +++ b/services/src/main/java/io/druid/cli/CliCoordinator.java @@ -37,6 +37,7 @@ import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; +import io.druid.guice.NodeTypeConfig; import io.druid.metadata.MetadataRuleManager; import io.druid.metadata.MetadataRuleManagerConfig; import io.druid.metadata.MetadataRuleManagerProvider; @@ -49,6 +50,7 @@ import io.druid.server.coordinator.DruidCoordinator; import io.druid.server.coordinator.DruidCoordinatorConfig; import io.druid.server.coordinator.LoadQueueTaskMaster; +import io.druid.server.http.ClusterInfoResource; import io.druid.server.http.CoordinatorDynamicConfigsResource; import io.druid.server.http.CoordinatorRedirectInfo; import io.druid.server.http.CoordinatorResource; @@ -92,8 +94,11 @@ 
protected List getModules() @Override public void configure(Binder binder) { - binder.bindConstant().annotatedWith(Names.named("serviceName")).to(TieredBrokerConfig.DEFAULT_COORDINATOR_SERVICE_NAME); + binder.bindConstant() + .annotatedWith(Names.named("serviceName")) + .to(TieredBrokerConfig.DEFAULT_COORDINATOR_SERVICE_NAME); binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8081); + binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("coordinator")); ConfigProvider.bind(binder, DruidCoordinatorConfig.class); @@ -138,6 +143,7 @@ public void configure(Binder binder) Jerseys.addResource(binder, DatasourcesResource.class); Jerseys.addResource(binder, MetadataResource.class); Jerseys.addResource(binder, IntervalsResource.class); + Jerseys.addResource(binder, ClusterInfoResource.class); LifecycleModule.register(binder, Server.class); LifecycleModule.register(binder, DatasourcesResource.class); diff --git a/services/src/main/java/io/druid/cli/CliHistorical.java b/services/src/main/java/io/druid/cli/CliHistorical.java index 42ea5b9d8cd6..09bd3223f409 100644 --- a/services/src/main/java/io/druid/cli/CliHistorical.java +++ b/services/src/main/java/io/druid/cli/CliHistorical.java @@ -22,12 +22,14 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Module; +import com.google.inject.multibindings.Multibinder; import com.google.inject.name.Names; import com.metamx.common.logger.Logger; import io.airlift.airline.Command; import io.druid.client.cache.CacheConfig; import io.druid.client.cache.CacheMonitor; import io.druid.guice.CacheModule; +import io.druid.guice.Capability; import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; @@ -80,6 +82,7 @@ public void configure(Binder binder) binder.bind(QuerySegmentWalker.class).to(ServerManager.class).in(LazySingleton.class); binder.bind(NodeTypeConfig.class).toInstance(new 
NodeTypeConfig("historical")); + Multibinder.newSetBinder(binder, String.class, Capability.class).addBinding().toInstance("segmentServer"); binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); Jerseys.addResource(binder, QueryResource.class); Jerseys.addResource(binder, HistoricalResource.class); diff --git a/services/src/main/java/io/druid/cli/CliMiddleManager.java b/services/src/main/java/io/druid/cli/CliMiddleManager.java index e8435659a321..124aa56d3dab 100644 --- a/services/src/main/java/io/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/io/druid/cli/CliMiddleManager.java @@ -35,6 +35,7 @@ import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; +import io.druid.guice.NodeTypeConfig; import io.druid.guice.annotations.Self; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.overlord.ForkingTaskRunner; @@ -77,6 +78,7 @@ public void configure(Binder binder) { binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/middlemanager"); binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8091); + binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("middleManager")); IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder); diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 551064aaafd2..1668c9d3cd24 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -33,6 +33,7 @@ import io.airlift.airline.Command; import io.druid.audit.AuditManager; import io.druid.client.indexing.IndexingServiceSelectorConfig; +import io.druid.curator.discovery.ServiceAnnouncer; import io.druid.guice.IndexingServiceFirehoseModule; import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.IndexingServiceTaskLogsModule; @@ -43,6 +44,7 @@ 
import io.druid.guice.LifecycleModule; import io.druid.guice.ListProvider; import io.druid.guice.ManageLifecycle; +import io.druid.guice.NodeTypeConfig; import io.druid.guice.PolyBind; import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionClientFactory; @@ -116,6 +118,7 @@ public void configure(Binder binder) .annotatedWith(Names.named("serviceName")) .to(IndexingServiceSelectorConfig.DEFAULT_SERVICE_NAME); binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8090); + binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("overlord")); JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class); JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class); diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java index b65fa9a2328b..f40f0e11b202 100644 --- a/services/src/main/java/io/druid/cli/CliRouter.java +++ b/services/src/main/java/io/druid/cli/CliRouter.java @@ -34,6 +34,7 @@ import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; +import io.druid.guice.NodeTypeConfig; import io.druid.guice.annotations.Self; import io.druid.guice.http.JettyHttpClientModule; import io.druid.query.extraction.LookupReferencesManager; @@ -76,6 +77,7 @@ public void configure(Binder binder) { binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/router"); binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8888); + binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("router")); JsonConfigProvider.bind(binder, "druid.router", TieredBrokerConfig.class); @@ -94,17 +96,6 @@ public void configure(Binder binder) DiscoveryModule.register(binder, Self.class); LifecycleModule.register(binder, LookupReferencesManager.class); } - - @Provides - @ManageLifecycle - public ServerDiscoverySelector 
getCoordinatorServerDiscoverySelector( - TieredBrokerConfig config, - ServerDiscoveryFactory factory - - ) - { - return factory.createSelector(config.getCoordinatorServiceName()); - } } ); }