diff --git a/docs/configuration/index.md b/docs/configuration/index.md index c20d801cf592..4896eba623df 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1854,6 +1854,7 @@ See [cache configuration](#cache-configuration) for how to configure cache setti |`druid.broker.segment.watchedTiers`|List of strings|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to only consider segments being served from a list of tiers. By default, Broker considers all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources. This config is mutually exclusive from `druid.broker.segment.ignoredTiers` and at most one of these can be configured on a Broker.|none| |`druid.broker.segment.ignoredTiers`|List of strings|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to ignore the segments being served from a list of tiers. By default, Broker considers all tiers. This config is mutually exclusive from `druid.broker.segment.watchedTiers` and at most one of these can be configured on a Broker.|none| |`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none| +|`druid.broker.segment.watchRealtimeTasks`|Boolean|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. When `watchRealtimeTasks` is true, the Broker watches for segment announcements from both Historicals and realtime processes. To configure a broker to exclude segments served by realtime processes, set `watchRealtimeTasks` to false. |true| |`druid.broker.segment.awaitInitializationOnStart`|Boolean|Whether the Broker will wait for its view of segments to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.sql.planner.awaitInitializationOnStart`, a related setting.|true| ## Cache Configuration diff --git a/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java b/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java index c487c713f232..5f3dbd3abe7d 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java +++ b/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java @@ -36,6 +36,9 @@ public class BrokerSegmentWatcherConfig @JsonProperty private Set watchedDataSources = null; + @JsonProperty + private boolean watchRealtimeTasks = true; + @JsonProperty private boolean awaitInitializationOnStart = true; @@ -54,6 +57,11 @@ public Set getWatchedDataSources() return watchedDataSources; } + public boolean isWatchRealtimeTasks() + { + return watchRealtimeTasks; + } + public boolean isAwaitInitializationOnStart() { return awaitInitializationOnStart; diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index f4269157b6ad..5fcd8cdc85f8 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -115,6 +115,7 @@ public BrokerServerView( this.segmentWatcherConfig = segmentWatcherConfig; this.segmentFilter = (Pair metadataAndSegment) -> { + // Include only watched tiers if specified if (segmentWatcherConfig.getWatchedTiers() != null && !segmentWatcherConfig.getWatchedTiers().contains(metadataAndSegment.lhs.getTier())) { @@ -133,7 +134,9 @@ public BrokerServerView( return false; } - return true; + // Include realtime tasks only if they are watched + return metadataAndSegment.lhs.getType() != ServerType.INDEXER_EXECUTOR + || segmentWatcherConfig.isWatchRealtimeTasks(); }; ExecutorService exec = Execs.singleThreaded("BrokerServerView-%s"); baseView.registerSegmentCallback( diff --git a/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java b/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java index 8eab4629489a..cc5398c85faf 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerSegmentWatcherConfigTest.java @@ -45,10 +45,11 @@ public void testSerde() throws Exception ); Assert.assertNull(config.getWatchedTiers()); + Assert.assertTrue(config.isWatchRealtimeTasks()); Assert.assertNull(config.getIgnoredTiers()); //non-defaults - json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"] }"; + json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"], \"watchRealtimeTasks\": false }"; config = MAPPER.readValue( MAPPER.writeValueAsString( @@ -60,6 +61,7 @@ public void testSerde() throws Exception Assert.assertEquals(ImmutableSet.of("t1", "t2"), config.getWatchedTiers()); Assert.assertNull(config.getIgnoredTiers()); Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources()); + Assert.assertFalse(config.isWatchRealtimeTasks()); // json with ignoredTiers json = "{ \"ignoredTiers\": [\"t3\", \"t4\"], \"watchedDataSources\": [\"ds1\", \"ds2\"] }"; @@ -74,5 +76,6 @@ public void testSerde() throws Exception Assert.assertNull(config.getWatchedTiers()); Assert.assertEquals(ImmutableSet.of("t3", "t4"), config.getIgnoredTiers()); Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources()); + Assert.assertTrue(config.isWatchRealtimeTasks()); } } diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index 6e93c372b346..0a0e6b83cac1 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -332,7 +332,7 @@ public void testMultipleTiers() throws Exception // Setup a Broker that watches only Tier 2 final String tier1 = "tier1"; final String tier2 = "tier2"; - setupViews(Sets.newHashSet(tier2)); + setupViews(Sets.newHashSet(tier2), null, true); // Historical Tier 1 has segments 1 and 2, Tier 2 has segments 2 and 3 final DruidServer server11 = setupHistoricalServer(tier1, "localhost:1", 1); @@ -384,6 +384,66 @@ public void testMultipleTiers() throws Exception Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2)); } + @Test + public void testRealtimeTasksNotWatched() throws Exception + { + segmentViewInitLatch = new CountDownLatch(1); + segmentAddedLatch = new CountDownLatch(4); + segmentRemovedLatch = new CountDownLatch(0); + + // Setup a Broker that watches only Historicals + setupViews(null, null, false); + + // Historical has segments 2 and 3, Realtime has segments 1 and 2 + final DruidServer realtimeServer = setupDruidServer(ServerType.INDEXER_EXECUTOR, null, "realtime:1", 1); + final DruidServer historicalServer = setupHistoricalServer("tier1", "historical:2", 1); + + final DataSegment segment1 = dataSegmentWithIntervalAndVersion("2020-01-01/P1D", "v1"); + announceSegmentForServer(realtimeServer, segment1, zkPathsConfig, jsonMapper); + + final DataSegment segment2 = dataSegmentWithIntervalAndVersion("2020-01-02/P1D", "v1"); + announceSegmentForServer(realtimeServer, segment2, zkPathsConfig, jsonMapper); + announceSegmentForServer(historicalServer, segment2, zkPathsConfig, jsonMapper); + + final DataSegment segment3 = dataSegmentWithIntervalAndVersion("2020-01-03/P1D", "v1"); + announceSegmentForServer(historicalServer, segment3, zkPathsConfig, jsonMapper); + + // Wait for the segments to be added + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); + + // Get the timeline for the datasource + TimelineLookup timeline = brokerServerView.getTimeline( + DataSourceAnalysis.forDataSource(new TableDataSource(segment1.getDataSource())) + ).get(); + + // Verify that the timeline has no entry for the interval of segment 1 + Assert.assertTrue(timeline.lookup(segment1.getInterval()).isEmpty()); + + // Verify that there is one entry for the interval of segment 2 + List> timelineHolders = + timeline.lookup(segment2.getInterval()); + Assert.assertEquals(1, timelineHolders.size()); + + TimelineObjectHolder timelineHolder = timelineHolders.get(0); + Assert.assertEquals(segment2.getInterval(), timelineHolder.getInterval()); + Assert.assertEquals(segment2.getVersion(), timelineHolder.getVersion()); + + PartitionHolder partitionHolder = timelineHolder.getObject(); + Assert.assertTrue(partitionHolder.isComplete()); + Assert.assertEquals(1, Iterables.size(partitionHolder)); + + ServerSelector selector = (partitionHolder.iterator().next()).getObject(); + Assert.assertFalse(selector.isEmpty()); + Assert.assertEquals(segment2, selector.getSegment()); + + // Verify that the ServerSelector always picks the Historical server + for (int i = 0; i < 5; ++i) { + Assert.assertEquals(historicalServer, selector.pick(null).getServer()); + } + Assert.assertEquals(Collections.singletonList(historicalServer.getMetadata()), selector.getCandidates(2)); + } + @Test public void testIgnoredTiers() throws Exception { @@ -394,7 +454,7 @@ public void testIgnoredTiers() throws Exception // Setup a Broker that does not watch Tier 1 final String tier1 = "tier1"; final String tier2 = "tier2"; - setupViews(null, Sets.newHashSet(tier1)); + setupViews(null, Sets.newHashSet(tier1), false); // Historical Tier 1 has segments 1 and 2, Tier 2 has segments 2 and 3 final DruidServer server11 = setupHistoricalServer(tier1, "localhost:1", 1); @@ -452,19 +512,19 @@ public void testInvalidWatchedTiersConfig() throws Exception // Verify that specifying both ignoredTiers and watchedTiers fails startup final String tier1 = "tier1"; final String tier2 = "tier2"; - setupViews(Sets.newHashSet(tier2), Sets.newHashSet(tier1)); + setupViews(Sets.newHashSet(tier2), Sets.newHashSet(tier1), true); } @Test(expected = ISE.class) public void testEmptyWatchedTiersConfig() throws Exception { - setupViews(Collections.emptySet(), null); + setupViews(Collections.emptySet(), null, true); } @Test(expected = ISE.class) public void testEmptyIgnoredTiersConfig() throws Exception { - setupViews(null, Collections.emptySet()); + setupViews(null, Collections.emptySet(), true); } /** @@ -530,15 +590,10 @@ private void assertValues( private void setupViews() throws Exception { - setupViews(null); + setupViews(null, null, true); } - private void setupViews(Set watchedTiers) throws Exception - { - setupViews(watchedTiers, null); - } - - private void setupViews(Set watchedTiers, Set ignoredTiers) throws Exception + private void setupViews(Set watchedTiers, Set ignoredTiers, boolean watchRealtimeTasks) throws Exception { baseView = new BatchServerInventoryView( zkPathsConfig, @@ -598,6 +653,12 @@ public Set getWatchedTiers() return watchedTiers; } + @Override + public boolean isWatchRealtimeTasks() + { + return watchRealtimeTasks; + } + @Override public Set getIgnoredTiers() {