From 155dbb1e0f9b616416475b2131ab9ddaedc13e78 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 22 Sep 2021 14:36:55 +0530 Subject: [PATCH 1/8] Add config `druid.broker.segment.watchRealtimeNodes` --- docs/configuration/index.md | 1 + .../client/BrokerSegmentWatcherConfig.java | 8 ++ .../apache/druid/client/BrokerServerView.java | 6 +- .../BrokerSegmentWatcherConfigTest.java | 4 +- .../druid/client/BrokerServerViewTest.java | 87 ++++++++++++++++++- 5 files changed, 100 insertions(+), 6 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 7e170f4573a8..2651fbe16040 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1795,6 +1795,7 @@ See [cache configuration](#cache-configuration) for how to configure cache setti |`druid.serverview.type`|batch or http|Segment discovery method to use. "http" enables discovering segments using HTTP instead of zookeeper.|batch| |`druid.broker.segment.watchedTiers`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of tiers. By default, Broker would consider all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none| |`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. 
This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none| +|`druid.broker.segment.watchRealtimeNodes`|Boolean|Broker watches segment announcements from processes serving segments to build cache of which process is serving which segments. This configuration allows the exclusion of segments being served by realtime processes. By default, Broker would consider both realtime and historical processes. This can be used to configure brokers in partitions so that they are only queryable for Historical tiers or both Historical tiers and realtime processes.|true| |`druid.broker.segment.awaitInitializationOnStart`|Boolean|Whether the Broker will wait for its view of segments to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.sql.planner.awaitInitializationOnStart`, a related setting.|true| ## Cache Configuration diff --git a/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java b/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java index 0abb45673631..a192ecb9aaa0 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java +++ b/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java @@ -33,6 +33,9 @@ public class BrokerSegmentWatcherConfig @JsonProperty private Set watchedDataSources = null; + @JsonProperty + private boolean watchRealtimeNodes = true; + @JsonProperty private boolean awaitInitializationOnStart = true; @@ -46,6 +49,11 @@ public Set getWatchedDataSources() return watchedDataSources; } + public boolean isWatchRealtimeNodes() + { + return watchRealtimeNodes; + } + public boolean isAwaitInitializationOnStart() { return awaitInitializationOnStart; diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java 
b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 6d668839d7ae..4e24249ddba4 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -112,17 +112,21 @@ public BrokerServerView( this.timelines = new HashMap<>(); this.segmentFilter = (Pair metadataAndSegment) -> { + // Include only watched tiers if specified if (segmentWatcherConfig.getWatchedTiers() != null && !segmentWatcherConfig.getWatchedTiers().contains(metadataAndSegment.lhs.getTier())) { return false; } + // Include only watched datasources if specified if (segmentWatcherConfig.getWatchedDataSources() != null && !segmentWatcherConfig.getWatchedDataSources().contains(metadataAndSegment.rhs.getDataSource())) { return false; } - return true; + // Include realtime nodes only if they are watched + return segmentWatcherConfig.isWatchRealtimeNodes() + || metadataAndSegment.lhs.getType() != ServerType.REALTIME; }; ExecutorService exec = Execs.singleThreaded("BrokerServerView-%s"); baseView.registerSegmentCallback( diff --git a/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java b/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java index aa69b9c0d6f5..6fa2c167ebd3 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java @@ -45,9 +45,10 @@ public void testSerde() throws Exception ); Assert.assertNull(config.getWatchedTiers()); + Assert.assertTrue(config.isWatchRealtimeNodes()); //non-defaults - json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"] }"; + json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"], \"watchRealtimeNodes\": false }"; config = MAPPER.readValue( MAPPER.writeValueAsString( @@ -58,6 +59,7 @@ public void testSerde() throws 
Exception Assert.assertEquals(ImmutableSet.of("t1", "t2"), config.getWatchedTiers()); Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources()); + Assert.assertFalse(config.isWatchRealtimeNodes()); } } diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index d864d3206bf1..920a9d88ffb7 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -383,22 +383,90 @@ public void testMultipleTiers() throws Exception Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2)); } + @Test + public void testRealtimeNodesNotWatched() throws Exception + { + segmentViewInitLatch = new CountDownLatch(1); + segmentAddedLatch = new CountDownLatch(4); + segmentRemovedLatch = new CountDownLatch(0); + + // Setup a Broker that watches only Historicals + setupViews(null, false); + + // Historical has segments 2 and 3, Realtime has segments 1 and 2 + final DruidServer realtimeServer = setupDruidServer(ServerType.REALTIME, null, "realtime:1", 1); + final DruidServer historicalServer = setupHistoricalServer("tier1", "historical:2", 1); + + final DataSegment segment1 = dataSegmentWithIntervalAndVersion("2020-01-01/P1D", "v1"); + announceSegmentForServer(realtimeServer, segment1, zkPathsConfig, jsonMapper); + + final DataSegment segment2 = dataSegmentWithIntervalAndVersion("2020-01-02/P1D", "v1"); + announceSegmentForServer(realtimeServer, segment2, zkPathsConfig, jsonMapper); + announceSegmentForServer(historicalServer, segment2, zkPathsConfig, jsonMapper); + + final DataSegment segment3 = dataSegmentWithIntervalAndVersion("2020-01-03/P1D", "v1"); + announceSegmentForServer(historicalServer, segment3, zkPathsConfig, jsonMapper); + + // Wait for the segments to be added + 
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); + + // Get the timeline for the datasource + TimelineLookup timeline = brokerServerView.getTimeline( + DataSourceAnalysis.forDataSource(new TableDataSource(segment1.getDataSource())) + ).get(); + + // Verify that the timeline has no entry for the interval of segment 1 + Assert.assertTrue(timeline.lookup(segment1.getInterval()).isEmpty()); + + // Verify that there is one entry for the interval of segment 2 + List> timelineHolders = + timeline.lookup(segment2.getInterval()); + Assert.assertEquals(1, timelineHolders.size()); + + TimelineObjectHolder timelineHolder = timelineHolders.get(0); + Assert.assertEquals(segment2.getInterval(), timelineHolder.getInterval()); + Assert.assertEquals(segment2.getVersion(), timelineHolder.getVersion()); + + PartitionHolder partitionHolder = timelineHolder.getObject(); + Assert.assertTrue(partitionHolder.isComplete()); + Assert.assertEquals(1, Iterables.size(partitionHolder)); + + ServerSelector selector = (partitionHolder.iterator().next()).getObject(); + Assert.assertFalse(selector.isEmpty()); + Assert.assertEquals(segment2, selector.getSegment()); + + // Verify that the ServerSelector always picks the Historical server + for (int i = 0; i < 5; ++i) { + Assert.assertEquals(historicalServer, selector.pick(null).getServer()); + } + Assert.assertEquals(Collections.singletonList(historicalServer.getMetadata()), selector.getCandidates(2)); + } + /** * Creates a DruidServer of type HISTORICAL and sets up a ZNode for it. */ private DruidServer setupHistoricalServer(String tier, String name, int priority) { - final DruidServer historical = new DruidServer( + return setupDruidServer(ServerType.HISTORICAL, tier, name, priority); + } + + /** + * Creates a DruidServer of the specified type and sets up a ZNode for it. 
+ */ + private DruidServer setupDruidServer(ServerType serverType, String tier, String name, int priority) + { + final DruidServer druidServer = new DruidServer( name, name, null, 1000000, - ServerType.HISTORICAL, + serverType, tier, priority ); - setupZNodeForServer(historical, zkPathsConfig, jsonMapper); - return historical; + setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper); + return druidServer; } private Pair>> createExpected( @@ -442,6 +510,11 @@ private void setupViews() throws Exception } private void setupViews(Set watchedTiers) throws Exception + { + setupViews(watchedTiers, true); + } + + private void setupViews(Set watchedTiers, boolean watchRealtimeNodes) throws Exception { baseView = new BatchServerInventoryView( zkPathsConfig, @@ -500,6 +573,12 @@ public Set getWatchedTiers() { return watchedTiers; } + + @Override + public boolean isWatchRealtimeNodes() + { + return watchRealtimeNodes; + } } ); From 880c9e7bcef6fb439c0df131bdde029d434740af Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sun, 3 Oct 2021 08:32:31 +0530 Subject: [PATCH 2/8] temp stuff --- .../java/org/apache/druid/client/BrokerServerView.java | 10 ++++++++++ .../main/java/org/apache/druid/client/DruidServer.java | 1 + .../src/main/java/org/apache/druid/cli/CliIndexer.java | 2 +- 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 4e24249ddba4..d091ef0a4a2a 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -112,6 +112,14 @@ public BrokerServerView( this.timelines = new HashMap<>(); this.segmentFilter = (Pair metadataAndSegment) -> { + log.info( + "Kashif: Filtering segment [%s] for Server [%s:%s:%s]", + metadataAndSegment.rhs.getId(), + metadataAndSegment.lhs.getType(), + metadataAndSegment.lhs.getTier(), + 
metadataAndSegment.lhs.getHostAndPort() + ); + // Include only watched tiers if specified if (segmentWatcherConfig.getWatchedTiers() != null && !segmentWatcherConfig.getWatchedTiers().contains(metadataAndSegment.lhs.getTier())) { @@ -228,6 +236,7 @@ private void serverAddedSegment(final DruidServerMetadata server, final DataSegm // query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query // loop... if (!server.getType().equals(ServerType.BROKER)) { + log.info("Adding segment[%s] for server[%s:%s:%s]", segmentId, server.getType(), server.getTier(), server.getHostAndPort()); log.debug("Adding segment[%s] for server[%s]", segment, server); ServerSelector selector = selectors.get(segmentId); if (selector == null) { @@ -261,6 +270,7 @@ private void serverRemovedSegment(DruidServerMetadata server, DataSegment segmen final ServerSelector selector; synchronized (lock) { + log.info("Removing segment[%s] from server[%s:%s:%s]", segmentId, server.getType(), server.getTier(), server.getHostAndPort()); log.debug("Removing segment[%s] from server[%s].", segmentId, server); // we don't store broker segments here, but still run the callbacks for the segment being removed from the server diff --git a/server/src/main/java/org/apache/druid/client/DruidServer.java b/server/src/main/java/org/apache/druid/client/DruidServer.java index 6c52866d0586..22c4c5427489 100644 --- a/server/src/main/java/org/apache/druid/client/DruidServer.java +++ b/server/src/main/java/org/apache/druid/client/DruidServer.java @@ -50,6 +50,7 @@ public class DruidServer implements Comparable public static final int DEFAULT_PRIORITY = 0; public static final int DEFAULT_NUM_REPLICANTS = 2; public static final String DEFAULT_TIER = "_default_tier"; + public static final String REALTIME_TIER = "_realtime_tier"; private static final Logger log = new Logger(DruidServer.class); diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java 
b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index c11d1a424333..8fd7b2a8a309 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -210,7 +210,7 @@ public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig) public DataNodeService getDataNodeService(DruidServerConfig serverConfig) { return new DataNodeService( - DruidServer.DEFAULT_TIER, + DruidServer.REALTIME_TIER, serverConfig.getMaxSize(), ServerType.INDEXER_EXECUTOR, DruidServer.DEFAULT_PRIORITY From e609c19292fdba4ae68bbc34c9961171897789ac Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 21 Oct 2021 13:26:36 +0530 Subject: [PATCH 3/8] Rename field watchRealtimeNodes to watchRealtimeTasks --- .../org/apache/druid/client/BrokerSegmentWatcherConfig.java | 6 +++--- .../main/java/org/apache/druid/client/BrokerServerView.java | 4 ++-- .../apache/druid/client/BrokerSegmentWatcherConfigTest.java | 6 +++--- .../java/org/apache/druid/client/BrokerServerViewTest.java | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java b/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java index a192ecb9aaa0..cce64759c1f0 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java +++ b/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java @@ -34,7 +34,7 @@ public class BrokerSegmentWatcherConfig private Set watchedDataSources = null; @JsonProperty - private boolean watchRealtimeNodes = true; + private boolean watchRealtimeTasks = true; @JsonProperty private boolean awaitInitializationOnStart = true; @@ -49,9 +49,9 @@ public Set getWatchedDataSources() return watchedDataSources; } - public boolean isWatchRealtimeNodes() + public boolean isWatchRealtimeTasks() { - return watchRealtimeNodes; + return watchRealtimeTasks; } public boolean 
isAwaitInitializationOnStart() diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index d091ef0a4a2a..d203ee6eb5c0 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -133,8 +133,8 @@ public BrokerServerView( } // Include realtime nodes only if they are watched - return segmentWatcherConfig.isWatchRealtimeNodes() - || metadataAndSegment.lhs.getType() != ServerType.REALTIME; + return segmentWatcherConfig.isWatchRealtimeTasks() + || metadataAndSegment.lhs.getType() != ServerType.INDEXER_EXECUTOR; }; ExecutorService exec = Execs.singleThreaded("BrokerServerView-%s"); baseView.registerSegmentCallback( diff --git a/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java b/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java index 6fa2c167ebd3..8f6e25106ed1 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java @@ -45,10 +45,10 @@ public void testSerde() throws Exception ); Assert.assertNull(config.getWatchedTiers()); - Assert.assertTrue(config.isWatchRealtimeNodes()); + Assert.assertTrue(config.isWatchRealtimeTasks()); //non-defaults - json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"], \"watchRealtimeNodes\": false }"; + json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"], \"watchRealtimeTasks\": false }"; config = MAPPER.readValue( MAPPER.writeValueAsString( @@ -59,7 +59,7 @@ public void testSerde() throws Exception Assert.assertEquals(ImmutableSet.of("t1", "t2"), config.getWatchedTiers()); Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources()); - 
Assert.assertFalse(config.isWatchRealtimeNodes()); + Assert.assertFalse(config.isWatchRealtimeTasks()); } } diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index 920a9d88ffb7..b92a42b7670b 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -575,7 +575,7 @@ public Set getWatchedTiers() } @Override - public boolean isWatchRealtimeNodes() + public boolean isWatchRealtimeTasks() { return watchRealtimeNodes; } From fed56644ce21a60f950f5027c988c5240a3e7f12 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 21 Oct 2021 13:43:05 +0530 Subject: [PATCH 4/8] Update docs/configuration/index.md Co-authored-by: Charles Smith --- docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index bfa3fe9f2601..c0ba6f843df6 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1854,7 +1854,7 @@ See [cache configuration](#cache-configuration) for how to configure cache setti |`druid.broker.segment.watchedTiers`|List of strings|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to only consider segments being served from a list of tiers. By default, Broker considers all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources. 
This config is mutually exclusive from `druid.broker.segment.ignoredTiers` and at most one of these can be configured on a Broker.|none| |`druid.broker.segment.ignoredTiers`|List of strings|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to ignore the segments being served from a list of tiers. By default, Broker considers all tiers. This config is mutually exclusive from `druid.broker.segment.watchedTiers` and at most one of these can be configured on a Broker.|none| |`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none| -|`druid.broker.segment.watchRealtimeNodes`|Boolean|Broker watches segment announcements from processes serving segments to build cache of which process is serving which segments. This configuration allows the exclusion of segments being served by realtime processes. By default, Broker would consider both realtime and historical processes. This can be used to configure brokers in partitions so that they are only queryable for Historical tiers or both Historical tiers and realtime processes.|true| +|`druid.broker.segment.watchRealtimeNodes`|Boolean|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. When `watchRealtimeNodes` is true, the Broker watches for segment announcements from both Historicals and realtime processes. To configure a broker to exclude segments served by realtime processes, set `watchRealtimeNodes` to false. 
|true| |`druid.broker.segment.awaitInitializationOnStart`|Boolean|Whether the Broker will wait for its view of segments to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.sql.planner.awaitInitializationOnStart`, a related setting.|true| ## Cache Configuration From c2090fc46fd76996dbfccf9185c2324e827ec21d Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 21 Oct 2021 13:44:48 +0530 Subject: [PATCH 5/8] Rename config watchRealtimeNodes to watchRealtimeTasks --- docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index c0ba6f843df6..4896eba623df 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1854,7 +1854,7 @@ See [cache configuration](#cache-configuration) for how to configure cache setti |`druid.broker.segment.watchedTiers`|List of strings|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to only consider segments being served from a list of tiers. By default, Broker considers all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources. This config is mutually exclusive from `druid.broker.segment.ignoredTiers` and at most one of these can be configured on a Broker.|none| |`druid.broker.segment.ignoredTiers`|List of strings|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to ignore the segments being served from a list of tiers. By default, Broker considers all tiers. 
This config is mutually exclusive from `druid.broker.segment.watchedTiers` and at most one of these can be configured on a Broker.|none| |`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none| -|`druid.broker.segment.watchRealtimeNodes`|Boolean|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. When `watchRealtimeNodes` is true, the Broker watches for segment announcements from both Historicals and realtime processes. To configure a broker to exclude segments served by realtime processes, set `watchRealtimeNodes` to false. |true| +|`druid.broker.segment.watchRealtimeTasks`|Boolean|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. When `watchRealtimeTasks` is true, the Broker watches for segment announcements from both Historicals and realtime processes. To configure a broker to exclude segments served by realtime processes, set `watchRealtimeTasks` to false. |true| |`druid.broker.segment.awaitInitializationOnStart`|Boolean|Whether the Broker will wait for its view of segments to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. 
See also `druid.sql.planner.awaitInitializationOnStart`, a related setting.|true| ## Cache Configuration From 624f075a8013b0546db1a6349a0076677a0f2276 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 21 Oct 2021 14:04:26 +0530 Subject: [PATCH 6/8] Fix changes made for testing --- server/src/main/java/org/apache/druid/client/DruidServer.java | 1 - services/src/main/java/org/apache/druid/cli/CliIndexer.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/DruidServer.java b/server/src/main/java/org/apache/druid/client/DruidServer.java index 22c4c5427489..6c52866d0586 100644 --- a/server/src/main/java/org/apache/druid/client/DruidServer.java +++ b/server/src/main/java/org/apache/druid/client/DruidServer.java @@ -50,7 +50,6 @@ public class DruidServer implements Comparable public static final int DEFAULT_PRIORITY = 0; public static final int DEFAULT_NUM_REPLICANTS = 2; public static final String DEFAULT_TIER = "_default_tier"; - public static final String REALTIME_TIER = "_realtime_tier"; private static final Logger log = new Logger(DruidServer.class); diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index 8fd7b2a8a309..c11d1a424333 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -210,7 +210,7 @@ public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig) public DataNodeService getDataNodeService(DruidServerConfig serverConfig) { return new DataNodeService( - DruidServer.REALTIME_TIER, + DruidServer.DEFAULT_TIER, serverConfig.getMaxSize(), ServerType.INDEXER_EXECUTOR, DruidServer.DEFAULT_PRIORITY From 88f812c263147a532e6e55a03d535a20973afc3b Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 21 Oct 2021 14:08:09 +0530 Subject: [PATCH 7/8] Remove test log lines --- 
.../main/java/org/apache/druid/client/BrokerServerView.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index a91f59084233..5fcd8cdc85f8 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -135,8 +135,8 @@ public BrokerServerView( } // Include realtime tasks only if they are watched - return segmentWatcherConfig.isWatchRealtimeTasks() - || metadataAndSegment.lhs.getType() != ServerType.INDEXER_EXECUTOR; + return metadataAndSegment.lhs.getType() != ServerType.INDEXER_EXECUTOR + || segmentWatcherConfig.isWatchRealtimeTasks(); }; ExecutorService exec = Execs.singleThreaded("BrokerServerView-%s"); baseView.registerSegmentCallback( @@ -267,7 +267,6 @@ private void serverAddedSegment(final DruidServerMetadata server, final DataSegm // query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query // loop... 
if (!server.getType().equals(ServerType.BROKER)) { - log.info("Adding segment[%s] for server[%s:%s:%s]", segmentId, server.getType(), server.getTier(), server.getHostAndPort()); log.debug("Adding segment[%s] for server[%s]", segment, server); ServerSelector selector = selectors.get(segmentId); if (selector == null) { @@ -301,7 +300,6 @@ private void serverRemovedSegment(DruidServerMetadata server, DataSegment segmen final ServerSelector selector; synchronized (lock) { - log.info("Removing segment[%s] from server[%s:%s:%s]", segmentId, server.getType(), server.getTier(), server.getHostAndPort()); log.debug("Removing segment[%s] from server[%s].", segmentId, server); // we don't store broker segments here, but still run the callbacks for the segment being removed from the server From c2eb1c1120203a0d6fb726111f6da34728ee363f Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 21 Oct 2021 14:10:37 +0530 Subject: [PATCH 8/8] Restore test BrokerServerViewTest.testIgnoredTiers() --- .../druid/client/BrokerServerViewTest.java | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index ab50224c55e2..0a0e6b83cac1 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -444,6 +444,68 @@ public void testRealtimeTasksNotWatched() throws Exception Assert.assertEquals(Collections.singletonList(historicalServer.getMetadata()), selector.getCandidates(2)); } + @Test + public void testIgnoredTiers() throws Exception + { + segmentViewInitLatch = new CountDownLatch(1); + segmentAddedLatch = new CountDownLatch(4); + segmentRemovedLatch = new CountDownLatch(0); + + // Setup a Broker that does not watch Tier 1 + final String tier1 = "tier1"; + final String tier2 = "tier2"; + setupViews(null, Sets.newHashSet(tier1), 
false); + + // Historical Tier 1 has segments 1 and 2, Tier 2 has segments 2 and 3 + final DruidServer server11 = setupHistoricalServer(tier1, "localhost:1", 1); + final DruidServer server21 = setupHistoricalServer(tier2, "localhost:2", 1); + + final DataSegment segment1 = dataSegmentWithIntervalAndVersion("2020-01-01/P1D", "v1"); + announceSegmentForServer(server11, segment1, zkPathsConfig, jsonMapper); + + final DataSegment segment2 = dataSegmentWithIntervalAndVersion("2020-01-02/P1D", "v1"); + announceSegmentForServer(server11, segment2, zkPathsConfig, jsonMapper); + announceSegmentForServer(server21, segment2, zkPathsConfig, jsonMapper); + + final DataSegment segment3 = dataSegmentWithIntervalAndVersion("2020-01-03/P1D", "v1"); + announceSegmentForServer(server21, segment3, zkPathsConfig, jsonMapper); + + // Wait for the segments to be added + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); + + // Get the timeline for the datasource + TimelineLookup timeline = brokerServerView.getTimeline( + DataSourceAnalysis.forDataSource(new TableDataSource(segment1.getDataSource())) + ).get(); + + // Verify that the timeline has no entry for the interval of segment 1 + Assert.assertTrue(timeline.lookup(segment1.getInterval()).isEmpty()); + + // Verify that there is one entry for the interval of segment 2 + List> timelineHolders = + timeline.lookup(segment2.getInterval()); + Assert.assertEquals(1, timelineHolders.size()); + + TimelineObjectHolder timelineHolder = timelineHolders.get(0); + Assert.assertEquals(segment2.getInterval(), timelineHolder.getInterval()); + Assert.assertEquals(segment2.getVersion(), timelineHolder.getVersion()); + + PartitionHolder partitionHolder = timelineHolder.getObject(); + Assert.assertTrue(partitionHolder.isComplete()); + Assert.assertEquals(1, Iterables.size(partitionHolder)); + + ServerSelector selector = 
(partitionHolder.iterator().next()).getObject(); + Assert.assertFalse(selector.isEmpty()); + Assert.assertEquals(segment2, selector.getSegment()); + + // Verify that the ServerSelector always picks the Tier 2 server, since Tier 1 is ignored + for (int i = 0; i < 5; ++i) { + Assert.assertEquals(server21, selector.pick(null).getServer()); + } + Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2)); + } + @Test(expected = ISE.class) public void testInvalidWatchedTiersConfig() throws Exception {