Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions server/src/main/java/org/apache/druid/client/BrokerServerView.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
Expand Down Expand Up @@ -217,6 +218,12 @@ private QueryableDruidServer removeServer(DruidServer server)

private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment)
{
if (server.getType().equals(ServerType.BROKER)) {
// in theory we could just filter this to ensure we don't put ourselves in here, to make dope broker tree
// query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query
// loop...
return;
}
SegmentId segmentId = segment.getId();
synchronized (lock) {
log.debug("Adding segment[%s] for server[%s]", segment, server);
Expand Down Expand Up @@ -246,6 +253,10 @@ private void serverAddedSegment(final DruidServerMetadata server, final DataSegm

private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment)
{
if (server.getType().equals(ServerType.BROKER)) {
// might as well save the trouble of grabbing a lock for something that isn't there..
return;
}
SegmentId segmentId = segment.getId();
final ServerSelector selector;

Expand Down
143 changes: 118 additions & 25 deletions server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -161,22 +160,15 @@ public void testMultipleServerAddedRemovedSegment() throws Exception

final List<DruidServer> druidServers = Lists.transform(
ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"),
new Function<String, DruidServer>()
{
@Override
public DruidServer apply(String input)
{
return new DruidServer(
input,
input,
null,
10000000L,
ServerType.HISTORICAL,
"default_tier",
0
);
}
}
input -> new DruidServer(
input,
input,
null,
10000000L,
ServerType.HISTORICAL,
"default_tier",
0
)
);

for (DruidServer druidServer : druidServers) {
Expand All @@ -190,14 +182,7 @@ public DruidServer apply(String input)
Pair.of("2011-04-01/2011-04-09", "v2"),
Pair.of("2011-04-06/2011-04-09", "v3"),
Pair.of("2011-04-01/2011-04-02", "v3")
), new Function<Pair<String, String>, DataSegment>()
{
@Override
public DataSegment apply(Pair<String, String> input)
{
return dataSegmentWithIntervalAndVersion(input.lhs, input.rhs);
}
}
), input -> dataSegmentWithIntervalAndVersion(input.lhs, input.rhs)
);

for (int i = 0; i < 5; ++i) {
Expand Down Expand Up @@ -261,6 +246,114 @@ public DataSegment apply(Pair<String, String> input)
);
}

