Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1854,6 +1854,7 @@ See [cache configuration](#cache-configuration) for how to configure cache setti
|`druid.broker.segment.watchedTiers`|List of strings|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to only consider segments being served from a list of tiers. By default, Broker considers all tiers. This can be used to partition your dataSources in specific Historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources. This config is mutually exclusive from `druid.broker.segment.ignoredTiers` and at most one of these can be configured on a Broker.|none|
|`druid.broker.segment.ignoredTiers`|List of strings|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. This configuration allows the Broker to ignore the segments being served from a list of tiers. By default, Broker considers all tiers. This config is mutually exclusive from `druid.broker.segment.watchedTiers` and at most one of these can be configured on a Broker.|none|
|`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from processes serving segments to build cache of which process is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none|
|`druid.broker.segment.watchRealtimeTasks`|Boolean|The Broker watches segment announcements from processes that serve segments to build a cache to relate each process to the segments it serves. When `watchRealtimeTasks` is true, the Broker watches for segment announcements from both Historicals and realtime processes. To configure a broker to exclude segments served by realtime processes, set `watchRealtimeTasks` to false. |true|
|`druid.broker.segment.awaitInitializationOnStart`|Boolean|Whether the Broker will wait for its view of segments to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.sql.planner.awaitInitializationOnStart`, a related setting.|true|

## Cache Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public class BrokerSegmentWatcherConfig
@JsonProperty
private Set<String> watchedDataSources = null;

@JsonProperty
private boolean watchRealtimeTasks = true;

@JsonProperty
private boolean awaitInitializationOnStart = true;

Expand All @@ -54,6 +57,11 @@ public Set<String> getWatchedDataSources()
return watchedDataSources;
}

public boolean isWatchRealtimeTasks()
{
return watchRealtimeTasks;
}

public boolean isAwaitInitializationOnStart()
{
return awaitInitializationOnStart;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public BrokerServerView(
this.segmentWatcherConfig = segmentWatcherConfig;

this.segmentFilter = (Pair<DruidServerMetadata, DataSegment> metadataAndSegment) -> {

// Include only watched tiers if specified
if (segmentWatcherConfig.getWatchedTiers() != null
&& !segmentWatcherConfig.getWatchedTiers().contains(metadataAndSegment.lhs.getTier())) {
Expand All @@ -133,7 +134,9 @@ public BrokerServerView(
return false;
}

return true;
// Include realtime tasks only if they are watched
return metadataAndSegment.lhs.getType() != ServerType.INDEXER_EXECUTOR
|| segmentWatcherConfig.isWatchRealtimeTasks();
};
ExecutorService exec = Execs.singleThreaded("BrokerServerView-%s");
baseView.registerSegmentCallback(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ public void testSerde() throws Exception
);

Assert.assertNull(config.getWatchedTiers());
Assert.assertTrue(config.isWatchRealtimeTasks());
Assert.assertNull(config.getIgnoredTiers());

//non-defaults
json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"] }";
json = "{ \"watchedTiers\": [\"t1\", \"t2\"], \"watchedDataSources\": [\"ds1\", \"ds2\"], \"watchRealtimeTasks\": false }";

config = MAPPER.readValue(
MAPPER.writeValueAsString(
Expand All @@ -60,6 +61,7 @@ public void testSerde() throws Exception
Assert.assertEquals(ImmutableSet.of("t1", "t2"), config.getWatchedTiers());
Assert.assertNull(config.getIgnoredTiers());
Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources());
Assert.assertFalse(config.isWatchRealtimeTasks());

// json with ignoredTiers
json = "{ \"ignoredTiers\": [\"t3\", \"t4\"], \"watchedDataSources\": [\"ds1\", \"ds2\"] }";
Expand All @@ -74,5 +76,6 @@ public void testSerde() throws Exception
Assert.assertNull(config.getWatchedTiers());
Assert.assertEquals(ImmutableSet.of("t3", "t4"), config.getIgnoredTiers());
Assert.assertEquals(ImmutableSet.of("ds1", "ds2"), config.getWatchedDataSources());
Assert.assertTrue(config.isWatchRealtimeTasks());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public void testMultipleTiers() throws Exception
// Setup a Broker that watches only Tier 2
final String tier1 = "tier1";
final String tier2 = "tier2";
setupViews(Sets.newHashSet(tier2));
setupViews(Sets.newHashSet(tier2), null, true);

// Historical Tier 1 has segments 1 and 2, Tier 2 has segments 2 and 3
final DruidServer server11 = setupHistoricalServer(tier1, "localhost:1", 1);
Expand Down Expand Up @@ -384,6 +384,66 @@ public void testMultipleTiers() throws Exception
Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2));
}

@Test
public void testRealtimeTasksNotWatched() throws Exception
{
segmentViewInitLatch = new CountDownLatch(1);
segmentAddedLatch = new CountDownLatch(4);
segmentRemovedLatch = new CountDownLatch(0);

// Setup a Broker that watches only Historicals
setupViews(null, null, false);

// Historical has segments 2 and 3, Realtime has segments 1 and 2
final DruidServer realtimeServer = setupDruidServer(ServerType.INDEXER_EXECUTOR, null, "realtime:1", 1);
final DruidServer historicalServer = setupHistoricalServer("tier1", "historical:2", 1);

final DataSegment segment1 = dataSegmentWithIntervalAndVersion("2020-01-01/P1D", "v1");
announceSegmentForServer(realtimeServer, segment1, zkPathsConfig, jsonMapper);

final DataSegment segment2 = dataSegmentWithIntervalAndVersion("2020-01-02/P1D", "v1");
announceSegmentForServer(realtimeServer, segment2, zkPathsConfig, jsonMapper);
announceSegmentForServer(historicalServer, segment2, zkPathsConfig, jsonMapper);

final DataSegment segment3 = dataSegmentWithIntervalAndVersion("2020-01-03/P1D", "v1");
announceSegmentForServer(historicalServer, segment3, zkPathsConfig, jsonMapper);

// Wait for the segments to be added
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));

