diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 337e9a46fb2f..3b5d40872cd6 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -42,6 +42,7 @@ import org.apache.druid.query.TableDataSource; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; @@ -217,6 +218,12 @@ private QueryableDruidServer removeServer(DruidServer server) private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment) { + if (server.getType().equals(ServerType.BROKER)) { + // in theory we could just filter this to ensure we don't put ourselves in here, to make dope broker tree + // query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query + // loop... + return; + } SegmentId segmentId = segment.getId(); synchronized (lock) { log.debug("Adding segment[%s] for server[%s]", segment, server); @@ -246,6 +253,10 @@ private void serverAddedSegment(final DruidServerMetadata server, final DataSegm private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment) { + if (server.getType().equals(ServerType.BROKER)) { + // might as well save the trouble of grabbing a lock for something that isn't there.. 
+ return; + } SegmentId segmentId = segment.getId(); final ServerSelector selector; diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index dd3961e04f42..30b93a18c84d 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.fasterxml.jackson.dataformat.smile.SmileGenerator; -import com.google.common.base.Function; import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -161,22 +160,15 @@ public void testMultipleServerAddedRemovedSegment() throws Exception final List druidServers = Lists.transform( ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"), - new Function() - { - @Override - public DruidServer apply(String input) - { - return new DruidServer( - input, - input, - null, - 10000000L, - ServerType.HISTORICAL, - "default_tier", - 0 - ); - } - } + input -> new DruidServer( + input, + input, + null, + 10000000L, + ServerType.HISTORICAL, + "default_tier", + 0 + ) ); for (DruidServer druidServer : druidServers) { @@ -190,14 +182,7 @@ public DruidServer apply(String input) Pair.of("2011-04-01/2011-04-09", "v2"), Pair.of("2011-04-06/2011-04-09", "v3"), Pair.of("2011-04-01/2011-04-02", "v3") - ), new Function, DataSegment>() - { - @Override - public DataSegment apply(Pair input) - { - return dataSegmentWithIntervalAndVersion(input.lhs, input.rhs); - } - } + ), input -> dataSegmentWithIntervalAndVersion(input.lhs, input.rhs) ); for (int i = 0; i < 5; ++i) { @@ -261,6 +246,114 @@ public DataSegment apply(Pair input) ); } + @Test + public void testMultipleServerAndBroker() throws Exception + { + 
segmentViewInitLatch = new CountDownLatch(1); + segmentAddedLatch = new CountDownLatch(6); + + // temporarily set latch count to 1 + segmentRemovedLatch = new CountDownLatch(1); + + setupViews(); + + final DruidServer druidBroker = new DruidServer( + "localhost:5", + "localhost:5", + null, + 10000000L, + ServerType.BROKER, + "default_tier", + 0 + ); + + final List druidServers = Lists.transform( + ImmutableList.of("localhost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"), + input -> new DruidServer( + input, + input, + null, + 10000000L, + ServerType.HISTORICAL, + "default_tier", + 0 + ) + ); + + setupZNodeForServer(druidBroker, zkPathsConfig, jsonMapper); + for (DruidServer druidServer : druidServers) { + setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper); + } + + final List segments = Lists.transform( + ImmutableList.of( + Pair.of("2011-04-01/2011-04-03", "v1"), + Pair.of("2011-04-03/2011-04-06", "v1"), + Pair.of("2011-04-01/2011-04-09", "v2"), + Pair.of("2011-04-06/2011-04-09", "v3"), + Pair.of("2011-04-01/2011-04-02", "v3") + ), + input -> dataSegmentWithIntervalAndVersion(input.lhs, input.rhs) + ); + + DataSegment brokerSegment = dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-11", "v4"); + announceSegmentForServer(druidBroker, brokerSegment, zkPathsConfig, jsonMapper); + for (int i = 0; i < 5; ++i) { + announceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig, jsonMapper); + } + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); + + TimelineLookup timeline = brokerServerView.getTimeline( + DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view")) + ).get(); + + assertValues( + Arrays.asList( + createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)), + createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), segments.get(2)), + 
createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3)) + ), + (List) timeline.lookup( + Intervals.of( + "2011-04-01/2011-04-09" + ) + ) + ); + + // unannounce the broker segment should do nothing to announcements + unannounceSegmentForServer(druidBroker, brokerSegment, zkPathsConfig); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); + + // renew segmentRemovedLatch since we still have 5 segments to unannounce + segmentRemovedLatch = new CountDownLatch(5); + + timeline = brokerServerView.getTimeline( + DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view")) + ).get(); + + // expect same set of segments as before + assertValues( + Arrays.asList( + createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)), + createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), segments.get(2)), + createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3)) + ), + (List) timeline.lookup( + Intervals.of( + "2011-04-01/2011-04-09" + ) + ) + ); + + // unannounce all the segments + for (int i = 0; i < 5; ++i) { + unannounceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig); + } + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); + } + + private Pair>> createExpected( String intervalStr, String version, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 54e7e5a8f961..35037e2ff2cb 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -350,6 +350,12 @@ protected Multimap getFunctionMultim @VisibleForTesting void addSegment(final DruidServerMetadata server, final DataSegment segment) { + if (server.getType().equals(ServerType.BROKER)) { + // in theory we could just filter this to ensure we don't 
put ourselves in here, to make dope broker tree + // query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite metadata + // loop... + return; + } synchronized (lock) { final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment.getId()) : null; @@ -428,6 +434,10 @@ void removeSegment(final DataSegment segment) @VisibleForTesting void removeServerSegment(final DruidServerMetadata server, final DataSegment segment) { + if (server.getType().equals(ServerType.BROKER)) { + // cheese it + return; + } synchronized (lock) { log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName()); final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index 6e2f8f62103f..ee3bca1a4f8e 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -57,6 +57,7 @@ import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.LinearShardSpec; +import org.apache.druid.timeline.partition.NoneShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.junit.After; import org.junit.AfterClass; @@ -418,4 +419,35 @@ public void testAvailableSegmentMetadataIsRealtime() Assert.assertEquals(0L, currentMetadata.isRealtime()); } + @Test + public void testAvailableSegmentFromBrokerIsIgnored() + { + + Assert.assertEquals(4, schema.getTotalSegments()); + + DruidServerMetadata metadata = new DruidServerMetadata( + "broker", + "localhost:0", + null, + 1000L, + ServerType.BROKER, + "broken", + 0 + ); + + DataSegment segment = new 
DataSegment( + "test", + Intervals.of("2011-04-01/2011-04-11"), + "v1", + ImmutableMap.of(), + ImmutableList.of(), + ImmutableList.of(), + NoneShardSpec.instance(), + 1, + 100L + ); + schema.addSegment(metadata, segment); + Assert.assertEquals(4, schema.getTotalSegments()); + + } }