diff --git a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java index 73a940f05ea4..fcfc2be4e50d 100644 --- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java +++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java @@ -24,6 +24,7 @@ import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; @@ -57,13 +58,17 @@ public class CommonCacheNotifier { private static final EmittingLogger LOG = new EmittingLogger(CommonCacheNotifier.class); - private static final List NODE_TYPES = Arrays.asList( - DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, - DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, - DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, - DruidNodeDiscoveryProvider.NODE_TYPE_ROUTER, - DruidNodeDiscoveryProvider.NODE_TYPE_MM + /** + * {@link NodeType#COORDINATOR} is intentionally omitted because it gets its information about the auth state directly + * from metadata storage. + */ + private static final List NODE_TYPES = Arrays.asList( + NodeType.BROKER, + NodeType.OVERLORD, + NodeType.HISTORICAL, + NodeType.PEON, + NodeType.ROUTER, + NodeType.MIDDLE_MANAGER ); private final DruidNodeDiscoveryProvider discoveryProvider; @@ -154,7 +159,7 @@ public void addUpdate(String updatedItemName, byte[] updatedItemData) private List> sendUpdate(String updatedAuthorizerPrefix, byte[] serializedUserMap) { List> futures = new ArrayList<>(); - for (String nodeType : NODE_TYPES) { + for (NodeType nodeType : NODE_TYPES) { DruidNodeDiscovery nodeDiscovery = discoveryProvider.getForNodeType(nodeType); Collection nodes = nodeDiscovery.getAllNodes(); for (DiscoveryDruidNode node : nodes) { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index e815cfa30167..34eca06c626e 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -40,8 +40,8 @@ import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.discovery.DiscoveryDruidNode; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.LookupNodeService; +import org.apache.druid.discovery.NodeType; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; @@ -310,7 +310,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception new LookupNodeService(lookupTier); DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode( toolbox.getDruidNode(), - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + NodeType.PEON, ImmutableMap.of( toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(), lookupNodeService.getName(), lookupNodeService diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java index aa50e687d438..88dfe70ef7b5 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java @@ -31,8 +31,8 @@ import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.discovery.DiscoveryDruidNode; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.LookupNodeService; +import org.apache.druid.discovery.NodeType; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; @@ -255,7 +255,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception new LookupNodeService(lookupTier); DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode( toolbox.getDruidNode(), - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + NodeType.PEON, ImmutableMap.of( toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(), lookupNodeService.getName(), lookupNodeService diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index bc6af54f00ad..b7ab3a0ff77a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -37,8 +37,8 @@ import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.InputRow; import org.apache.druid.discovery.DiscoveryDruidNode; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.LookupNodeService; +import org.apache.druid.discovery.NodeType; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator; @@ -672,7 +672,7 @@ private DiscoveryDruidNode createDiscoveryDruidNode(TaskToolbox toolbox) new LookupNodeService(getContextValue(CTX_KEY_LOOKUP_TIER)); return new DiscoveryDruidNode( toolbox.getDruidNode(), - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + NodeType.PEON, ImmutableMap.of( toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(), lookupNodeService.getName(), lookupNodeService diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java index 8fb63428b0ee..fda71775e7fe 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java @@ -32,8 +32,8 @@ import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.discovery.DiscoveryDruidNode; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.LookupNodeService; +import org.apache.druid.discovery.NodeType; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; @@ -364,7 +364,7 @@ public String getVersion(final Interval interval) new LookupNodeService((String) getContextValue(CTX_KEY_LOOKUP_TIER)); DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode( toolbox.getDruidNode(), - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + NodeType.PEON, ImmutableMap.of( toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(), lookupNodeService.getName(), lookupNodeService diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 2047d53633f6..b3c6f77985c3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -41,6 +41,7 @@ import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.discovery.WorkerNodeService; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; @@ -445,7 +446,7 @@ private void taskComplete( private void startWorkersHandling() throws InterruptedException { final CountDownLatch workerViewInitialized = new CountDownLatch(1); - DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM); + DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER); druidNodeDiscovery.registerListener( new DruidNodeDiscovery.Listener() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java index 3f7b5c3c6f25..86194f13dc65 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java @@ -33,6 +33,7 @@ import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.discovery.WorkerNodeService; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; @@ -84,7 +85,7 @@ public void testFreshStart() throws Exception { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM)) + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER)) .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); @@ -129,7 +130,7 @@ protected WorkerHolder createWorkerHolder( DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode( new DruidNode("service", "host1", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0") ) @@ -137,7 +138,7 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0") DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode( new DruidNode("service", "host2", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0") ) @@ -172,7 +173,7 @@ public void testOneStuckTaskAssignmentDoesntBlockOthers() throws Exception { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM)) + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER)) .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); @@ -221,7 +222,7 @@ protected WorkerHolder createWorkerHolder( DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode( new DruidNode("service", "host1", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0") ) @@ -229,7 +230,7 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0") DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode( new DruidNode("service", "host2", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0") ) @@ -259,7 +260,7 @@ public void testTaskRunnerRestart() throws Exception { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM)) + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER)) .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); @@ -324,7 +325,7 @@ protected WorkerHolder createWorkerHolder( DiscoveryDruidNode druidNode = new DiscoveryDruidNode( new DruidNode("service", "host", 1234, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0") ) @@ -404,7 +405,7 @@ public void testWorkerDisapperAndReappearBeforeItsCleanup() throws Exception { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM)) + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER)) .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); @@ -458,7 +459,7 @@ protected WorkerHolder createWorkerHolder( DiscoveryDruidNode druidNode = new DiscoveryDruidNode( new DruidNode("service", "host", 1234, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0") ) @@ -567,7 +568,7 @@ public void testWorkerDisapperAndReappearAfterItsCleanup() throws Exception { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM)) + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER)) .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); @@ -621,7 +622,7 @@ protected WorkerHolder createWorkerHolder( DiscoveryDruidNode druidNode = new DiscoveryDruidNode( new DruidNode("service", "host", 1234, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0") ) @@ -730,7 +731,7 @@ public void testMarkWorkersLazy() throws Exception { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM)) + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER)) .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); @@ -788,7 +789,7 @@ protected WorkerHolder createWorkerHolder( DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode( new DruidNode("service", "host1", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0") ) @@ -832,7 +833,7 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0") DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode( new DruidNode("service", "host2", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0") ) @@ -865,7 +866,7 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0") DiscoveryDruidNode druidNode3 = new DiscoveryDruidNode( new DruidNode("service", "host3", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_MM, + NodeType.MIDDLE_MANAGER, ImmutableMap.of( WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0") ) @@ -1159,7 +1160,7 @@ private HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated( { TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM)) + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER)) .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java index 70102fd67dcf..eaa5a36e0fa6 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java @@ -42,11 +42,7 @@ public class CuratorDruidNodeAnnouncer implements DruidNodeAnnouncer private final ObjectMapper jsonMapper; @Inject - public CuratorDruidNodeAnnouncer( - Announcer announcer, - ZkPathsConfig config, - @Json ObjectMapper jsonMapper - ) + public CuratorDruidNodeAnnouncer(Announcer announcer, ZkPathsConfig config, @Json ObjectMapper jsonMapper) { this.announcer = announcer; this.config = config; @@ -59,14 +55,12 @@ public void announce(DiscoveryDruidNode discoveryDruidNode) try { log.info("Announcing [%s].", discoveryDruidNode); - announcer.announce( - ZKPaths.makePath( - config.getInternalDiscoveryPath(), - discoveryDruidNode.getNodeType(), - discoveryDruidNode.getDruidNode().getHostAndPortToUse() - ), - jsonMapper.writeValueAsBytes(discoveryDruidNode) + String path = ZKPaths.makePath( + config.getInternalDiscoveryPath(), + discoveryDruidNode.getNodeType().toString(), + discoveryDruidNode.getDruidNode().getHostAndPortToUse() ); + announcer.announce(path, jsonMapper.writeValueAsBytes(discoveryDruidNode)); log.info("Announced [%s].", discoveryDruidNode); } @@ -80,13 +74,12 @@ public void unannounce(DiscoveryDruidNode discoveryDruidNode) { log.info("Unannouncing [%s].", discoveryDruidNode); - announcer.unannounce( - ZKPaths.makePath( - config.getInternalDiscoveryPath(), - discoveryDruidNode.getNodeType(), - discoveryDruidNode.getDruidNode().getHostAndPortToUse() - ) + String path = ZKPaths.makePath( + config.getInternalDiscoveryPath(), + discoveryDruidNode.getNodeType().toString(), + discoveryDruidNode.getDruidNode().getHostAndPortToUse() ); + announcer.unannounce(path); log.info("Unannounced [%s].", discoveryDruidNode); } diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java index 212d27d9f995..46b8ce912dfb 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java @@ -32,6 +32,7 @@ import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.ISE; @@ -65,7 +66,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide private ExecutorService listenerExecutor; - private final Map nodeTypeWatchers = new ConcurrentHashMap<>(); + private final Map nodeTypeWatchers = new ConcurrentHashMap<>(); private final LifecycleLock lifecycleLock = new LifecycleLock(); @@ -82,27 +83,23 @@ public CuratorDruidNodeDiscoveryProvider( } @Override - public DruidNodeDiscovery getForNodeType(String nodeType) + public DruidNodeDiscovery getForNodeType(NodeType nodeType) { Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); - return nodeTypeWatchers.compute( + return nodeTypeWatchers.computeIfAbsent( nodeType, - (k, v) -> { - if (v != null) { - return v; - } - - log.info("Creating NodeTypeWatcher for nodeType [%s].", nodeType); + nType -> { + log.info("Creating NodeTypeWatcher for nodeType [%s].", nType); NodeTypeWatcher nodeTypeWatcher = new NodeTypeWatcher( listenerExecutor, curatorFramework, config.getInternalDiscoveryPath(), jsonMapper, - nodeType + nType ); nodeTypeWatcher.start(); - log.info("Created NodeTypeWatcher for nodeType [%s].", nodeType); + log.info("Created NodeTypeWatcher for nodeType [%s].", nType); return nodeTypeWatcher; } ); @@ -154,7 +151,7 @@ private static class NodeTypeWatcher implements DruidNodeDiscovery private final CuratorFramework curatorFramework; - private final String nodeType; + private final NodeType nodeType; private final ObjectMapper jsonMapper; // hostAndPort -> DiscoveryDruidNode @@ -165,7 +162,7 @@ private static class NodeTypeWatcher implements DruidNodeDiscovery private final ExecutorService listenerExecutor; - private final List nodeListeners = new ArrayList(); + private final List nodeListeners = new ArrayList<>(); private final Object lock = new Object(); @@ -176,7 +173,7 @@ private static class NodeTypeWatcher implements DruidNodeDiscovery CuratorFramework curatorFramework, String basePath, ObjectMapper jsonMapper, - String nodeType + NodeType nodeType ) { this.listenerExecutor = listenerExecutor; @@ -188,7 +185,7 @@ private static class NodeTypeWatcher implements DruidNodeDiscovery this.cacheExecutor = Execs.singleThreaded(StringUtils.format("NodeTypeWatcher[%s]", nodeType)); this.cache = new PathChildrenCache( curatorFramework, - ZKPaths.makePath(basePath, nodeType), + ZKPaths.makePath(basePath, nodeType.toString()), true, true, cacheExecutor @@ -241,10 +238,7 @@ public void handleChildEvent(CuratorFramework client, PathChildrenCacheEvent eve return; } - DiscoveryDruidNode druidNode = jsonMapper.readValue( - data, - DiscoveryDruidNode.class - ); + DiscoveryDruidNode druidNode = jsonMapper.readValue(data, DiscoveryDruidNode.class); if (!nodeType.equals(druidNode.getNodeType())) { log.warn( @@ -255,11 +249,7 @@ public void handleChildEvent(CuratorFramework client, PathChildrenCacheEvent eve return; } - log.info( - "Node[%s:%s] appeared.", - druidNode.getDruidNode().getHostAndPortToUse(), - druidNode - ); + log.info("Node[%s:%s] appeared.", druidNode.getDruidNode().getHostAndPortToUse(), druidNode); addNode(druidNode); @@ -330,10 +320,7 @@ private boolean isCacheInitialized(long waitFor, TimeUnit timeUnit) } } - private void safeSchedule( - Runnable runnable, - String errMsgFormat, Object... args - ) + private void safeSchedule(Runnable runnable, String errMsgFormat, Object... args) { listenerExecutor.submit(() -> { try { diff --git a/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java b/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java index fbb124afd2b2..dd843397b1f8 100644 --- a/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java +++ b/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java @@ -36,7 +36,7 @@ public class DiscoveryDruidNode { private final DruidNode druidNode; - private final String nodeType; + private final NodeType nodeType; // Other metadata associated with the node e.g. // if its a historical node then lookup information, segment loading capacity etc. @@ -45,7 +45,7 @@ public class DiscoveryDruidNode @JsonCreator public DiscoveryDruidNode( @JsonProperty("druidNode") DruidNode druidNode, - @JsonProperty("nodeType") String nodeType, + @JsonProperty("nodeType") NodeType nodeType, @JsonProperty("services") Map services ) { @@ -64,7 +64,7 @@ public Map getServices() } @JsonProperty - public String getNodeType() + public NodeType getNodeType() { return nodeType; } diff --git a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java index 406703e4b987..23c5ffe11a9d 100644 --- a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java +++ b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java @@ -66,7 +66,7 @@ public class DruidLeaderClient private final HttpClient httpClient; private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; - private final String nodeTypeToWatch; + private final NodeType nodeTypeToWatch; private final String leaderRequestPath; @@ -80,7 +80,7 @@ public class DruidLeaderClient public DruidLeaderClient( HttpClient httpClient, DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, - String nodeTypeToWatch, + NodeType nodeTypeToWatch, String leaderRequestPath, ServerDiscoverySelector serverDiscoverySelector ) diff --git a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java index dcd30ccb5e69..9ae34a3c19a7 100644 --- a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java +++ b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java @@ -35,63 +35,40 @@ import java.util.concurrent.ConcurrentHashMap; /** - * Provider of DruidNodeDiscovery instances. + * Provider of {@link DruidNodeDiscovery} instances. */ public abstract class DruidNodeDiscoveryProvider { - private static final Logger log = new Logger(DruidNodeDiscoveryProvider.class); - - public static final String NODE_TYPE_COORDINATOR = "coordinator"; - public static final String NODE_TYPE_HISTORICAL = "historical"; - public static final String NODE_TYPE_BROKER = "broker"; - public static final String NODE_TYPE_OVERLORD = "overlord"; - public static final String NODE_TYPE_PEON = "peon"; - public static final String NODE_TYPE_ROUTER = "router"; - public static final String NODE_TYPE_MM = "middleManager"; - - public static final Set ALL_NODE_TYPES = ImmutableSet.of( - NODE_TYPE_COORDINATOR, - NODE_TYPE_HISTORICAL, - NODE_TYPE_BROKER, - NODE_TYPE_OVERLORD, - NODE_TYPE_PEON, - NODE_TYPE_ROUTER, - NODE_TYPE_MM + private static final Map> SERVICE_TO_NODE_TYPES = ImmutableMap.of( + LookupNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeType.BROKER, NodeType.HISTORICAL, NodeType.PEON), + DataNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeType.HISTORICAL, NodeType.PEON), + WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeType.PEON) ); - private static final Map> SERVICE_TO_NODE_TYPES = ImmutableMap.of( - LookupNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NODE_TYPE_BROKER, NODE_TYPE_HISTORICAL, NODE_TYPE_PEON), - DataNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NODE_TYPE_HISTORICAL, NODE_TYPE_PEON), - WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NODE_TYPE_MM) - ); - - private final ConcurrentHashMap serviceDiscoveryMap = new ConcurrentHashMap<>( - SERVICE_TO_NODE_TYPES.size()); + private final ConcurrentHashMap serviceDiscoveryMap = + new ConcurrentHashMap<>(SERVICE_TO_NODE_TYPES.size()); /** * Get DruidNodeDiscovery instance to discover nodes of given nodeType. */ - public abstract DruidNodeDiscovery getForNodeType(String nodeType); + public abstract DruidNodeDiscovery getForNodeType(NodeType nodeType); /** * Get DruidNodeDiscovery instance to discover nodes that announce given service in its metadata. */ public DruidNodeDiscovery getForService(String serviceName) { - return serviceDiscoveryMap.compute( + return serviceDiscoveryMap.computeIfAbsent( serviceName, - (k, v) -> { - if (v != null) { - return v; - } + service -> { - Set nodeTypesToWatch = DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(serviceName); + Set nodeTypesToWatch = DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(service); if (nodeTypesToWatch == null) { - throw new IAE("Unknown service [%s].", serviceName); + throw new IAE("Unknown service [%s].", service); } - ServiceDruidNodeDiscovery serviceDiscovery = new ServiceDruidNodeDiscovery(serviceName); - for (String nodeType : nodeTypesToWatch) { + ServiceDruidNodeDiscovery serviceDiscovery = new ServiceDruidNodeDiscovery(service); + for (NodeType nodeType : nodeTypesToWatch) { getForNodeType(nodeType).registerListener(serviceDiscovery.nodeTypeListener()); } return serviceDiscovery; diff --git a/server/src/main/java/org/apache/druid/discovery/NodeType.java b/server/src/main/java/org/apache/druid/discovery/NodeType.java new file mode 100644 index 000000000000..841656bf1fc9 --- /dev/null +++ b/server/src/main/java/org/apache/druid/discovery/NodeType.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.discovery; + +import com.fasterxml.jackson.annotation.JsonValue; + +/** + * + * This is a historical occasion that this enum is different from {@link + * org.apache.druid.server.coordination.ServerType} because they are essentially the same abstraction, but merging them + * could only increase the complexity and drop the code safety, because they name the same types differently ("peon" - + * "indexer-executor" and "middleManager" - "realtime") and both expose them via JSON APIs. + */ +public enum NodeType +{ + COORDINATOR("coordinator"), + HISTORICAL("historical"), + BROKER("broker"), + OVERLORD("overlord"), + PEON("peon"), + ROUTER("router"), + MIDDLE_MANAGER("middleManager"); + + private final String jsonName; + + NodeType(String jsonName) + { + this.jsonName = jsonName; + } + + /** + * Lowercase for backward compatibility, as a part of the {@link DiscoveryDruidNode}'s JSON format. + * + * Don't need to define {@link com.fasterxml.jackson.annotation.JsonCreator} because for enum types {@link JsonValue} + * serves for both serialization and deserialization, see the Javadoc comment of {@link JsonValue}. + */ + @JsonValue + public String getJsonName() + { + return jsonName; + } +} diff --git a/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java b/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java index 3440b545e266..d4ac2c454a04 100644 --- a/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java +++ b/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java @@ -28,6 +28,7 @@ import org.apache.druid.curator.discovery.ServerDiscoverySelector; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.java.util.http.client.HttpClient; @@ -64,7 +65,7 @@ public DruidLeaderClient getLeaderHttpClient( return new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, - DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, + NodeType.COORDINATOR, "/druid/coordinator/v1/leader", serverDiscoverySelector ); diff --git a/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java b/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java index 4348b5b80270..05b76ed2294f 100644 --- a/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java +++ b/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java @@ -28,6 +28,7 @@ import org.apache.druid.curator.discovery.ServerDiscoverySelector; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.java.util.http.client.HttpClient; @@ -64,7 +65,7 @@ public DruidLeaderClient getLeaderHttpClient( return new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, - DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, + NodeType.OVERLORD, "/druid/indexer/v1/leader", serverDiscoverySelector ); diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerType.java b/server/src/main/java/org/apache/druid/server/coordination/ServerType.java index 9e5a46e642c2..240cee64904e 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ServerType.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ServerType.java @@ -26,7 +26,8 @@ /** * This enum represents types of druid services that hold segments. *

- * These types are externally visible (e.g., from the output of /druid/coordinator/v1/servers). + * These types are externally visible (e.g., from the output of {@link + * org.apache.druid.server.http.ServersResource#makeSimpleServer}). *

* For backwards compatibility, when presenting these types externally, the toString() representation * of the enum should be used. @@ -34,6 +35,11 @@ * The toString() method converts the enum name() to lowercase and replaces underscores with hyphens, * which is the format expected for the server type string prior to the patch that introduced ServerType: * https://github.com/apache/incubator-druid/pull/4148 + * + * This is a historical occasion that this enum is different from {@link org.apache.druid.discovery.NodeType} because + * they are essentially the same abstraction, but merging them could only increase the complexity and drop the code + * safety, because they name the same types differently ("indexer-executor" - "peon" and "realtime" - "middleManager") + * and both expose them via JSON APIs. */ public enum ServerType { diff --git a/server/src/main/java/org/apache/druid/server/http/ClusterResource.java b/server/src/main/java/org/apache/druid/server/http/ClusterResource.java index 5fff19e4a10a..0bc871386c1e 100644 --- a/server/src/main/java/org/apache/druid/server/http/ClusterResource.java +++ b/server/src/main/java/org/apache/druid/server/http/ClusterResource.java @@ -28,8 +28,8 @@ import com.sun.jersey.spi.container.ResourceFilters; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.LazySingleton; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.DruidNode; import org.apache.druid.server.http.security.StateResourceFilter; @@ -40,6 +40,7 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import java.util.Arrays; import java.util.Collection; /** @@ -59,33 +60,23 @@ public ClusterResource(DruidNodeDiscoveryProvider discoveryProvider) @GET @Produces(MediaType.APPLICATION_JSON) - public Response getClusterServers( - @QueryParam("full") boolean full - ) + public Response getClusterServers(@QueryParam("full") boolean full) { - ImmutableMap.Builder entityBuilder = new ImmutableMap.Builder<>(); - - entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, - getNodes(DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, full) - ); - entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, - getNodes(DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, full) - ); - entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, - getNodes(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, full) - ); - entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, - getNodes(DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, full) - ); - - Collection mmNodes = getNodes(DruidNodeDiscoveryProvider.NODE_TYPE_MM, full); + ImmutableMap.Builder entityBuilder = new ImmutableMap.Builder<>(); + + entityBuilder.put(NodeType.COORDINATOR, getNodes(NodeType.COORDINATOR, full)); + entityBuilder.put(NodeType.OVERLORD, getNodes(NodeType.OVERLORD, full)); + entityBuilder.put(NodeType.BROKER, getNodes(NodeType.BROKER, full)); + entityBuilder.put(NodeType.HISTORICAL, getNodes(NodeType.HISTORICAL, full)); + + Collection mmNodes = getNodes(NodeType.MIDDLE_MANAGER, full); if (!mmNodes.isEmpty()) { - entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_MM, mmNodes); + entityBuilder.put(NodeType.MIDDLE_MANAGER, mmNodes); } - Collection routerNodes = getNodes(DruidNodeDiscoveryProvider.NODE_TYPE_ROUTER, full); + Collection routerNodes = getNodes(NodeType.ROUTER, full); if (!routerNodes.isEmpty()) { - entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_ROUTER, routerNodes); + entityBuilder.put(NodeType.ROUTER, routerNodes); } return Response.status(Response.Status.OK).entity(entityBuilder.build()).build(); @@ -94,28 +85,19 @@ public Response getClusterServers( @GET @Produces({MediaType.APPLICATION_JSON}) @Path("/{nodeType}") - public Response getClusterServers( - @PathParam("nodeType") String nodeType, - @QueryParam("full") boolean full - ) + public Response getClusterServers(@PathParam("nodeType") NodeType nodeType, @QueryParam("full") boolean full) { - if (nodeType == null || !DruidNodeDiscoveryProvider.ALL_NODE_TYPES.contains(nodeType)) { + if (nodeType == null) { return Response.serverError() .status(Response.Status.BAD_REQUEST) - .entity(StringUtils.format( - "Invalid nodeType [%s]. Valid node types are %s .", - nodeType, - DruidNodeDiscoveryProvider.ALL_NODE_TYPES - )) + .entity("Invalid nodeType of null. Valid node types are " + Arrays.toString(NodeType.values())) .build(); } else { - return Response.status(Response.Status.OK).entity( - getNodes(nodeType, full) - ).build(); + return Response.status(Response.Status.OK).entity(getNodes(nodeType, full)).build(); } } - private Collection getNodes(String nodeType, boolean full) + private Collection getNodes(NodeType nodeType, boolean full) { Collection discoveryDruidNodes = druidNodeDiscoveryProvider.getForNodeType(nodeType) .getAllNodes(); diff --git a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java b/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java index 4b224c89ee7d..fcc3303e7c1c 100644 --- a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java +++ b/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java @@ -29,6 +29,7 @@ import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; @@ -125,7 +126,7 @@ public void start() servers.put(entry.getValue(), new NodesHolder()); } - DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER); + DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(NodeType.BROKER); druidNodeDiscovery.registerListener( new DruidNodeDiscovery.Listener() { diff --git a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java index 96bc3893b011..806815767754 100644 --- a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java +++ b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java @@ -29,6 +29,7 @@ import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.http.client.HttpClient; @@ -62,7 +63,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; - /** */ public class HttpServerInventoryViewTest @@ -168,7 +168,7 @@ public void testSimple() throws Exception DiscoveryDruidNode druidNode = new DiscoveryDruidNode( new DruidNode("service", "host", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, + NodeType.HISTORICAL, ImmutableMap.of( DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0) ) diff --git a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java index a8be9337b6f7..daac842dd575 100644 --- a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java +++ b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java @@ -28,7 +28,7 @@ import org.apache.druid.curator.announcement.Announcer; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.server.DruidNode; import org.apache.druid.server.initialization.ServerConfig; @@ -81,25 +81,25 @@ public void testAnnouncementAndDiscovery() throws Exception DiscoveryDruidNode node1 = new DiscoveryDruidNode( new DruidNode("s1", "h1", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, + NodeType.COORDINATOR, ImmutableMap.of() ); DiscoveryDruidNode node2 = new DiscoveryDruidNode( new DruidNode("s2", "h2", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, + NodeType.COORDINATOR, ImmutableMap.of() ); DiscoveryDruidNode node3 = new DiscoveryDruidNode( new DruidNode("s3", "h3", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, + NodeType.OVERLORD, ImmutableMap.of() ); DiscoveryDruidNode node4 = new DiscoveryDruidNode( new DruidNode("s4", "h4", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, + NodeType.OVERLORD, ImmutableMap.of() ); @@ -113,8 +113,8 @@ public void testAnnouncementAndDiscovery() throws Exception ); druidNodeDiscoveryProvider.start(); - DruidNodeDiscovery coordDiscovery = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR); - DruidNodeDiscovery overlordDiscovery = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD); + DruidNodeDiscovery coordDiscovery = druidNodeDiscoveryProvider.getForNodeType(NodeType.COORDINATOR); + DruidNodeDiscovery overlordDiscovery = druidNodeDiscoveryProvider.getForNodeType(NodeType.OVERLORD); while (!checkNodes(ImmutableSet.of(node1), coordDiscovery.getAllNodes())) { Thread.sleep(100); diff --git a/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java b/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java index 60bd5662e6d4..b9f00c07b089 100644 --- a/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java +++ b/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java @@ -64,6 +64,7 @@ import javax.ws.rs.core.Response; import java.io.IOException; import java.net.URI; +import java.nio.charset.StandardCharsets; /** */ @@ -79,7 +80,7 @@ public class DruidLeaderClientTest extends BaseJettyTest protected Injector setupInjector() { final DruidNode node = new DruidNode("test", "localhost", null, null, true, false); - discoveryDruidNode = new DiscoveryDruidNode(node, "test", ImmutableMap.of()); + discoveryDruidNode = new DiscoveryDruidNode(node, NodeType.PEON, ImmutableMap.of()); Injector injector = Initialization.makeInjectorWithModules( GuiceInjectors.makeStartupInjector(), ImmutableList.of( @@ -114,21 +115,21 @@ public void testSimple() throws Exception ); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); DruidLeaderClient druidLeaderClient = new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, - "nodetype", + NodeType.PEON, "/simple/leader", EasyMock.createNiceMock(ServerDiscoverySelector.class) ); druidLeaderClient.start(); Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/direct"); - request.setContent("hello".getBytes("UTF-8")); + request.setContent("hello".getBytes(StandardCharsets.UTF_8)); Assert.assertEquals("hello", druidLeaderClient.go(request).getContent()); } @@ -139,14 +140,14 @@ public void testNoLeaderFound() throws Exception EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of()); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); DruidLeaderClient druidLeaderClient = new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, - "nodetype", + NodeType.PEON, "/simple/leader", EasyMock.createNiceMock(ServerDiscoverySelector.class) ); @@ -166,21 +167,21 @@ public void testRedirection() throws Exception ); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); DruidLeaderClient druidLeaderClient = new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, - "nodetype", + NodeType.PEON, "/simple/leader", EasyMock.createNiceMock(ServerDiscoverySelector.class) ); druidLeaderClient.start(); Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/redirect"); - request.setContent("hello".getBytes("UTF-8")); + request.setContent("hello".getBytes(StandardCharsets.UTF_8)); Assert.assertEquals("hello", druidLeaderClient.go(request).getContent()); } @@ -191,33 +192,30 @@ public void testServerFailureAndRedirect() throws Exception EasyMock.expect(serverDiscoverySelector.pick()).andReturn(null).anyTimes(); DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); - EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn( - ImmutableList.of(new DiscoveryDruidNode( - new DruidNode("test", "dummyhost", 64231, null, true, false), - "test", - ImmutableMap.of() - )) - ); - EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn( - ImmutableList.of(discoveryDruidNode) + DiscoveryDruidNode dummyNode = new DiscoveryDruidNode( + new DruidNode("test", "dummyhost", 64231, null, true, false), + NodeType.PEON, + ImmutableMap.of() ); + EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(dummyNode)); + EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(discoveryDruidNode)); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery).anyTimes(); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery).anyTimes(); EasyMock.replay(serverDiscoverySelector, druidNodeDiscovery, druidNodeDiscoveryProvider); DruidLeaderClient druidLeaderClient = new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, - "nodetype", + NodeType.PEON, "/simple/leader", serverDiscoverySelector ); druidLeaderClient.start(); Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/redirect"); - request.setContent("hello".getBytes("UTF-8")); + request.setContent("hello".getBytes(StandardCharsets.UTF_8)); Assert.assertEquals("hello", druidLeaderClient.go(request).getContent()); } @@ -230,14 +228,14 @@ public void testFindCurrentLeader() ); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); DruidLeaderClient druidLeaderClient = new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, - "nodetype", + NodeType.PEON, "/simple/leader", EasyMock.createNiceMock(ServerDiscoverySelector.class) ); diff --git a/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java b/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java index abce2b802aef..5c13dacc1efa 100644 --- a/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java +++ b/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java @@ -87,7 +87,7 @@ public void nodesRemoved(List nodes) DiscoveryDruidNode node1 = new DiscoveryDruidNode( new DruidNode("s1", "h1", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, + NodeType.HISTORICAL, ImmutableMap.of( DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0), LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) @@ -95,21 +95,21 @@ LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) DiscoveryDruidNode node2 = new DiscoveryDruidNode( new DruidNode("s2", "h2", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, + NodeType.HISTORICAL, ImmutableMap.of( DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)) ); DiscoveryDruidNode node3 = new DiscoveryDruidNode( new DruidNode("s3", "h3", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, + NodeType.HISTORICAL, ImmutableMap.of( LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) ); DiscoveryDruidNode node4 = new DiscoveryDruidNode( new DruidNode("s4", "h4", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + NodeType.PEON, ImmutableMap.of( DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0), LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) @@ -117,35 +117,35 @@ LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) DiscoveryDruidNode node5 = new DiscoveryDruidNode( new DruidNode("s5", "h5", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + NodeType.PEON, ImmutableMap.of( DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)) ); DiscoveryDruidNode node6 = new DiscoveryDruidNode( new DruidNode("s6", "h6", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + NodeType.PEON, ImmutableMap.of( LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) ); DiscoveryDruidNode node7 = new DiscoveryDruidNode( new DruidNode("s7", "h7", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, + NodeType.BROKER, ImmutableMap.of( LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) ); DiscoveryDruidNode node7Clone = new DiscoveryDruidNode( new DruidNode("s7", "h7", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, + NodeType.BROKER, ImmutableMap.of( LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier")) ); DiscoveryDruidNode node8 = new DiscoveryDruidNode( new DruidNode("s8", "h8", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, + NodeType.COORDINATOR, ImmutableMap.of() ); @@ -183,7 +183,7 @@ private static class TestDruidNodeDiscoveryProvider extends DruidNodeDiscoveryPr private List listeners = new ArrayList<>(); @Override - public DruidNodeDiscovery getForNodeType(String nodeType) + public DruidNodeDiscovery getForNodeType(NodeType nodeType) { return new DruidNodeDiscovery() { diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupNodeDiscoveryTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupNodeDiscoveryTest.java index d69e676601c6..144792756353 100644 --- a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupNodeDiscoveryTest.java +++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupNodeDiscoveryTest.java @@ -26,6 +26,7 @@ import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.LookupNodeService; +import org.apache.druid.discovery.NodeType; import org.apache.druid.server.DruidNode; import org.apache.druid.server.http.HostAndPortWithScheme; import org.easymock.EasyMock; @@ -53,21 +54,21 @@ public void setup() DiscoveryDruidNode node1 = new DiscoveryDruidNode( new DruidNode("s1", "h1", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, + NodeType.HISTORICAL, ImmutableMap.of( LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier1")) ); DiscoveryDruidNode node2 = new DiscoveryDruidNode( new DruidNode("s2", "h2", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + NodeType.PEON, ImmutableMap.of( LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier1")) ); DiscoveryDruidNode node3 = new DiscoveryDruidNode( new DruidNode("s3", "h3", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + NodeType.PEON, ImmutableMap.of( LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier2")) ); diff --git a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java b/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java index aa3d2f8c8cfa..8d2a3947ac75 100644 --- a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java +++ b/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java @@ -32,6 +32,7 @@ import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; @@ -74,19 +75,19 @@ public void setUp() node1 = new DiscoveryDruidNode( new DruidNode("hotBroker", "hotHost", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, + NodeType.BROKER, ImmutableMap.of() ); node2 = new DiscoveryDruidNode( new DruidNode("coldBroker", "coldHost1", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, + NodeType.BROKER, ImmutableMap.of() ); node3 = new DiscoveryDruidNode( new DruidNode("coldBroker", "coldHost2", 8080, null, true, false), - DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, + NodeType.BROKER, ImmutableMap.of() ); @@ -105,7 +106,7 @@ public void registerListener(Listener listener) } }; - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER)) + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.BROKER)) .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java index a1fa6cda85f4..009a6fedbc54 100644 --- a/services/src/main/java/org/apache/druid/cli/CliBroker.java +++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java @@ -34,8 +34,8 @@ import org.apache.druid.client.selector.CustomTierSelectorStrategyConfig; import org.apache.druid.client.selector.ServerSelectorStrategy; import org.apache.druid.client.selector.TierSelectorStrategy; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.LookupNodeService; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.CacheModule; import org.apache.druid.guice.DruidProcessingModule; import org.apache.druid.guice.Jerseys; @@ -125,12 +125,10 @@ protected List getModules() LifecycleModule.register(binder, Server.class); - binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider( - new DiscoverySideEffectsProvider( - DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, - ImmutableList.of(LookupNodeService.class) - ) - ).in(LazySingleton.class); + binder + .bind(DiscoverySideEffectsProvider.Child.class) + .toProvider(new DiscoverySideEffectsProvider(NodeType.BROKER, ImmutableList.of(LookupNodeService.class))) + .in(LazySingleton.class); LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class)); }, new LookupModule(), diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 902684261279..5e4cd062e7f9 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -36,7 +36,7 @@ import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.client.indexing.HttpIndexingServiceClient; import org.apache.druid.client.indexing.IndexingServiceClient; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.ConditionalMultibind; import org.apache.druid.guice.ConfigProvider; import org.apache.druid.guice.Jerseys; @@ -217,12 +217,11 @@ public void configure(Binder binder) DruidCoordinatorCleanupPendingSegments.class ); - binder.bind(DiscoverySideEffectsProvider.Child.class).annotatedWith(Coordinator.class).toProvider( - new DiscoverySideEffectsProvider( - DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, - ImmutableList.of() - ) - ).in(LazySingleton.class); + binder + .bind(DiscoverySideEffectsProvider.Child.class) + .annotatedWith(Coordinator.class) + .toProvider(new DiscoverySideEffectsProvider(NodeType.COORDINATOR, ImmutableList.of())) + .in(LazySingleton.class); LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, Coordinator.class)); } diff --git a/services/src/main/java/org/apache/druid/cli/CliHistorical.java b/services/src/main/java/org/apache/druid/cli/CliHistorical.java index fafeadb6ace0..7d3c7e8767b3 100644 --- a/services/src/main/java/org/apache/druid/cli/CliHistorical.java +++ b/services/src/main/java/org/apache/druid/cli/CliHistorical.java @@ -27,8 +27,8 @@ import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.client.cache.CacheMonitor; import org.apache.druid.discovery.DataNodeService; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.LookupNodeService; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.CacheModule; import org.apache.druid.guice.DruidProcessingModule; import org.apache.druid.guice.Jerseys; @@ -103,12 +103,15 @@ protected List getModules() binder.install(new CacheModule()); MetricsModule.register(binder, CacheMonitor.class); - binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider( - new DiscoverySideEffectsProvider( - DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, - ImmutableList.of(DataNodeService.class, LookupNodeService.class) + binder + .bind(DiscoverySideEffectsProvider.Child.class) + .toProvider( + new DiscoverySideEffectsProvider( + NodeType.HISTORICAL, + ImmutableList.of(DataNodeService.class, LookupNodeService.class) + ) ) - ).in(LazySingleton.class); + .in(LazySingleton.class); LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class)); }, new LookupModule() diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index e2c31af7200b..8fa80e57ff97 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -30,7 +30,7 @@ import com.google.inject.util.Providers; import io.airlift.airline.Command; import org.apache.druid.client.indexing.IndexingServiceClient; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.discovery.WorkerNodeService; import org.apache.druid.guice.IndexingServiceFirehoseModule; import org.apache.druid.guice.IndexingServiceModuleHelper; @@ -129,12 +129,12 @@ public void configure(Binder binder) LifecycleModule.register(binder, Server.class); - binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider( - new DiscoverySideEffectsProvider( - DruidNodeDiscoveryProvider.NODE_TYPE_MM, - ImmutableList.of(WorkerNodeService.class) + binder + .bind(DiscoverySideEffectsProvider.Child.class) + .toProvider( + new DiscoverySideEffectsProvider(NodeType.MIDDLE_MANAGER, ImmutableList.of(WorkerNodeService.class)) ) - ).in(LazySingleton.class); + .in(LazySingleton.class); LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class)); } diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 000ef53de9ae..45299a83d770 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -38,7 +38,7 @@ import org.apache.druid.client.indexing.IndexingService; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.client.indexing.IndexingServiceSelectorConfig; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.IndexingServiceFirehoseModule; import org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.guice.IndexingServiceTaskLogsModule; @@ -234,12 +234,11 @@ public void configure(Binder binder) LifecycleModule.register(binder, Server.class); } - binder.bind(DiscoverySideEffectsProvider.Child.class).annotatedWith(IndexingService.class).toProvider( - new DiscoverySideEffectsProvider( - DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, - ImmutableList.of() - ) - ).in(LazySingleton.class); + binder + .bind(DiscoverySideEffectsProvider.Child.class) + .annotatedWith(IndexingService.class) + .toProvider(new DiscoverySideEffectsProvider(NodeType.OVERLORD, ImmutableList.of())) + .in(LazySingleton.class); LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, IndexingService.class)); } diff --git a/services/src/main/java/org/apache/druid/cli/CliRouter.java b/services/src/main/java/org/apache/druid/cli/CliRouter.java index 88a77a346623..8c0d83e4d938 100644 --- a/services/src/main/java/org/apache/druid/cli/CliRouter.java +++ b/services/src/main/java/org/apache/druid/cli/CliRouter.java @@ -32,6 +32,7 @@ import org.apache.druid.curator.discovery.ServerDiscoverySelector; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.Jerseys; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; @@ -120,12 +121,10 @@ public void configure(Binder binder) LifecycleModule.register(binder, Server.class); DiscoveryModule.register(binder, Self.class); - binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider( - new DiscoverySideEffectsProvider( - DruidNodeDiscoveryProvider.NODE_TYPE_ROUTER, - ImmutableList.of() - ) - ).in(LazySingleton.class); + binder + .bind(DiscoverySideEffectsProvider.Child.class) + .toProvider(new DiscoverySideEffectsProvider(NodeType.ROUTER, ImmutableList.of())) + .in(LazySingleton.class); LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class)); } @@ -134,7 +133,6 @@ public void configure(Binder binder) public ServerDiscoverySelector getCoordinatorServerDiscoverySelector( TieredBrokerConfig config, ServerDiscoveryFactory factory - ) { return factory.createSelector(config.getCoordinatorServiceName()); @@ -151,7 +149,7 @@ public DruidLeaderClient getLeaderHttpClient( return new DruidLeaderClient( httpClient, druidNodeDiscoveryProvider, - DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, + NodeType.COORDINATOR, "/druid/coordinator/v1/leader", serverDiscoverySelector ); diff --git a/services/src/main/java/org/apache/druid/cli/ServerRunnable.java b/services/src/main/java/org/apache/druid/cli/ServerRunnable.java index bc9c68bbaccd..7f60f1f15d2c 100644 --- a/services/src/main/java/org/apache/druid/cli/ServerRunnable.java +++ b/services/src/main/java/org/apache/druid/cli/ServerRunnable.java @@ -27,6 +27,7 @@ import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeAnnouncer; import org.apache.druid.discovery.DruidService; +import org.apache.druid.discovery.NodeType; import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; @@ -58,8 +59,8 @@ public void run() } /** - * This is a helper class used by CliXXX classes to announce DiscoveryDruidNode - * as part of lifecycle Stage.LAST . + * This is a helper class used by CliXXX classes to announce {@link DiscoveryDruidNode} + * as part of {@link Lifecycle.Stage#LAST}. */ protected static class DiscoverySideEffectsProvider implements Provider { @@ -77,10 +78,10 @@ public static class Child {} @Inject private Injector injector; - private final String nodeType; + private final NodeType nodeType; private final List> serviceClasses; - public DiscoverySideEffectsProvider(String nodeType, List> serviceClasses) + public DiscoverySideEffectsProvider(NodeType nodeType, List> serviceClasses) { this.nodeType = nodeType; this.serviceClasses = serviceClasses; @@ -95,10 +96,7 @@ public Child get() builder.put(service.getName(), service); } - DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(druidNode, - nodeType, - builder.build() - ); + DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(druidNode, nodeType, builder.build()); lifecycle.addHandler( new Lifecycle.Handler()