@Test
public void testMultipleServerAndBroker() throws Exception
{
  segmentViewInitLatch = new CountDownLatch(1);
  // 6 announcements below: 5 historical segments + 1 broker segment. The broker
  // announcement still fires the added callback; only the timeline filters it out.
  segmentAddedLatch = new CountDownLatch(6);

  // temporarily set latch count to 1 so we can first verify that unannouncing
  // only the broker's segment fires a removal callback without touching the timeline
  segmentRemovedLatch = new CountDownLatch(1);

  setupViews();

  final DruidServer druidBroker = new DruidServer(
      "localhost:5",
      "localhost:5",
      null,
      10000000L,
      ServerType.BROKER,
      "default_tier",
      0
  );

  final List<DruidServer> druidServers = Lists.transform(
      ImmutableList.of("localhost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"),
      input -> new DruidServer(
          input,
          input,
          null,
          10000000L,
          ServerType.HISTORICAL,
          "default_tier",
          0
      )
  );

  setupZNodeForServer(druidBroker, zkPathsConfig, jsonMapper);
  for (DruidServer druidServer : druidServers) {
    setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);
  }

  final List<DataSegment> segments = Lists.transform(
      ImmutableList.of(
          Pair.of("2011-04-01/2011-04-03", "v1"),
          Pair.of("2011-04-03/2011-04-06", "v1"),
          Pair.of("2011-04-01/2011-04-09", "v2"),
          Pair.of("2011-04-06/2011-04-09", "v3"),
          Pair.of("2011-04-01/2011-04-02", "v3")
      ),
      input -> dataSegmentWithIntervalAndVersion(input.lhs, input.rhs)
  );

  // v4 covers the whole interval and overshadows every other version, so if the
  // broker were not excluded from the timeline it would replace all expected results
  DataSegment brokerSegment = dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-11", "v4");
  announceSegmentForServer(druidBroker, brokerSegment, zkPathsConfig, jsonMapper);
  for (int i = 0; i < 5; ++i) {
    announceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig, jsonMapper);
  }
  Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
  Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));

  TimelineLookup timeline = brokerServerView.getTimeline(
      DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view"))
  ).get();

  // only historical-served segments may appear; the broker's v4 segment is ignored
  assertValues(
      Arrays.asList(
          createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)),
          createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), segments.get(2)),
          createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3))
      ),
      (List<TimelineObjectHolder>) timeline.lookup(
          Intervals.of(
              "2011-04-01/2011-04-09"
          )
      )
  );

  // unannouncing the broker segment should do nothing to the timeline
  unannounceSegmentForServer(druidBroker, brokerSegment, zkPathsConfig);
  Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));

  // renew segmentRemovedLatch since we still have 5 historical segments to unannounce
  segmentRemovedLatch = new CountDownLatch(5);

  timeline = brokerServerView.getTimeline(
      DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view"))
  ).get();

  // expect the same set of segments as before the broker unannouncement
  assertValues(
      Arrays.asList(
          createExpected("2011-04-01/2011-04-02", "v3", druidServers.get(4), segments.get(4)),
          createExpected("2011-04-02/2011-04-06", "v2", druidServers.get(2), segments.get(2)),
          createExpected("2011-04-06/2011-04-09", "v3", druidServers.get(3), segments.get(3))
      ),
      (List<TimelineObjectHolder>) timeline.lookup(
          Intervals.of(
              "2011-04-01/2011-04-09"
          )
      )
  );

  // unannounce all the historical segments and wait for every removal callback
  for (int i = 0; i < 5; ++i) {
    unannounceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig);
  }
  Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
}


private Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> createExpected(
String intervalStr,
String version,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,12 @@ protected Multimap<String, org.apache.calcite.schema.Function> getFunctionMultim
@VisibleForTesting
void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
if (server.getType().equals(ServerType.BROKER)) {
// in theory we could just filter this to ensure we don't put ourselves in here, to make dope broker tree
// query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite metadata
// loop...
return;
}
synchronized (lock) {
final Map<SegmentId, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment.getId()) : null;
Expand Down Expand Up @@ -428,6 +434,10 @@ void removeSegment(final DataSegment segment)
@VisibleForTesting
void removeServerSegment(final DruidServerMetadata server, final DataSegment segment)
{
if (server.getType().equals(ServerType.BROKER)) {
// cheese it
return;
}
synchronized (lock) {
log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName());
final Map<SegmentId, AvailableSegmentMetadata> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.junit.After;
import org.junit.AfterClass;
Expand Down Expand Up @@ -418,4 +419,35 @@ public void testAvailableSegmentMetadataIsRealtime()
Assert.assertEquals(0L, currentMetadata.isRealtime());
}

@Test
public void testAvailableSegmentFromBrokerIsIgnored()
{
  // baseline: the fixture starts out with 4 known segments
  Assert.assertEquals(4, schema.getTotalSegments());

  final DataSegment segmentServedByBroker = new DataSegment(
      "test",
      Intervals.of("2011-04-01/2011-04-11"),
      "v1",
      ImmutableMap.of(),
      ImmutableList.of(),
      ImmutableList.of(),
      NoneShardSpec.instance(),
      1,
      100L
  );

  final DruidServerMetadata brokerServer = new DruidServerMetadata(
      "broker",
      "localhost:0",
      null,
      1000L,
      ServerType.BROKER,
      "broken",
      0
  );

  // adding a segment announced by a BROKER server must be a no-op for the schema
  schema.addSegment(brokerServer, segmentServedByBroker);
  Assert.assertEquals(4, schema.getTotalSegments());
}
}