diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 0a7a57f3dc74..68450cc5066c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -42,6 +42,9 @@ import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.data.input.impl.InputRowParser; +import io.druid.discovery.DiscoveryDruidNode; +import io.druid.discovery.DruidNodeDiscoveryProvider; +import io.druid.discovery.LookupNodeService; import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import io.druid.indexing.common.TaskStatus; @@ -50,6 +53,7 @@ import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.task.AbstractTask; +import io.druid.indexing.common.task.RealtimeIndexTask; import io.druid.indexing.common.task.TaskResource; import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; @@ -289,12 +293,25 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception ) ); + LookupNodeService lookupNodeService = getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER) == null ? 
+ toolbox.getLookupNodeService() : + new LookupNodeService((String) getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER)); + DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode( + toolbox.getDruidNode(), + DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + ImmutableMap.of( + toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(), + lookupNodeService.getName(), lookupNodeService + ) + ); + try ( final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox); final AppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics); final KafkaConsumer consumer = newConsumer() ) { toolbox.getDataSegmentServerAnnouncer().announce(); + toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode); appenderator = appenderator0; @@ -597,6 +614,7 @@ public String apply(DataSegment input) } } + toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode); toolbox.getDataSegmentServerAnnouncer().unannounce(); return success(); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index a6bc82afe8ae..cbcaca7ff81c 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -53,6 +53,9 @@ import io.druid.data.input.impl.JSONPathSpec; import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; +import io.druid.discovery.DataNodeService; +import io.druid.discovery.DruidNodeAnnouncer; +import io.druid.discovery.LookupNodeService; import io.druid.indexing.common.SegmentLoaderFactory; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; @@ -117,7 +120,9 @@ import io.druid.segment.loading.StorageLocationConfig; import 
io.druid.segment.realtime.plumber.SegmentHandoffNotifier; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; +import io.druid.server.DruidNode; import io.druid.server.coordination.DataSegmentServerAnnouncer; +import io.druid.server.coordination.ServerType; import io.druid.timeline.DataSegment; import org.apache.curator.test.TestingCluster; import org.apache.kafka.clients.producer.KafkaProducer; @@ -1637,7 +1642,11 @@ public List getLocations() testUtils.getTestIndexIO(), MapCache.create(1024), new CacheConfig(), - testUtils.getTestIndexMergerV9() + testUtils.getTestIndexMergerV9(), + EasyMock.createNiceMock(DruidNodeAnnouncer.class), + EasyMock.createNiceMock(DruidNode.class), + new LookupNodeService("tier"), + new DataNodeService("tier", 1, ServerType.INDEXER_EXECUTOR, 0) ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index 2d0ce1ac9b66..16c6729145fa 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -31,6 +31,9 @@ import com.metamx.metrics.MonitorScheduler; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.discovery.DataNodeService; +import io.druid.discovery.DruidNodeAnnouncer; +import io.druid.discovery.LookupNodeService; import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.config.TaskConfig; @@ -44,6 +47,7 @@ import io.druid.segment.loading.SegmentLoader; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; +import io.druid.server.DruidNode; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentServerAnnouncer; import 
io.druid.timeline.DataSegment; @@ -87,6 +91,11 @@ public class TaskToolbox private final CacheConfig cacheConfig; private final IndexMergerV9 indexMergerV9; + private final DruidNodeAnnouncer druidNodeAnnouncer; + private final DruidNode druidNode; + private final LookupNodeService lookupNodeService; + private final DataNodeService dataNodeService; + public TaskToolbox( TaskConfig config, TaskActionClient taskActionClient, @@ -107,7 +116,11 @@ public TaskToolbox( IndexIO indexIO, Cache cache, CacheConfig cacheConfig, - IndexMergerV9 indexMergerV9 + IndexMergerV9 indexMergerV9, + DruidNodeAnnouncer druidNodeAnnouncer, + DruidNode druidNode, + LookupNodeService lookupNodeService, + DataNodeService dataNodeService ) { this.config = config; @@ -130,6 +143,10 @@ public TaskToolbox( this.cache = cache; this.cacheConfig = cacheConfig; this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9"); + this.druidNodeAnnouncer = druidNodeAnnouncer; + this.druidNode = druidNode; + this.lookupNodeService = lookupNodeService; + this.dataNodeService = dataNodeService; } public TaskConfig getConfig() @@ -271,4 +288,24 @@ public File getPersistDir() { return new File(taskWorkDir, "persist"); } + + public DruidNodeAnnouncer getDruidNodeAnnouncer() + { + return druidNodeAnnouncer; + } + + public LookupNodeService getLookupNodeService() + { + return lookupNodeService; + } + + public DataNodeService getDataNodeService() + { + return dataNodeService; + } + + public DruidNode getDruidNode() + { + return druidNode; + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index ab0b5bf06d1b..43d2abea594c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -27,7 +27,11 @@ import com.metamx.metrics.MonitorScheduler; 
import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.discovery.DataNodeService; +import io.druid.discovery.DruidNodeAnnouncer; +import io.druid.discovery.LookupNodeService; import io.druid.guice.annotations.Processing; +import io.druid.guice.annotations.RemoteChatHandler; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; @@ -39,6 +43,7 @@ import io.druid.segment.loading.DataSegmentMover; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; +import io.druid.server.DruidNode; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentServerAnnouncer; @@ -69,6 +74,10 @@ public class TaskToolboxFactory private final Cache cache; private final CacheConfig cacheConfig; private final IndexMergerV9 indexMergerV9; + private final DruidNodeAnnouncer druidNodeAnnouncer; + private final DruidNode druidNode; + private final LookupNodeService lookupNodeService; + private final DataNodeService dataNodeService; @Inject public TaskToolboxFactory( @@ -90,7 +99,11 @@ public TaskToolboxFactory( IndexIO indexIO, Cache cache, CacheConfig cacheConfig, - IndexMergerV9 indexMergerV9 + IndexMergerV9 indexMergerV9, + DruidNodeAnnouncer druidNodeAnnouncer, + @RemoteChatHandler DruidNode druidNode, + LookupNodeService lookupNodeService, + DataNodeService dataNodeService ) { this.config = config; @@ -112,6 +125,10 @@ public TaskToolboxFactory( this.cache = cache; this.cacheConfig = cacheConfig; this.indexMergerV9 = indexMergerV9; + this.druidNodeAnnouncer = druidNodeAnnouncer; + this.druidNode = druidNode; + this.lookupNodeService = lookupNodeService; + this.dataNodeService = dataNodeService; } public TaskToolbox build(Task task) @@ -137,7 +154,11 @@ public TaskToolbox build(Task task) indexIO, cache, cacheConfig, - indexMergerV9 + 
indexMergerV9, + druidNodeAnnouncer, + druidNode, + lookupNodeService, + dataNodeService ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 9595a28cb2f2..7e641da3f84b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -31,6 +31,9 @@ import io.druid.data.input.Committer; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; +import io.druid.discovery.DiscoveryDruidNode; +import io.druid.discovery.DruidNodeDiscoveryProvider; +import io.druid.discovery.LookupNodeService; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; @@ -78,6 +81,8 @@ public class RealtimeIndexTask extends AbstractTask { + public static final String CTX_KEY_LOOKUP_TIER = "lookupTier"; + private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class); private final static Random random = new Random(); @@ -324,8 +329,22 @@ public String getVersion(final Interval interval) Supplier committerSupplier = null; final File firehoseTempDir = toolbox.getFirehoseTemporaryDir(); + LookupNodeService lookupNodeService = getContextValue(CTX_KEY_LOOKUP_TIER) == null ? 
+ toolbox.getLookupNodeService() : + new LookupNodeService((String) getContextValue(CTX_KEY_LOOKUP_TIER)); + DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode( + toolbox.getDruidNode(), + DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + ImmutableMap.of( + toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(), + lookupNodeService.getName(), lookupNodeService + ) + ); + try { toolbox.getDataSegmentServerAnnouncer().announce(); + toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode); + plumber.startJob(); @@ -431,6 +450,7 @@ public void run() } toolbox.getDataSegmentServerAnnouncer().unannounce(); + toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode); } log.info("Job done!"); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java index 5430938f96c6..da3b66f32800 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java @@ -110,7 +110,11 @@ public void setUp() throws IOException mockIndexIO, mockCache, mockCacheConfig, - mockIndexMergerV9 + mockIndexMergerV9, + null, + null, + null, + null ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 8f93db29ebfa..fd2adc740a5c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -950,7 +950,7 @@ public Map makeLoadSpec(URI uri) throw new UnsupportedOperationException(); } }, null, null, null, null, null, null, null, null, null, null, jsonMapper, temporaryFolder.newFolder(), - indexIO, null, null, indexMergerV9 + indexIO, null, null, indexMergerV9, null, null, null, null ) ); diff --git 
a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index ce6510fa316d..c6d954420fc2 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -46,6 +46,9 @@ import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.impl.InputRowParser; +import io.druid.discovery.DataNodeService; +import io.druid.discovery.DruidNodeAnnouncer; +import io.druid.discovery.LookupNodeService; import io.druid.indexing.common.SegmentLoaderFactory; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; @@ -104,7 +107,9 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory; +import io.druid.server.DruidNode; import io.druid.server.coordination.DataSegmentServerAnnouncer; +import io.druid.server.coordination.ServerType; import io.druid.timeline.DataSegment; import org.easymock.EasyMock; import org.hamcrest.CoreMatchers; @@ -1048,7 +1053,11 @@ public List getLocations() testUtils.getTestIndexIO(), MapCache.create(1024), new CacheConfig(), - testUtils.getTestIndexMergerV9() + testUtils.getTestIndexMergerV9(), + EasyMock.createNiceMock(DruidNodeAnnouncer.class), + EasyMock.createNiceMock(DruidNode.class), + new LookupNodeService("tier"), + new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0) ); return toolboxFactory.build(task); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java index 4dfb87641576..265e66a177cd 100644 --- 
a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java @@ -233,7 +233,8 @@ public void cleanup(DataSegment segment) throws SegmentLoadingException { } }, jsonMapper, temporaryFolder.newFolder(), - indexIO, null, null, EasyMock.createMock(IndexMergerV9.class) + indexIO, null, null, EasyMock.createMock(IndexMergerV9.class), + null, null, null, null ) ); diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 067ec8b4cd41..7b8b0c33c659 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -292,7 +292,11 @@ public List getLocations() INDEX_IO, null, null, - INDEX_MERGER_V9 + INDEX_MERGER_V9, + null, + null, + null, + null ); Collection values = new LinkedList<>(); for (InputRowParser parser : Arrays.asList( diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index ffcddf29e95b..668564a05790 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -333,7 +333,11 @@ public List getLocations() INDEX_IO, null, null, - INDEX_MERGER_V9 + INDEX_MERGER_V9, + null, + null, + null, + null ); final Injector injector = Guice.createInjector( new Module() diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java 
b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 569590cbbcd9..1aa8a3f51ba4 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -45,6 +45,9 @@ import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.impl.InputRowParser; +import io.druid.discovery.DataNodeService; +import io.druid.discovery.DruidNodeAnnouncer; +import io.druid.discovery.LookupNodeService; import io.druid.indexing.common.SegmentLoaderFactory; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; @@ -107,6 +110,7 @@ import io.druid.server.DruidNode; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentServerAnnouncer; +import io.druid.server.coordination.ServerType; import io.druid.server.initialization.ServerConfig; import io.druid.server.metrics.NoopServiceEmitter; import io.druid.timeline.DataSegment; @@ -600,7 +604,11 @@ public List getLocations() INDEX_IO, MapCache.create(0), FireDepartmentTest.NO_CACHE_CONFIG, - INDEX_MERGER_V9 + INDEX_MERGER_V9, + EasyMock.createNiceMock(DruidNodeAnnouncer.class), + EasyMock.createNiceMock(DruidNode.class), + new LookupNodeService("tier"), + new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0) ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index ef4bdba34737..82cacf88dcde 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -187,7 +187,11 @@ public List getLocations() indexIO, null, null, - indexMergerV9 + indexMergerV9, + null, + null, + null, + null ), taskConfig, new 
NoopServiceEmitter(), diff --git a/server/src/main/java/io/druid/client/DruidServerDiscovery.java b/server/src/main/java/io/druid/client/DruidServerDiscovery.java deleted file mode 100644 index 7b28ef6850ae..000000000000 --- a/server/src/main/java/io/druid/client/DruidServerDiscovery.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.client; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.metamx.emitter.EmittingLogger; -import io.druid.concurrent.Execs; -import io.druid.curator.inventory.CuratorInventoryManager; -import io.druid.curator.inventory.CuratorInventoryManagerStrategy; -import io.druid.curator.inventory.InventoryManagerConfig; -import io.druid.java.util.common.ISE; -import org.apache.curator.framework.CuratorFramework; - -import java.io.IOException; - -/** - * Discovers DruidServer instances that serve segments using CuratorInventoryManager. 
- */ -public class DruidServerDiscovery -{ - private final EmittingLogger log = new EmittingLogger(DruidServerDiscovery.class); - private final CuratorInventoryManager curatorInventoryManager; - private volatile Listener listener; - - DruidServerDiscovery( - final CuratorFramework curatorFramework, - final String announcementsPath, - final ObjectMapper jsonMapper - ) - { - curatorInventoryManager = initCuratorInventoryManager(curatorFramework, announcementsPath, jsonMapper); - } - - public void start() throws Exception - { - Preconditions.checkNotNull(listener, "listener is not configured yet"); - curatorInventoryManager.start(); - } - - public void stop() throws IOException - { - curatorInventoryManager.stop(); - } - - private CuratorInventoryManager initCuratorInventoryManager( - final CuratorFramework curator, - final String announcementsPath, - final ObjectMapper jsonMapper - ) - { - return new CuratorInventoryManager<>( - curator, - new InventoryManagerConfig() - { - @Override - public String getContainerPath() - { - return announcementsPath; - } - - @Override - public String getInventoryPath() - { - return "/NON_EXISTENT_DUMMY_INVENTORY_PATH"; - } - }, - Execs.singleThreaded("CuratorInventoryManagerBasedServerDiscovery-%s"), - new CuratorInventoryManagerStrategy() - { - @Override - public DruidServer deserializeContainer(byte[] bytes) - { - try { - return jsonMapper.readValue(bytes, DruidServer.class); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - } - - @Override - public void newContainer(DruidServer container) - { - log.info("New Server[%s]", container.getName()); - listener.serverAdded(container); - } - - @Override - public void deadContainer(DruidServer container) - { - log.info("Server Disappeared[%s]", container.getName()); - listener.serverRemoved(container); - } - - @Override - public DruidServer updateContainer(DruidServer oldContainer, DruidServer newContainer) - { - log.info("Server updated[%s]", oldContainer.getName()); - 
return listener.serverUpdated(oldContainer, newContainer); - } - - @Override - public Object deserializeInventory(byte[] bytes) - { - throw new ISE("no inventory should exist."); - } - - @Override - public DruidServer addInventory( - final DruidServer container, - String inventoryKey, - final Object inventory - ) - { - throw new ISE("no inventory should exist."); - } - - @Override - public DruidServer updateInventory( - DruidServer container, String inventoryKey, Object inventory - ) - { - throw new ISE("no inventory should exist."); - } - - @Override - public DruidServer removeInventory(final DruidServer container, String inventoryKey) - { - throw new ISE("no inventory should exist."); - } - - @Override - public void inventoryInitialized() - { - log.info("Server inventory initialized."); - listener.initialized(); - } - } - ); - } - - public void registerListener(Listener listener) - { - Preconditions.checkArgument(this.listener == null, "listener registered already."); - this.listener = listener; - } - - public interface Listener - { - void serverAdded(DruidServer server); - DruidServer serverUpdated(DruidServer oldServer, DruidServer newServer); - void serverRemoved(DruidServer server); - void initialized(); - } -} diff --git a/server/src/main/java/io/druid/client/FilteredHttpServerInventoryViewProvider.java b/server/src/main/java/io/druid/client/FilteredHttpServerInventoryViewProvider.java index 6dbb7cf4bcb6..2741739ce429 100644 --- a/server/src/main/java/io/druid/client/FilteredHttpServerInventoryViewProvider.java +++ b/server/src/main/java/io/druid/client/FilteredHttpServerInventoryViewProvider.java @@ -23,14 +23,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Predicates; import com.metamx.http.client.HttpClient; +import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.guice.annotations.Client; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; import 
io.druid.java.util.common.Pair; import io.druid.server.coordination.DruidServerMetadata; -import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; -import org.apache.curator.framework.CuratorFramework; import javax.validation.constraints.NotNull; @@ -59,18 +58,14 @@ public class FilteredHttpServerInventoryViewProvider implements FilteredServerIn @JacksonInject @NotNull - private ZkPathsConfig zkPaths = null; - - @JacksonInject - @NotNull - private CuratorFramework curator = null; + private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = null; @Override public HttpServerInventoryView get() { return new HttpServerInventoryView( jsonMapper, smileMapper, httpClient, - new DruidServerDiscovery(curator, zkPaths.getAnnouncementsPath(), jsonMapper), + druidNodeDiscoveryProvider, Predicates.>alwaysTrue(), config ); diff --git a/server/src/main/java/io/druid/client/HttpServerInventoryView.java b/server/src/main/java/io/druid/client/HttpServerInventoryView.java index 03390f982eaa..201276ed473e 100644 --- a/server/src/main/java/io/druid/client/HttpServerInventoryView.java +++ b/server/src/main/java/io/druid/client/HttpServerInventoryView.java @@ -40,6 +40,10 @@ import com.metamx.http.client.response.ClientResponse; import com.metamx.http.client.response.InputStreamResponseHandler; import io.druid.concurrent.LifecycleLock; +import io.druid.discovery.DataNodeService; +import io.druid.discovery.DiscoveryDruidNode; +import io.druid.discovery.DruidNodeDiscovery; +import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; @@ -83,7 +87,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredServerInventoryView { private final EmittingLogger log = new EmittingLogger(HttpServerInventoryView.class); - private final DruidServerDiscovery serverDiscovery; + private final DruidNodeDiscoveryProvider 
druidNodeDiscoveryProvider; private final LifecycleLock lifecycleLock = new LifecycleLock(); @@ -105,8 +109,6 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer // to this queue again for next update. private final BlockingQueue queue = new LinkedBlockingDeque<>(); - - private final HttpClient httpClient; private final ObjectMapper smileMapper; private final HttpServerInventoryViewConfig config; @@ -116,14 +118,14 @@ public HttpServerInventoryView( final @Json ObjectMapper jsonMapper, final @Smile ObjectMapper smileMapper, final @Global HttpClient httpClient, - final DruidServerDiscovery serverDiscovery, + final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, final Predicate> defaultFilter, final HttpServerInventoryViewConfig config ) { this.httpClient = httpClient; this.smileMapper = smileMapper; - this.serverDiscovery = serverDiscovery; + this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; this.defaultFilter = defaultFilter; this.finalPredicate = defaultFilter; this.config = config; @@ -178,36 +180,38 @@ public void run() } ); - serverDiscovery.registerListener( - new DruidServerDiscovery.Listener() + DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForService(DataNodeService.DISCOVERY_SERVICE_KEY); + druidNodeDiscovery.registerListener( + new DruidNodeDiscovery.Listener() { - @Override - public void serverAdded(DruidServer server) - { - serverAddedOrUpdated(server); - } @Override - public DruidServer serverUpdated(DruidServer oldServer, DruidServer newServer) + public void nodeAdded(DiscoveryDruidNode node) { - return serverAddedOrUpdated(newServer); + serverAddedOrUpdated(toDruidServer(node)); } @Override - public void serverRemoved(DruidServer server) + public void nodeRemoved(DiscoveryDruidNode node) { - HttpServerInventoryView.this.serverRemoved(server); - runServerCallbacks(server); + serverRemoved(toDruidServer(node)); } - @Override - public void initialized() + private DruidServer 
toDruidServer(DiscoveryDruidNode node) { - serverInventoryInitialized(); + + return new DruidServer( + node.getDruidNode().getHostAndPortToUse(), + node.getDruidNode().getHostAndPort(), + node.getDruidNode().getHostAndTlsPort(), + ((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getMaxSize(), + ((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getType(), + ((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getTier(), + ((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getPriority() + ); } } ); - serverDiscovery.start(); log.info("Started HttpServerInventoryView."); lifecycleLock.started(); @@ -228,8 +232,6 @@ public void stop() throws IOException log.info("Stopping HttpServerInventoryView."); - serverDiscovery.stop(); - if (executor != null) { executor.shutdownNow(); executor = null; @@ -373,11 +375,6 @@ private void serverRemoved(DruidServer server) servers.remove(server.getName()); } - public DruidServer serverUpdated(DruidServer oldServer, DruidServer newServer) - { - return serverAddedOrUpdated(newServer); - } - @Override public boolean isStarted() { diff --git a/server/src/main/java/io/druid/client/HttpServerInventoryViewProvider.java b/server/src/main/java/io/druid/client/HttpServerInventoryViewProvider.java index efc109542ec2..13a71f0928e7 100644 --- a/server/src/main/java/io/druid/client/HttpServerInventoryViewProvider.java +++ b/server/src/main/java/io/druid/client/HttpServerInventoryViewProvider.java @@ -23,14 +23,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Predicates; import com.metamx.http.client.HttpClient; +import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.guice.annotations.Client; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; import io.druid.java.util.common.Pair; import io.druid.server.coordination.DruidServerMetadata; -import 
io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; -import org.apache.curator.framework.CuratorFramework; import javax.validation.constraints.NotNull; @@ -59,11 +58,7 @@ public class HttpServerInventoryViewProvider implements ServerInventoryViewProvi @JacksonInject @NotNull - private ZkPathsConfig zkPaths = null; - - @JacksonInject - @NotNull - private CuratorFramework curator = null; + private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = null; @Override public HttpServerInventoryView get() @@ -72,7 +67,7 @@ public HttpServerInventoryView get() jsonMapper, smileMapper, httpClient, - new DruidServerDiscovery(curator, zkPaths.getAnnouncementsPath(), jsonMapper), + druidNodeDiscoveryProvider, Predicates.>alwaysTrue(), config ); diff --git a/server/src/main/java/io/druid/curator/discovery/CuratorDruidNodeAnnouncer.java b/server/src/main/java/io/druid/curator/discovery/CuratorDruidNodeAnnouncer.java new file mode 100644 index 000000000000..f2ea115ceb9e --- /dev/null +++ b/server/src/main/java/io/druid/curator/discovery/CuratorDruidNodeAnnouncer.java @@ -0,0 +1,93 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.curator.discovery; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; +import com.google.inject.Inject; +import io.druid.curator.announcement.Announcer; +import io.druid.discovery.DiscoveryDruidNode; +import io.druid.discovery.DruidNodeAnnouncer; +import io.druid.guice.annotations.Json; +import io.druid.java.util.common.logger.Logger; +import io.druid.server.initialization.ZkPathsConfig; +import org.apache.curator.utils.ZKPaths; + +/** + */ +public class CuratorDruidNodeAnnouncer implements DruidNodeAnnouncer +{ + private static final Logger log = new Logger(CuratorDruidNodeAnnouncer.class); + + private final Announcer announcer; + private final ZkPathsConfig config; + private final ObjectMapper jsonMapper; + + @Inject + public CuratorDruidNodeAnnouncer( + Announcer announcer, + ZkPathsConfig config, + @Json ObjectMapper jsonMapper + ) + { + this.announcer = announcer; + this.config = config; + this.jsonMapper = jsonMapper; + } + + @Override + public void announce(DiscoveryDruidNode discoveryDruidNode) + { + try { + log.info("Announcing [%s].", discoveryDruidNode); + + announcer.announce( + ZKPaths.makePath( + config.getInternalDiscoveryPath(), + discoveryDruidNode.getNodeType(), + discoveryDruidNode.getDruidNode().getHostAndPortToUse() + ), + jsonMapper.writeValueAsBytes(discoveryDruidNode) + ); + + log.info("Announced [%s].", discoveryDruidNode); + } + catch (JsonProcessingException e) { + throw Throwables.propagate(e); + } + } + + @Override + public void unannounce(DiscoveryDruidNode discoveryDruidNode) + { + log.info("Unannouncing [%s].", discoveryDruidNode); + + announcer.unannounce( + ZKPaths.makePath( + config.getInternalDiscoveryPath(), + discoveryDruidNode.getNodeType(), + discoveryDruidNode.getDruidNode().getHostAndPortToUse() + ) + ); + + log.info("Unannounced [%s].", discoveryDruidNode); + } +} diff --git 
a/server/src/main/java/io/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java b/server/src/main/java/io/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java new file mode 100644 index 000000000000..5996f500adb7 --- /dev/null +++ b/server/src/main/java/io/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java @@ -0,0 +1,371 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.curator.discovery; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.inject.Inject; +import io.druid.concurrent.Execs; +import io.druid.concurrent.LifecycleLock; +import io.druid.discovery.DiscoveryDruidNode; +import io.druid.discovery.DruidNodeDiscovery; +import io.druid.discovery.DruidNodeDiscoveryProvider; +import io.druid.guice.ManageLifecycle; +import io.druid.guice.annotations.Json; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.lifecycle.LifecycleStart; +import io.druid.java.util.common.lifecycle.LifecycleStop; +import io.druid.java.util.common.logger.Logger; +import io.druid.server.initialization.ZkPathsConfig; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.utils.ZKPaths; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + */ +@ManageLifecycle +public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider +{ + private static final Logger log = new Logger(CuratorDruidNodeDiscoveryProvider.class); + + private final CuratorFramework curatorFramework; + private final ZkPathsConfig config; + private final ObjectMapper jsonMapper; + + private ExecutorService listenerExecutor; + + private final Map nodeTypeWatchers = new ConcurrentHashMap<>(); + + private final LifecycleLock lifecycleLock = new LifecycleLock(); + + @Inject + public CuratorDruidNodeDiscoveryProvider( + CuratorFramework curatorFramework, + ZkPathsConfig config, + @Json ObjectMapper jsonMapper + ) + { + 
this.curatorFramework = curatorFramework; + this.config = config; + this.jsonMapper = jsonMapper; + } + + @Override + public DruidNodeDiscovery getForNodeType(String nodeType) + { + Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); + + return nodeTypeWatchers.compute( + nodeType, + (k, v) -> { + if (v != null) { + return v; + } + + log.info("Creating NodeTypeWatcher for nodeType [%s].", nodeType); + NodeTypeWatcher nodeTypeWatcher = new NodeTypeWatcher( + listenerExecutor, + curatorFramework, + config.getInternalDiscoveryPath(), + jsonMapper, + nodeType + ); + nodeTypeWatcher.start(); + log.info("Created NodeTypeWatcher for nodeType [%s].", nodeType); + return nodeTypeWatcher; + } + ); + } + + @LifecycleStart + public void start() + { + if (!lifecycleLock.canStart()) { + throw new ISE("can't start."); + } + + try { + log.info("starting"); + + // This is single-threaded to ensure that all listener calls are executed precisely in the order of add/remove + // event occurrences.
+ listenerExecutor = Execs.singleThreaded("CuratorDruidNodeDiscoveryProvider-ListenerExecutor"); + + log.info("started"); + + lifecycleLock.started(); + } + finally { + lifecycleLock.exitStart(); + } + } + + @LifecycleStop + public void stop() + { + if (!lifecycleLock.canStop()) { + throw new ISE("can't stop."); + } + + log.info("stopping"); + + for (NodeTypeWatcher watcher : nodeTypeWatchers.values()) { + watcher.stop(); + } + listenerExecutor.shutdownNow(); + + log.info("stopped"); + } + + private static class NodeTypeWatcher implements DruidNodeDiscovery + { + private static final Logger log = new Logger(NodeTypeWatcher.class); + + private final CuratorFramework curatorFramework; + + private final String nodeType; + private final ObjectMapper jsonMapper; + + // hostAndPort -> DiscoveryDruidNode + private final Map nodes = new ConcurrentHashMap<>(); + + private final PathChildrenCache cache; + private final ExecutorService cacheExecutor; + + private final ExecutorService listenerExecutor; + + private final List nodeListeners = new ArrayList(); + + private final Object lock = new Object(); + + NodeTypeWatcher( + ExecutorService listenerExecutor, + CuratorFramework curatorFramework, + String basePath, + ObjectMapper jsonMapper, + String nodeType + ) + { + this.listenerExecutor = listenerExecutor; + this.curatorFramework = curatorFramework; + this.nodeType = nodeType; + this.jsonMapper = jsonMapper; + + // This is required to be single threaded from Docs in PathChildrenCache; + this.cacheExecutor = Execs.singleThreaded(String.format("NodeTypeWatcher[%s]", nodeType)); + this.cache = new PathChildrenCache( + curatorFramework, + ZKPaths.makePath(basePath, nodeType), + true, + true, + cacheExecutor + ); + } + + @Override + public Collection getAllNodes() + { + return Collections.unmodifiableCollection(nodes.values()); + } + + @Override + public void registerListener(DruidNodeDiscovery.Listener listener) + { + synchronized (lock) { + for (DiscoveryDruidNode node : 
nodes.values()) { + listenerExecutor.submit(() -> { + try { + listener.nodeAdded(node); + } + catch (Exception ex) { + log.error( + ex, + "Exception occured in DiscoveryDruidNode.nodeAdded(node=[%s]) in listener [%s].", + node, + listener + ); + } + }); + } + nodeListeners.add(listener); + } + } + + public void handleChildEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception + { + synchronized (lock) { + try { + switch (event.getType()) { + case CHILD_ADDED: { + final byte[] data; + try { + data = curatorFramework.getData().decompressed().forPath(event.getData().getPath()); + } + catch (Exception ex) { + log.error( + ex, + "Failed to get data for path [%s]. Ignoring event [%s].", + event.getData().getPath(), + event.getType() + ); + return; + } + + DiscoveryDruidNode druidNode = jsonMapper.readValue( + data, + DiscoveryDruidNode.class + ); + + if (!nodeType.equals(druidNode.getNodeType())) { + log.warn( + "Node[%s:%s] add is discovered by node watcher of nodeType [%s]. Ignored.", + druidNode.getNodeType(), + druidNode, + nodeType + ); + return; + } + + log.info("Received event [%s] for Node[%s:%s].", event.getType(), druidNode.getNodeType(), druidNode); + + addNode(druidNode); + + break; + } + case CHILD_REMOVED: { + DiscoveryDruidNode druidNode = jsonMapper.readValue(event.getData().getData(), DiscoveryDruidNode.class); + + if (!nodeType.equals(druidNode.getNodeType())) { + log.warn( + "Node[%s:%s] removal is discovered by node watcher of nodeType [%s]. 
Ignored.", + druidNode.getNodeType(), + druidNode, + nodeType + ); + return; + } + + log.info("Node[%s:%s] disappeared.", druidNode.getNodeType(), druidNode); + + removeNode(druidNode); + + break; + } + default: { + log.error("Ignored event type [%s] for nodeType [%s] watcher.", event.getType(), nodeType); + } + } + } + catch (Exception ex) { + log.error(ex, "unknown error in node watcher for type [%s].", nodeType); + } + } + } + + private void addNode(DiscoveryDruidNode druidNode) + { + synchronized (lock) { + DiscoveryDruidNode prev = nodes.putIfAbsent(druidNode.getDruidNode().getHostAndPortToUse(), druidNode); + if (prev == null) { + for (DruidNodeDiscovery.Listener l : nodeListeners) { + listenerExecutor.submit(() -> { + try { + l.nodeAdded(druidNode); + } + catch (Exception ex) { + log.error( + ex, + "Exception occured in DiscoveryDruidNode.nodeAdded(node=[%s]) in listener [%s].", + druidNode, + l + ); + } + }); + } + } else { + log.warn("Node[%s] discovered but existed already [%s].", druidNode, prev); + } + } + } + + private void removeNode(DiscoveryDruidNode druidNode) + { + synchronized (lock) { + DiscoveryDruidNode prev = nodes.remove(druidNode.getDruidNode().getHostAndPortToUse()); + + if (prev == null) { + log.warn("Noticed disappearance of unknown druid node [%s:%s].", druidNode.getNodeType(), druidNode); + return; + } + + for (DruidNodeDiscovery.Listener l : nodeListeners) { + listenerExecutor.submit(() -> { + try { + l.nodeRemoved(druidNode); + } + catch (Exception ex) { + log.error( + ex, + "Exception occured in DiscoveryDruidNode.nodeRemoved(node=[%s]) in listener [%s].", + druidNode, + l + ); + } + }); + } + } + } + + public void start() + { + try { + cache.getListenable().addListener( + (client, event) -> handleChildEvent(client, event) + ); + cache.start(); + } + catch (Exception ex) { + throw Throwables.propagate(ex); + } + } + + public void stop() + { + try { + cache.close(); + cacheExecutor.shutdownNow(); + } + catch (Exception ex) { + 
log.error(ex, "Failed to stop node watcher for type [%s].", nodeType); + } + } + } +} diff --git a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java index 50eb29f8e875..f30a2060e420 100644 --- a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java @@ -30,12 +30,14 @@ import com.google.inject.TypeLiteral; import com.google.inject.name.Named; import com.google.inject.name.Names; - +import io.druid.discovery.DruidNodeAnnouncer; +import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.guice.DruidBinders; import io.druid.guice.JsonConfigProvider; import io.druid.guice.KeyHolder; import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; +import io.druid.guice.PolyBind; import io.druid.java.util.common.lifecycle.Lifecycle; import io.druid.server.DruidNode; import io.druid.server.initialization.CuratorDiscoveryConfig; @@ -74,6 +76,9 @@ public class DiscoveryModule implements Module { private static final String NAME = "DiscoveryModule:internal"; + private static final String INTERNAL_DISCOVERY_PROP = "druid.discovery.type"; + private static final String CURATOR_KEY = "curator"; + /** * Requests that the un-annotated DruidNode instance be injected and published as part of the lifecycle. * @@ -146,6 +151,25 @@ public void configure(Binder binder) binder.bind(ServiceAnnouncer.class) .to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME))) .in(LazySingleton.class); + + // internal discovery bindings. 
+ PolyBind.createChoiceWithDefault( + binder, INTERNAL_DISCOVERY_PROP, Key.get(DruidNodeAnnouncer.class), CURATOR_KEY + ); + + PolyBind.createChoiceWithDefault( + binder, INTERNAL_DISCOVERY_PROP, Key.get(DruidNodeDiscoveryProvider.class), CURATOR_KEY + ); + + PolyBind.optionBinder(binder, Key.get(DruidNodeDiscoveryProvider.class)) + .addBinding(CURATOR_KEY) + .to(CuratorDruidNodeDiscoveryProvider.class) + .in(LazySingleton.class); + + PolyBind.optionBinder(binder, Key.get(DruidNodeAnnouncer.class)) + .addBinding(CURATOR_KEY) + .to(CuratorDruidNodeAnnouncer.class) + .in(LazySingleton.class); } @Provides diff --git a/server/src/main/java/io/druid/discovery/DataNodeService.java b/server/src/main/java/io/druid/discovery/DataNodeService.java new file mode 100644 index 000000000000..250449b57ec8 --- /dev/null +++ b/server/src/main/java/io/druid/discovery/DataNodeService.java @@ -0,0 +1,116 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.discovery; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.server.coordination.ServerType; + +import java.util.Objects; + +/** + * Metadata announced by any node that serves segments. + */ +public class DataNodeService extends DruidService +{ + public static final String DISCOVERY_SERVICE_KEY = "dataNodeService"; + + private final String tier; + private final long maxSize; + private final ServerType type; + private final int priority; + + @JsonCreator + public DataNodeService( + @JsonProperty("tier") String tier, + @JsonProperty("maxSize") long maxSize, + @JsonProperty("type") ServerType type, + @JsonProperty("priority") int priority + ) + { + this.tier = tier; + this.maxSize = maxSize; + this.type = type; + this.priority = priority; + } + + @Override + public String getName() + { + return DISCOVERY_SERVICE_KEY; + } + + @JsonProperty + public String getTier() + { + return tier; + } + + @JsonProperty + public long getMaxSize() + { + return maxSize; + } + + @JsonProperty + public ServerType getType() + { + return type; + } + + @JsonProperty + public int getPriority() + { + return priority; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DataNodeService that = (DataNodeService) o; + return maxSize == that.maxSize && + priority == that.priority && + Objects.equals(tier, that.tier) && + type == that.type; + } + + @Override + public int hashCode() + { + return Objects.hash(tier, maxSize, type, priority); + } + + @Override + public String toString() + { + return "DataNodeService{" + + "tier='" + tier + '\'' + + ", maxSize=" + maxSize + + ", type=" + type + + ", priority=" + priority + + '}'; + } +} diff --git a/server/src/main/java/io/druid/discovery/DiscoveryDruidNode.java b/server/src/main/java/io/druid/discovery/DiscoveryDruidNode.java new file mode 
100644 index 000000000000..c6ab1fcbf6f6 --- /dev/null +++ b/server/src/main/java/io/druid/discovery/DiscoveryDruidNode.java @@ -0,0 +1,108 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.discovery; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.server.DruidNode; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Representation of all information related to discovery of a node and all the other metadata associated with + * the node per nodeType such as broker, historical etc. + * Note that one Druid process might announce multiple DiscoveryDruidNode if it acts as multiple nodeTypes e.g. + * coordinator would announce DiscoveryDruidNode for overlord nodeType as well when acting as overlord. + */ +public class DiscoveryDruidNode +{ + private final DruidNode druidNode; + private final String nodeType; + + // Other metadata associated with the node e.g. + // if its a historical node then lookup information, segment loading capacity etc. 
+ private final Map services = new HashMap<>(); + + @JsonCreator + public DiscoveryDruidNode( + @JsonProperty("druidNode") DruidNode druidNode, + @JsonProperty("nodeType") String nodeType, + @JsonProperty("services") Map services + ) + { + this.druidNode = druidNode; + this.nodeType = nodeType; + + if (services != null && !services.isEmpty()) { + this.services.putAll(services); + } + } + + @JsonProperty + public Map getServices() + { + return services; + } + + @JsonProperty + public String getNodeType() + { + return nodeType; + } + + @JsonProperty + public DruidNode getDruidNode() + { + return druidNode; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DiscoveryDruidNode that = (DiscoveryDruidNode) o; + return Objects.equals(druidNode, that.druidNode) && + Objects.equals(nodeType, that.nodeType) && + Objects.equals(services, that.services); + } + + @Override + public int hashCode() + { + return Objects.hash(druidNode, nodeType, services); + } + + @Override + public String toString() + { + return "DiscoveryDruidNode{" + + "druidNode=" + druidNode + + ", nodeType='" + nodeType + '\'' + + ", services=" + services + + '}'; + } +} diff --git a/server/src/main/java/io/druid/discovery/DruidNodeAnnouncer.java b/server/src/main/java/io/druid/discovery/DruidNodeAnnouncer.java new file mode 100644 index 000000000000..78892ba51d59 --- /dev/null +++ b/server/src/main/java/io/druid/discovery/DruidNodeAnnouncer.java @@ -0,0 +1,29 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.discovery; + +/** + * DiscoveryDruidNode announcer for internal discovery. + */ +public interface DruidNodeAnnouncer +{ + void announce(DiscoveryDruidNode discoveryDruidNode); + void unannounce(DiscoveryDruidNode discoveryDruidNode); +} diff --git a/server/src/main/java/io/druid/discovery/DruidNodeDiscovery.java b/server/src/main/java/io/druid/discovery/DruidNodeDiscovery.java new file mode 100644 index 000000000000..7b051ccbf00b --- /dev/null +++ b/server/src/main/java/io/druid/discovery/DruidNodeDiscovery.java @@ -0,0 +1,37 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.discovery; + +import java.util.Collection; + +/** + * Interface for discovering Druid Nodes announced by DruidNodeAnnouncer. 
+ */ +public interface DruidNodeDiscovery +{ + Collection getAllNodes(); + void registerListener(Listener listener); + + interface Listener + { + void nodeAdded(DiscoveryDruidNode node); + void nodeRemoved(DiscoveryDruidNode node); + } +} diff --git a/server/src/main/java/io/druid/discovery/DruidNodeDiscoveryProvider.java b/server/src/main/java/io/druid/discovery/DruidNodeDiscoveryProvider.java new file mode 100644 index 000000000000..035840958369 --- /dev/null +++ b/server/src/main/java/io/druid/discovery/DruidNodeDiscoveryProvider.java @@ -0,0 +1,154 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.discovery; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.logger.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Provider of DruidNodeDiscovery instances. 
+ */ +public abstract class DruidNodeDiscoveryProvider +{ + private static final Logger log = new Logger(DruidNodeDiscoveryProvider.class); + + public static final String NODE_TYPE_COORDINATOR = "coordinator"; + public static final String NODE_TYPE_HISTORICAL = "historical"; + public static final String NODE_TYPE_BROKER = "broker"; + public static final String NODE_TYPE_OVERLORD = "overlord"; + public static final String NODE_TYPE_PEON = "peon"; + public static final String NODE_TYPE_ROUTER = "router"; + public static final String NODE_TYPE_MM = "middleManager"; + + public static final Set ALL_NODE_TYPES = ImmutableSet.of( + NODE_TYPE_COORDINATOR, + NODE_TYPE_HISTORICAL, + NODE_TYPE_BROKER, + NODE_TYPE_OVERLORD, + NODE_TYPE_PEON, + NODE_TYPE_ROUTER, + NODE_TYPE_MM + ); + + private static final Map> SERVICE_TO_NODE_TYPES = ImmutableMap.of( + LookupNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NODE_TYPE_BROKER, NODE_TYPE_HISTORICAL, NODE_TYPE_PEON), + DataNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NODE_TYPE_HISTORICAL, NODE_TYPE_PEON), + WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NODE_TYPE_MM) + ); + + private final Map serviceDiscoveryMap = new ConcurrentHashMap<>(SERVICE_TO_NODE_TYPES.size()); + + /** + * Get DruidNodeDiscovery instance to discover nodes of given nodeType. + */ + public abstract DruidNodeDiscovery getForNodeType(String nodeType); + + /** + * Get DruidNodeDiscovery instance to discover nodes that announce given service in its metadata. 
+ */ + public synchronized DruidNodeDiscovery getForService(String serviceName) + { + ServiceListener nodeDiscovery = serviceDiscoveryMap.get(serviceName); + + if (nodeDiscovery == null) { + Set nodeTypesToWatch = DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(serviceName); + if (nodeTypesToWatch == null) { + throw new IAE("Unknown service [%s].", serviceName); + } + + nodeDiscovery = new ServiceListener(serviceName); + for (String nodeType : nodeTypesToWatch) { + getForNodeType(nodeType).registerListener(nodeDiscovery); + } + serviceDiscoveryMap.put(serviceName, nodeDiscovery); + } + + return nodeDiscovery; + } + + private static class ServiceListener implements DruidNodeDiscovery, DruidNodeDiscovery.Listener + { + private final String service; + private final Map nodes = new ConcurrentHashMap<>(); + + private final List listeners = new ArrayList<>(); + + ServiceListener(String service) + { + this.service = service; + } + + @Override + public synchronized void nodeAdded(DiscoveryDruidNode node) + { + if (node.getServices().containsKey(service)) { + DiscoveryDruidNode prev = nodes.putIfAbsent(node.getDruidNode().getHostAndPortToUse(), node); + + if (prev == null) { + for (Listener listener : listeners) { + listener.nodeAdded(node); + } + } else { + log.warn("Node[%s] discovered but already exists [%s].", node, prev); + } + } else { + log.warn("Node[%s] discovered but doesn't have service[%s]. 
Ignored.", node, service); + } + } + + @Override + public synchronized void nodeRemoved(DiscoveryDruidNode node) + { + DiscoveryDruidNode prev = nodes.remove(node.getDruidNode().getHostAndPortToUse()); + if (prev != null) { + for (Listener listener : listeners) { + listener.nodeRemoved(node); + } + } else { + log.warn("Node[%s] disappeared but was unknown for service listener [%s].", node, service); + } + } + + @Override + public Collection getAllNodes() + { + return Collections.unmodifiableCollection(nodes.values()); + } + + @Override + public synchronized void registerListener(Listener listener) + { + for (DiscoveryDruidNode node : nodes.values()) { + listener.nodeAdded(node); + } + listeners.add(listener); + } + } +} diff --git a/server/src/main/java/io/druid/discovery/DruidService.java b/server/src/main/java/io/druid/discovery/DruidService.java new file mode 100644 index 000000000000..6036864966a1 --- /dev/null +++ b/server/src/main/java/io/druid/discovery/DruidService.java @@ -0,0 +1,37 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.discovery; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +/** + * Metadata of a service announced by node. See DataNodeService and LookupNodeService for examples. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = DataNodeService.DISCOVERY_SERVICE_KEY, value = DataNodeService.class), + @JsonSubTypes.Type(name = LookupNodeService.DISCOVERY_SERVICE_KEY, value = LookupNodeService.class), + @JsonSubTypes.Type(name = WorkerNodeService.DISCOVERY_SERVICE_KEY, value = WorkerNodeService.class) +}) +public abstract class DruidService +{ + public abstract String getName(); +} diff --git a/server/src/main/java/io/druid/discovery/LookupNodeService.java b/server/src/main/java/io/druid/discovery/LookupNodeService.java new file mode 100644 index 000000000000..7560795bbad0 --- /dev/null +++ b/server/src/main/java/io/druid/discovery/LookupNodeService.java @@ -0,0 +1,80 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.discovery; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * Metadata announced by any node that serves queries and hence applies lookups. + */ +public class LookupNodeService extends DruidService +{ + public static final String DISCOVERY_SERVICE_KEY = "lookupNodeService"; + + private final String lookupTier; + + public LookupNodeService( + @JsonProperty("lookupTier") String lookupTier + ) + { + this.lookupTier = lookupTier; + } + + @Override + public String getName() + { + return DISCOVERY_SERVICE_KEY; + } + + @JsonProperty + public String getLookupTier() + { + return lookupTier; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LookupNodeService that = (LookupNodeService) o; + return Objects.equals(lookupTier, that.lookupTier); + } + + @Override + public int hashCode() + { + return Objects.hash(lookupTier); + } + + @Override + public String toString() + { + return "LookupNodeService{" + + "lookupTier='" + lookupTier + '\'' + + '}'; + } +} diff --git a/server/src/main/java/io/druid/discovery/WorkerNodeService.java b/server/src/main/java/io/druid/discovery/WorkerNodeService.java new file mode 100644 index 000000000000..070f54b22d50 --- /dev/null +++ b/server/src/main/java/io/druid/discovery/WorkerNodeService.java @@ -0,0 +1,102 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.discovery; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * Worker metadata announced by Middle Manager. + */ +public class WorkerNodeService extends DruidService +{ + public static final String DISCOVERY_SERVICE_KEY = "workerNodeService"; + + private final String ip; + private final int capacity; + private final String version; + + public WorkerNodeService( + @JsonProperty("ip") String ip, + @JsonProperty("capacity") int capacity, + @JsonProperty("version") String version + ) + { + this.ip = ip; + this.capacity = capacity; + this.version = version; + } + + @Override + public String getName() + { + return DISCOVERY_SERVICE_KEY; + } + + @JsonProperty + public String getIp() + { + return ip; + } + + @JsonProperty + public int getCapacity() + { + return capacity; + } + + @JsonProperty + public String getVersion() + { + return version; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WorkerNodeService that = (WorkerNodeService) o; + return capacity == that.capacity && + Objects.equals(ip, that.ip) && + Objects.equals(version, that.version); + } + + @Override + public int hashCode() + { + return Objects.hash(ip, capacity, version); + } + + @Override + public String toString() + { + return "WorkerNodeService{" + + "ip='" + ip + '\'' + + ", capacity=" + capacity + + ", version='" + version + '\'' + + '}'; + } +} diff --git 
a/server/src/main/java/io/druid/guice/StorageNodeModule.java b/server/src/main/java/io/druid/guice/StorageNodeModule.java index 2a1cee02b065..1bff1220ab37 100644 --- a/server/src/main/java/io/druid/guice/StorageNodeModule.java +++ b/server/src/main/java/io/druid/guice/StorageNodeModule.java @@ -25,6 +25,7 @@ import com.google.inject.ProvisionException; import com.google.inject.util.Providers; import io.druid.client.DruidServerConfig; +import io.druid.discovery.DataNodeService; import io.druid.guice.annotations.Self; import io.druid.query.DruidProcessingConfig; import io.druid.segment.column.ColumnConfig; @@ -66,4 +67,23 @@ public DruidServerMetadata getMetadata(@Self DruidNode node, @Nullable NodeTypeC config.getPriority() ); } + + @Provides + @LazySingleton + public DataNodeService getDataNodeService( + @Nullable NodeTypeConfig nodeType, + DruidServerConfig config + ) + { + if (nodeType == null) { + throw new ProvisionException("Must override the binding for NodeTypeConfig if you want a DataNodeService."); + } + + return new DataNodeService( + config.getTier(), + config.getMaxSize(), + nodeType.getNodeType(), + config.getPriority() + ); + } } diff --git a/server/src/main/java/io/druid/query/lookup/LookupModule.java b/server/src/main/java/io/druid/query/lookup/LookupModule.java index 51b7ba9b36ae..151c65fbbedc 100644 --- a/server/src/main/java/io/druid/query/lookup/LookupModule.java +++ b/server/src/main/java/io/druid/query/lookup/LookupModule.java @@ -32,11 +32,14 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Binder; import com.google.inject.Inject; +import com.google.inject.Provides; import io.druid.common.utils.ServletResourceUtils; import io.druid.curator.announcement.Announcer; import io.druid.guice.ExpressionModule; +import io.druid.discovery.LookupNodeService; import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import
io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Json; @@ -101,6 +104,13 @@ public void configure(Binder binder) 2 // 1 for "normal" operation and 1 for "emergency" or other ); } + + @Provides + @LazySingleton + public LookupNodeService getLookupNodeService(LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig) + { + return new LookupNodeService(lookupListeningAnnouncerConfig.getLookupTier()); + } } @Path(ListenerResource.BASE_PATH + "/" + LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY) diff --git a/server/src/main/java/io/druid/server/http/ClusterResource.java b/server/src/main/java/io/druid/server/http/ClusterResource.java new file mode 100644 index 000000000000..4092ed34a38f --- /dev/null +++ b/server/src/main/java/io/druid/server/http/ClusterResource.java @@ -0,0 +1,113 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.server.http; + +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import com.sun.jersey.spi.container.ResourceFilters; +import io.druid.discovery.DiscoveryDruidNode; +import io.druid.discovery.DruidNodeDiscoveryProvider; +import io.druid.guice.LazySingleton; +import io.druid.server.http.security.StateResourceFilter; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.util.Collection; + +/** + */ +@Path("/druid/coordinator/v1/cluster") +@LazySingleton +@ResourceFilters(StateResourceFilter.class) +public class ClusterResource +{ + private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + + @Inject + public ClusterResource(DruidNodeDiscoveryProvider discoveryProvider) + { + this.druidNodeDiscoveryProvider = discoveryProvider; + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + public Response getClusterServers() + { + ImmutableMap.Builder entityBuilder = new ImmutableMap.Builder<>(); + + entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, + druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR) + .getAllNodes() + ); + entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, + druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD) + .getAllNodes() + ); + entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, + druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER) + .getAllNodes() + ); + entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, + druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL) + .getAllNodes() + ); + + Collection mmNodes = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM) + .getAllNodes(); + if (!mmNodes.isEmpty()) { + 
entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_MM, mmNodes); + } + + Collection routerNodes = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_ROUTER) + .getAllNodes(); + if (!routerNodes.isEmpty()) { + entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_ROUTER, routerNodes); + } + + return Response.status(Response.Status.OK).entity(entityBuilder.build()).build(); + } + + @GET + @Produces({MediaType.APPLICATION_JSON}) + @Path("/{nodeType}") + public Response getClusterServers( + @PathParam("nodeType") String nodeType + ) + { + if (nodeType == null || !DruidNodeDiscoveryProvider.ALL_NODE_TYPES.contains(nodeType)) { + return Response.serverError() + .status(Response.Status.BAD_REQUEST) + .entity(String.format( + "Invalid nodeType [%s]. Valid node types are %s .", + nodeType, + DruidNodeDiscoveryProvider.ALL_NODE_TYPES + )) + .build(); + } else { + return Response.status(Response.Status.OK).entity( + druidNodeDiscoveryProvider.getForNodeType(nodeType).getAllNodes() + ).build(); + } + } +} diff --git a/server/src/main/java/io/druid/server/initialization/ServerConfig.java b/server/src/main/java/io/druid/server/initialization/ServerConfig.java index b299908bca4d..82e85318ee58 100644 --- a/server/src/main/java/io/druid/server/initialization/ServerConfig.java +++ b/server/src/main/java/io/druid/server/initialization/ServerConfig.java @@ -24,6 +24,7 @@ import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import java.util.Objects; /** */ @@ -81,6 +82,30 @@ public boolean isTls() return tls; } + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ServerConfig that = (ServerConfig) o; + return numThreads == that.numThreads && + defaultQueryTimeout == that.defaultQueryTimeout && + maxScatterGatherBytes == that.maxScatterGatherBytes && + plaintext == that.plaintext && + tls == that.tls && + 
Objects.equals(maxIdleTime, that.maxIdleTime); + } + + @Override + public int hashCode() + { + return Objects.hash(numThreads, maxIdleTime, defaultQueryTimeout, maxScatterGatherBytes, plaintext, tls); + } + @Override public String toString() { diff --git a/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java b/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java index 96e3532eb86d..3e8006a029d4 100644 --- a/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java +++ b/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java @@ -90,6 +90,11 @@ public String getConnectorPath() return (null == connectorPath) ? defaultPath("connector") : connectorPath; } + public String getInternalDiscoveryPath() + { + return defaultPath("internal-discovery"); + } + public String defaultPath(final String subPath) { return ZKPaths.makePath(getBase(), subPath); diff --git a/server/src/test/java/io/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java b/server/src/test/java/io/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java new file mode 100644 index 000000000000..2ddce0843d94 --- /dev/null +++ b/server/src/test/java/io/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java @@ -0,0 +1,224 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.curator.discovery; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.curator.CuratorTestBase; +import io.druid.curator.announcement.Announcer; +import io.druid.discovery.DiscoveryDruidNode; +import io.druid.discovery.DruidNodeDiscovery; +import io.druid.discovery.DruidNodeDiscoveryProvider; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.server.DruidNode; +import io.druid.server.initialization.ServerConfig; +import io.druid.server.initialization.ZkPathsConfig; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +/** + */ +public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase +{ + @Before + public void setUp() throws Exception + { + setupServerAndCurator(); + } + + @Test(timeout = 5000) + public void testAnnouncementAndDiscovery() throws Exception + { + ObjectMapper objectMapper = new DefaultObjectMapper(); + + //additional setup to serde DruidNode + objectMapper.setInjectableValues(new InjectableValues.Std() + .addValue(ServerConfig.class, new ServerConfig()) + .addValue("java.lang.String", "dummy") + .addValue("java.lang.Integer", 1234) + ); + + curator.start(); + curator.blockUntilConnected(); + + Announcer announcer = new Announcer( + curator, + 
MoreExecutors.sameThreadExecutor() + ); + announcer.start(); + + CuratorDruidNodeAnnouncer druidNodeAnnouncer = new CuratorDruidNodeAnnouncer( + announcer, + new ZkPathsConfig(), + objectMapper + ); + + DiscoveryDruidNode node1 = new DiscoveryDruidNode( + new DruidNode("s1", "h1", 8080, null, new ServerConfig()), + DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, + ImmutableMap.of() + ); + + DiscoveryDruidNode node2 = new DiscoveryDruidNode( + new DruidNode("s2", "h2", 8080, null, new ServerConfig()), + DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, + ImmutableMap.of() + ); + + DiscoveryDruidNode node3 = new DiscoveryDruidNode( + new DruidNode("s3", "h3", 8080, null, new ServerConfig()), + DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, + ImmutableMap.of() + ); + + DiscoveryDruidNode node4 = new DiscoveryDruidNode( + new DruidNode("s4", "h4", 8080, null, new ServerConfig()), + DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, + ImmutableMap.of() + ); + + druidNodeAnnouncer.announce(node1); + druidNodeAnnouncer.announce(node3); + + CuratorDruidNodeDiscoveryProvider druidNodeDiscoveryProvider = new CuratorDruidNodeDiscoveryProvider( + curator, + new ZkPathsConfig(), + objectMapper + ); + druidNodeDiscoveryProvider.start(); + + DruidNodeDiscovery coordDiscovery = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR); + DruidNodeDiscovery overlordDiscovery = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD); + + while (!checkNodes(ImmutableSet.of(node1), coordDiscovery.getAllNodes())) { + Thread.sleep(100); + } + + while (!checkNodes(ImmutableSet.of(node3), overlordDiscovery.getAllNodes())) { + Thread.sleep(100); + } + + HashSet coordNodes = new HashSet<>(); + coordDiscovery.registerListener( + new DruidNodeDiscovery.Listener() + { + @Override + public void nodeAdded(DiscoveryDruidNode node) + { + coordNodes.add(node); + } + + @Override + public void nodeRemoved(DiscoveryDruidNode node) + { + 
coordNodes.remove(node); + } + } + ); + + HashSet overlordNodes = new HashSet<>(); + overlordDiscovery.registerListener( + new DruidNodeDiscovery.Listener() + { + @Override + public void nodeAdded(DiscoveryDruidNode node) + { + overlordNodes.add(node); + } + + @Override + public void nodeRemoved(DiscoveryDruidNode node) + { + overlordNodes.remove(node); + } + } + ); + + while (!checkNodes(ImmutableSet.of(node1), coordNodes)) { + Thread.sleep(100); + } + + while (!checkNodes(ImmutableSet.of(node3), overlordNodes)) { + Thread.sleep(100); + } + + druidNodeAnnouncer.announce(node2); + druidNodeAnnouncer.announce(node4); + + while (!checkNodes(ImmutableSet.of(node1, node2), coordDiscovery.getAllNodes())) { + Thread.sleep(100); + } + + while (!checkNodes(ImmutableSet.of(node3, node4), overlordDiscovery.getAllNodes())) { + Thread.sleep(100); + } + + while (!checkNodes(ImmutableSet.of(node1, node2), coordNodes)) { + Thread.sleep(100); + } + + while (!checkNodes(ImmutableSet.of(node3, node4), overlordNodes)) { + Thread.sleep(100); + } + + druidNodeAnnouncer.unannounce(node1); + druidNodeAnnouncer.unannounce(node2); + druidNodeAnnouncer.unannounce(node3); + druidNodeAnnouncer.unannounce(node4); + + while (!checkNodes(ImmutableSet.of(), coordDiscovery.getAllNodes())) { + Thread.sleep(100); + } + + while (!checkNodes(ImmutableSet.of(), overlordDiscovery.getAllNodes())) { + Thread.sleep(100); + } + + while (!coordNodes.isEmpty()) { + Thread.sleep(100); + } + + while (!overlordNodes.isEmpty()) { + Thread.sleep(100); + } + + druidNodeDiscoveryProvider.stop(); + announcer.stop(); + } + + private boolean checkNodes(Set expected, Collection actual) + { + return expected.equals(ImmutableSet.copyOf(actual)); + } + + @After + public void tearDown() + { + tearDownServerAndCurator(); + } +} diff --git a/server/src/test/java/io/druid/discovery/DataNodeServiceTest.java b/server/src/test/java/io/druid/discovery/DataNodeServiceTest.java new file mode 100644 index 000000000000..f2f13c41adf0 
--- /dev/null +++ b/server/src/test/java/io/druid/discovery/DataNodeServiceTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.discovery; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.segment.TestHelper; +import io.druid.server.coordination.ServerType; +import org.junit.Assert; +import org.junit.Test; + +/** + */ +public class DataNodeServiceTest +{ + @Test + public void testSerde() throws Exception + { + DruidService expected = new DataNodeService( + "tier", + 100, + ServerType.HISTORICAL, + 1 + ); + + ObjectMapper mapper = TestHelper.getJsonMapper(); + DruidService actual = mapper.readValue( + mapper.writeValueAsString(expected), + DruidService.class + ); + + Assert.assertEquals(expected, actual); + } +} diff --git a/server/src/test/java/io/druid/discovery/DruidNodeDiscoveryProviderTest.java b/server/src/test/java/io/druid/discovery/DruidNodeDiscoveryProviderTest.java new file mode 100644 index 000000000000..e9e5b0e689fd --- /dev/null +++ b/server/src/test/java/io/druid/discovery/DruidNodeDiscoveryProviderTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to Metamarkets Group Inc. 
(Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.discovery; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.druid.server.DruidNode; +import io.druid.server.coordination.ServerType; +import io.druid.server.initialization.ServerConfig; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + */ +public class DruidNodeDiscoveryProviderTest +{ + @Test + public void testGetForService() + { + TestDruidNodeDiscoveryProvider provider = new TestDruidNodeDiscoveryProvider(); + + DruidNodeDiscovery dataNodeDiscovery = provider.getForService(DataNodeService.DISCOVERY_SERVICE_KEY); + Set dataNodes = new HashSet<>(); + dataNodeDiscovery.registerListener( + new DruidNodeDiscovery.Listener() + { + @Override + public void nodeAdded(DiscoveryDruidNode node) + { + dataNodes.add(node); + } + + @Override + public void nodeRemoved(DiscoveryDruidNode node) + { + dataNodes.remove(node); + } + } + ); + + DruidNodeDiscovery lookupNodeDiscovery = provider.getForService(LookupNodeService.DISCOVERY_SERVICE_KEY); + Set lookupNodes = new HashSet<>(); + lookupNodeDiscovery.registerListener( + 
new DruidNodeDiscovery.Listener() + { + @Override + public void nodeAdded(DiscoveryDruidNode node) + { + lookupNodes.add(node); + } + + @Override + public void nodeRemoved(DiscoveryDruidNode node) + { + lookupNodes.remove(node); + } + } + ); + + Assert.assertTrue(dataNodes.isEmpty()); + Assert.assertTrue(dataNodes.isEmpty()); + Assert.assertTrue(dataNodeDiscovery.getAllNodes().isEmpty()); + Assert.assertTrue(lookupNodes.isEmpty()); + Assert.assertTrue(lookupNodeDiscovery.getAllNodes().isEmpty()); + + DiscoveryDruidNode node1 = new DiscoveryDruidNode( + new DruidNode("s1", "h1", 8080, null, new ServerConfig()), + DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, + ImmutableMap.of( + DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0), + LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) + ); + + DiscoveryDruidNode node2 = new DiscoveryDruidNode( + new DruidNode("s2", "h2", 8080, null, new ServerConfig()), + DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, + ImmutableMap.of( + DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)) + ); + + DiscoveryDruidNode node3 = new DiscoveryDruidNode( + new DruidNode("s3", "h3", 8080, null, new ServerConfig()), + DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, + ImmutableMap.of( + LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) + ); + + DiscoveryDruidNode node4 = new DiscoveryDruidNode( + new DruidNode("s4", "h4", 8080, null, new ServerConfig()), + DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + ImmutableMap.of( + DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0), + LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) + ); + + DiscoveryDruidNode node5 = new DiscoveryDruidNode( + new DruidNode("s5", "h5", 8080, null, new ServerConfig()), + DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + ImmutableMap.of( + 
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)) + ); + + DiscoveryDruidNode node6 = new DiscoveryDruidNode( + new DruidNode("s6", "h6", 8080, null, new ServerConfig()), + DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + ImmutableMap.of( + LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) + ); + + DiscoveryDruidNode node7 = new DiscoveryDruidNode( + new DruidNode("s7", "h7", 8080, null, new ServerConfig()), + DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, + ImmutableMap.of( + LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) + ); + + DiscoveryDruidNode node7Clone = new DiscoveryDruidNode( + new DruidNode("s7", "h7", 8080, null, new ServerConfig()), + DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, + ImmutableMap.of( + LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) + ); + + DiscoveryDruidNode node8 = new DiscoveryDruidNode( + new DruidNode("s8", "h8", 8080, null, new ServerConfig()), + DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, + ImmutableMap.of() + ); + + provider.add(node1); + provider.add(node2); + provider.add(node3); + provider.add(node4); + provider.add(node5); + provider.add(node6); + provider.add(node7); + provider.add(node7Clone); + provider.add(node8); + + Assert.assertEquals(ImmutableSet.of(node1, node2, node4, node5), ImmutableSet.copyOf(dataNodeDiscovery.getAllNodes())); + Assert.assertEquals(ImmutableSet.of(node1, node2, node4, node5), dataNodes); + + Assert.assertEquals(ImmutableSet.of(node1, node3, node4, node6, node7), ImmutableSet.copyOf(lookupNodeDiscovery.getAllNodes())); + Assert.assertEquals(ImmutableSet.of(node1, node3, node4, node6, node7), lookupNodes); + + provider.remove(node8); + provider.remove(node7Clone); + provider.remove(node6); + provider.remove(node5); + provider.remove(node4); + + Assert.assertEquals(ImmutableSet.of(node1, node2), ImmutableSet.copyOf(dataNodeDiscovery.getAllNodes())); + 
Assert.assertEquals(ImmutableSet.of(node1, node2), dataNodes); + + Assert.assertEquals(ImmutableSet.of(node1, node3), ImmutableSet.copyOf(lookupNodeDiscovery.getAllNodes())); + Assert.assertEquals(ImmutableSet.of(node1, node3), lookupNodes); + } + + private static class TestDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider + { + private List listeners = new ArrayList<>(); + + @Override + public DruidNodeDiscovery getForNodeType(String nodeType) + { + return new DruidNodeDiscovery() + { + @Override + public Set getAllNodes() + { + throw new UnsupportedOperationException(); + } + + @Override + public void registerListener(Listener listener) + { + TestDruidNodeDiscoveryProvider.this.listeners.add(listener); + } + }; + } + + void add(DiscoveryDruidNode node) + { + for (DruidNodeDiscovery.Listener listener : listeners) { + listener.nodeAdded(node); + } + } + + void remove(DiscoveryDruidNode node) + { + for (DruidNodeDiscovery.Listener listener : listeners) { + listener.nodeRemoved(node); + } + } + } +} diff --git a/server/src/test/java/io/druid/discovery/LookupNodeServiceTest.java b/server/src/test/java/io/druid/discovery/LookupNodeServiceTest.java new file mode 100644 index 000000000000..69e5ae6930e2 --- /dev/null +++ b/server/src/test/java/io/druid/discovery/LookupNodeServiceTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.discovery; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Test; + +/** + */ +public class LookupNodeServiceTest +{ + @Test + public void testSerde() throws Exception + { + DruidService expected = new LookupNodeService( + "tier" + ); + + ObjectMapper mapper = TestHelper.getJsonMapper(); + DruidService actual = mapper.readValue( + mapper.writeValueAsString(expected), + DruidService.class + ); + + Assert.assertEquals(expected, actual); + } +} diff --git a/server/src/test/java/io/druid/discovery/WorkerNodeServiceTest.java b/server/src/test/java/io/druid/discovery/WorkerNodeServiceTest.java new file mode 100644 index 000000000000..fd860cb568a0 --- /dev/null +++ b/server/src/test/java/io/druid/discovery/WorkerNodeServiceTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.discovery; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Test; + +/** + */ +public class WorkerNodeServiceTest +{ + @Test + public void testSerde() throws Exception + { + DruidService expected = new WorkerNodeService( + "1.1.1.1", + 100, + "v1" + ); + + ObjectMapper mapper = TestHelper.getJsonMapper(); + DruidService actual = mapper.readValue( + mapper.writeValueAsString(expected), + DruidService.class + ); + + Assert.assertEquals(expected, actual); + } +} diff --git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java index 95586e7af29f..3bade6247bf3 100644 --- a/services/src/main/java/io/druid/cli/CliBroker.java +++ b/services/src/main/java/io/druid/cli/CliBroker.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.name.Names; import io.airlift.airline.Command; @@ -33,6 +34,8 @@ import io.druid.client.selector.CustomTierSelectorStrategyConfig; import io.druid.client.selector.ServerSelectorStrategy; import io.druid.client.selector.TierSelectorStrategy; +import io.druid.discovery.DruidNodeDiscoveryProvider; +import io.druid.discovery.LookupNodeService; import io.druid.guice.CacheModule; import io.druid.guice.DruidProcessingModule; import io.druid.guice.Jerseys; @@ -120,6 +123,14 @@ public void configure(Binder binder) 
MetricsModule.register(binder, CacheMonitor.class); LifecycleModule.register(binder, Server.class); + + binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider( + new DiscoverySideEffectsProvider( + DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, + ImmutableList.of(LookupNodeService.class) + ) + ).in(LazySingleton.class); + LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class)); } }, new LookupModule(), diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java index 87fba0a3d145..939ddeaceb59 100644 --- a/services/src/main/java/io/druid/cli/CliCoordinator.java +++ b/services/src/main/java/io/druid/cli/CliCoordinator.java @@ -21,16 +21,19 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Inject; +import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.name.Names; - import io.airlift.airline.Command; import io.druid.audit.AuditManager; import io.druid.client.CoordinatorServerView; +import io.druid.client.coordinator.Coordinator; import io.druid.client.indexing.IndexingServiceClient; +import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.guice.ConditionalMultibind; import io.druid.guice.ConfigProvider; import io.druid.guice.Jerseys; @@ -58,6 +61,7 @@ import io.druid.server.coordinator.helper.DruidCoordinatorSegmentKiller; import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger; import io.druid.server.coordinator.helper.DruidCoordinatorVersionConverter; +import io.druid.server.http.ClusterResource; import io.druid.server.http.CoordinatorDynamicConfigsResource; import io.druid.server.http.CoordinatorRedirectInfo; import io.druid.server.http.CoordinatorResource; @@ -182,6 +186,7 @@ public void configure(Binder binder) 
Jerseys.addResource(binder, MetadataResource.class); Jerseys.addResource(binder, IntervalsResource.class); Jerseys.addResource(binder, LookupCoordinatorResource.class); + Jerseys.addResource(binder, ClusterResource.class); LifecycleModule.register(binder, Server.class); LifecycleModule.register(binder, DatasourcesResource.class); @@ -204,6 +209,14 @@ public void configure(Binder binder) Predicates.equalTo("true"), DruidCoordinatorSegmentKiller.class ); + + binder.bind(DiscoverySideEffectsProvider.Child.class).annotatedWith(Coordinator.class).toProvider( + new DiscoverySideEffectsProvider( + DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, + ImmutableList.of() + ) + ).in(LazySingleton.class); + LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, Coordinator.class)); } @Provides diff --git a/services/src/main/java/io/druid/cli/CliHistorical.java b/services/src/main/java/io/druid/cli/CliHistorical.java index d23364b81b92..7f761049fef2 100644 --- a/services/src/main/java/io/druid/cli/CliHistorical.java +++ b/services/src/main/java/io/druid/cli/CliHistorical.java @@ -21,11 +21,15 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.name.Names; import io.airlift.airline.Command; import io.druid.client.cache.CacheConfig; import io.druid.client.cache.CacheMonitor; +import io.druid.discovery.DataNodeService; +import io.druid.discovery.DruidNodeDiscoveryProvider; +import io.druid.discovery.LookupNodeService; import io.druid.guice.CacheModule; import io.druid.guice.DruidProcessingModule; import io.druid.guice.Jerseys; @@ -103,6 +107,14 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.historical.cache", CacheConfig.class); binder.install(new CacheModule()); MetricsModule.register(binder, CacheMonitor.class); + + binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider( + new 
DiscoverySideEffectsProvider( + DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, + ImmutableList.of(DataNodeService.class, LookupNodeService.class) + ) + ).in(LazySingleton.class); + LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class)); } }, new LookupModule() diff --git a/services/src/main/java/io/druid/cli/CliMiddleManager.java b/services/src/main/java/io/druid/cli/CliMiddleManager.java index 764c7cb9e025..7be166cb40a5 100644 --- a/services/src/main/java/io/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/io/druid/cli/CliMiddleManager.java @@ -21,12 +21,14 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.name.Names; import com.google.inject.util.Providers; - import io.airlift.airline.Command; +import io.druid.discovery.DruidNodeDiscoveryProvider; +import io.druid.discovery.WorkerNodeService; import io.druid.guice.IndexingServiceFirehoseModule; import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.IndexingServiceTaskLogsModule; @@ -98,6 +100,14 @@ public void configure(Binder binder) Jerseys.addResource(binder, WorkerResource.class); LifecycleModule.register(binder, Server.class); + + binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider( + new DiscoverySideEffectsProvider( + DruidNodeDiscoveryProvider.NODE_TYPE_MM, + ImmutableList.of(WorkerNodeService.class) + ) + ).in(LazySingleton.class); + LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class)); } @Provides @@ -112,6 +122,17 @@ public Worker getWorker(@Self DruidNode node, WorkerConfig config) config.getVersion() ); } + + @Provides + @LazySingleton + public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig) + { + return new WorkerNodeService( + workerConfig.getIp(), + workerConfig.getCapacity(), + workerConfig.getVersion() + ); + } }, 
new IndexingServiceFirehoseModule(), new IndexingServiceTaskLogsModule() diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 9ae4b5233309..c3db181df839 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -32,7 +32,9 @@ import com.google.inject.util.Providers; import io.airlift.airline.Command; import io.druid.audit.AuditManager; +import io.druid.client.indexing.IndexingService; import io.druid.client.indexing.IndexingServiceSelectorConfig; +import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.guice.IndexingServiceFirehoseModule; import io.druid.guice.IndexingServiceModuleHelper; import io.druid.guice.IndexingServiceTaskLogsModule; @@ -182,6 +184,14 @@ public void configure(Binder binder) if (standalone) { LifecycleModule.register(binder, Server.class); } + + binder.bind(DiscoverySideEffectsProvider.Child.class).annotatedWith(IndexingService.class).toProvider( + new DiscoverySideEffectsProvider( + DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, + ImmutableList.of() + ) + ).in(LazySingleton.class); + LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, IndexingService.class)); } private void configureTaskStorage(Binder binder) diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java index a84ac258b66b..aa28951c836d 100644 --- a/services/src/main/java/io/druid/cli/CliRouter.java +++ b/services/src/main/java/io/druid/cli/CliRouter.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.TypeLiteral; @@ -29,6 +30,7 @@ import io.druid.curator.discovery.DiscoveryModule; import io.druid.curator.discovery.ServerDiscoveryFactory; import 
io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; @@ -109,6 +111,14 @@ public void configure(Binder binder) LifecycleModule.register(binder, RouterResource.class); LifecycleModule.register(binder, Server.class); DiscoveryModule.register(binder, Self.class); + + binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider( + new DiscoverySideEffectsProvider( + DruidNodeDiscoveryProvider.NODE_TYPE_ROUTER, + ImmutableList.of() + ) + ).in(LazySingleton.class); + LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class)); } @Provides diff --git a/services/src/main/java/io/druid/cli/ServerRunnable.java b/services/src/main/java/io/druid/cli/ServerRunnable.java index 5e2c63816241..a2f741b0f3f5 100644 --- a/services/src/main/java/io/druid/cli/ServerRunnable.java +++ b/services/src/main/java/io/druid/cli/ServerRunnable.java @@ -20,10 +20,19 @@ package io.druid.cli; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; import com.google.inject.Injector; - +import com.google.inject.Provider; +import io.druid.discovery.DiscoveryDruidNode; +import io.druid.discovery.DruidNodeAnnouncer; +import io.druid.discovery.DruidService; +import io.druid.guice.annotations.Self; import io.druid.java.util.common.lifecycle.Lifecycle; import io.druid.java.util.common.logger.Logger; +import io.druid.server.DruidNode; + +import java.util.List; /** */ @@ -47,4 +56,69 @@ public void run() throw Throwables.propagate(e); } } + + /** + * This is a helper class used by CliXXX classes to announce DiscoveryDruidNode + * as part of lifecycle Stage.LAST . 
+ */ + protected static class DiscoverySideEffectsProvider implements Provider<Child> + { + public static class Child {} + + @Inject @Self + private DruidNode druidNode; + + @Inject + private DruidNodeAnnouncer announcer; + + @Inject + private Lifecycle lifecycle; + + @Inject + private Injector injector; + + private final String nodeType; + private final List<Class<? extends DruidService>> serviceClasses; + + public DiscoverySideEffectsProvider(String nodeType, List<Class<? extends DruidService>> serviceClasses) + { + this.nodeType = nodeType; + this.serviceClasses = serviceClasses; + } + + @Override + public Child get() + { + ImmutableMap.Builder<String, DruidService> builder = new ImmutableMap.Builder<>(); + for (Class<? extends DruidService> clazz : serviceClasses) { + DruidService service = injector.getInstance(clazz); + builder.put(service.getName(), service); + } + + DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(druidNode, + nodeType, + builder.build() + ); + + lifecycle.addHandler( + new Lifecycle.Handler() + { + @Override + public void start() throws Exception + { + announcer.announce(discoveryDruidNode); + } + + @Override + public void stop() + { + announcer.unannounce(discoveryDruidNode); + } + }, + Lifecycle.Stage.LAST + ); + + return new Child(); + } + } }