// Get the timeline for the datasource
TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(
DataSourceAnalysis.forDataSource(new TableDataSource(segment1.getDataSource()))
).get();

// Verify that the timeline has no entry for the interval of segment 1
Assert.assertTrue(timeline.lookup(segment1.getInterval()).isEmpty());

// Verify that there is one entry for the interval of segment 2
List<TimelineObjectHolder<String, ServerSelector>> timelineHolders =
timeline.lookup(segment2.getInterval());
Assert.assertEquals(1, timelineHolders.size());

TimelineObjectHolder<String, ServerSelector> timelineHolder = timelineHolders.get(0);
Assert.assertEquals(segment2.getInterval(), timelineHolder.getInterval());
Assert.assertEquals(segment2.getVersion(), timelineHolder.getVersion());

PartitionHolder<ServerSelector> partitionHolder = timelineHolder.getObject();
Assert.assertTrue(partitionHolder.isComplete());
Assert.assertEquals(1, Iterables.size(partitionHolder));

ServerSelector selector = (partitionHolder.iterator().next()).getObject();
Assert.assertFalse(selector.isEmpty());
Assert.assertEquals(segment2, selector.getSegment());

// Verify that the ServerSelector always picks the Historical server
for (int i = 0; i < 5; ++i) {
Assert.assertEquals(historicalServer, selector.pick(null).getServer());
}
Assert.assertEquals(Collections.singletonList(historicalServer.getMetadata()), selector.getCandidates(2));
}

@Test
public void testIgnoredTiers() throws Exception
{
Expand All @@ -394,7 +454,7 @@ public void testIgnoredTiers() throws Exception
// Setup a Broker that does not watch Tier 1
final String tier1 = "tier1";
final String tier2 = "tier2";
setupViews(null, Sets.newHashSet(tier1));
setupViews(null, Sets.newHashSet(tier1), false);

// Historical Tier 1 has segments 1 and 2, Tier 2 has segments 2 and 3
final DruidServer server11 = setupHistoricalServer(tier1, "localhost:1", 1);
Expand Down Expand Up @@ -452,19 +512,19 @@ public void testInvalidWatchedTiersConfig() throws Exception
// Verify that specifying both ignoredTiers and watchedTiers fails startup
final String tier1 = "tier1";
final String tier2 = "tier2";
setupViews(Sets.newHashSet(tier2), Sets.newHashSet(tier1));
setupViews(Sets.newHashSet(tier2), Sets.newHashSet(tier1), true);
}

@Test(expected = ISE.class)
public void testEmptyWatchedTiersConfig() throws Exception
{
setupViews(Collections.emptySet(), null);
setupViews(Collections.emptySet(), null, true);
}

@Test(expected = ISE.class)
public void testEmptyIgnoredTiersConfig() throws Exception
{
setupViews(null, Collections.emptySet());
setupViews(null, Collections.emptySet(), true);
}

/**
Expand Down Expand Up @@ -530,15 +590,10 @@ private void assertValues(

private void setupViews() throws Exception
{
setupViews(null);
setupViews(null, null, true);
}

private void setupViews(Set<String> watchedTiers) throws Exception
{
setupViews(watchedTiers, null);
}

private void setupViews(Set<String> watchedTiers, Set<String> ignoredTiers) throws Exception
private void setupViews(Set<String> watchedTiers, Set<String> ignoredTiers, boolean watchRealtimeTasks) throws Exception
{
baseView = new BatchServerInventoryView(
zkPathsConfig,
Expand Down Expand Up @@ -598,6 +653,12 @@ public Set<String> getWatchedTiers()
return watchedTiers;
}

@Override
public boolean isWatchRealtimeTasks()
{
return watchRealtimeTasks;
}

@Override
public Set<String> getIgnoredTiers()
{
Expand Down