diff --git a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java index ab4696905154..25264d5e69d4 100644 --- a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java +++ b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java @@ -87,35 +87,45 @@ public void announceSegment(DataSegment segment) throws IOException } synchronized (lock) { - // create new batch - if (availableZNodes.isEmpty()) { - SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath()); - availableZNode.addSegment(segment); - - log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath()); - announcer.announce(availableZNode.getPath(), availableZNode.getBytes()); - segmentLookup.put(segment, availableZNode); - availableZNodes.add(availableZNode); - } else { // update existing batch - Iterator iter = availableZNodes.iterator(); boolean done = false; - while (iter.hasNext() && !done) { - SegmentZNode availableZNode = iter.next(); - if (availableZNode.getBytes().length + newBytesLen < config.getMaxBytesPerNode()) { - availableZNode.addSegment(segment); - - log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath()); + if (!availableZNodes.isEmpty()) { + // update existing batch + Iterator iter = availableZNodes.iterator(); + while (iter.hasNext() && !done) { + SegmentZNode availableZNode = iter.next(); + if (availableZNode.getBytes().length + newBytesLen < config.getMaxBytesPerNode()) { + availableZNode.addSegment(segment); + + log.info("Announcing segment[%s] at existing path[%s]", segment.getIdentifier(), availableZNode.getPath()); announcer.update(availableZNode.getPath(), availableZNode.getBytes()); segmentLookup.put(segment, availableZNode); if (availableZNode.getCount() >= config.getSegmentsPerNode()) { availableZNodes.remove(availableZNode); } - done = true; + } else { + // We could have kept the znode around for later use, however we remove it since segment announcements should + // have similar size unless there are significant schema changes. Removing the znode reduces the number of + // znodes that would be scanned at each announcement. + availableZNodes.remove(availableZNode); } } } + + if (!done) { + assert (availableZNodes.isEmpty()); + // create new batch + + SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath()); + availableZNode.addSegment(segment); + + log.info("Announcing segment[%s] at new path[%s]", segment.getIdentifier(), + availableZNode.getPath()); + announcer.announce(availableZNode.getPath(), availableZNode.getBytes()); + segmentLookup.put(segment, availableZNode); + availableZNodes.add(availableZNode); + } } } diff --git a/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java b/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java index 26d13af6baf4..08024ed7d1ad 100644 --- a/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java +++ b/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java @@ -47,6 +47,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; /** */ @@ -64,6 +65,8 @@ public class BatchDataSegmentAnnouncerTest private BatchDataSegmentAnnouncer segmentAnnouncer; private Set testSegments; + private final AtomicInteger maxBytesPerNode = new AtomicInteger(512 * 1024); + @Before public void setUp() throws Exception { @@ -104,6 +107,12 @@ public int getSegmentsPerNode() { return 50; } + + @Override + public long getMaxBytesPerNode() + { + return maxBytesPerNode.get(); + } }, new ZkPathsConfig() { @@ -168,6 +177,34 @@ public void testSingleAnnounce() throws Exception Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty()); } + @Test + public void testSingleAnnounceManyTimes() throws Exception + { + int prevMax = maxBytesPerNode.get(); + maxBytesPerNode.set(2048); + // each segment is about 317 bytes long and that makes 2048 / 317 = 6 segments included per node + // so 100 segments makes (100 / 6) + 1 = 17 nodes + try { + for (DataSegment segment : testSegments) { + segmentAnnouncer.announceSegment(segment); + } + } + finally { + maxBytesPerNode.set(prevMax); + } + + List zNodes = cf.getChildren().forPath(testSegmentsPath); + Assert.assertEquals(17, zNodes.size()); + + Set segments = Sets.newHashSet(testSegments); + for (String zNode : zNodes) { + for (DataSegment segment : segmentReader.read(joiner.join(testSegmentsPath, zNode))) { + Assert.assertTrue("Invalid segment " + segment, segments.remove(segment)); + } + } + Assert.assertTrue("Failed to find segments " + segments, segments.isEmpty()); + } + @Test public void testBatchAnnounce() throws Exception { @@ -175,7 +212,7 @@ public void testBatchAnnounce() throws Exception List zNodes = cf.getChildren().forPath(testSegmentsPath); - Assert.assertTrue(zNodes.size() == 2); + Assert.assertEquals(2, zNodes.size()); Set allSegments = Sets.newHashSet(); for (String zNode : zNodes) { @@ -192,7 +229,7 @@ public void testBatchAnnounce() throws Exception public void testMultipleBatchAnnounce() throws Exception { for (int i = 0; i < 10; i++) { - testBatchAnnounce(); + testBatchAnnounce(); } } @@ -227,8 +264,8 @@ public Set read(String path) if (cf.checkExists().forPath(path) != null) { return jsonMapper.readValue( cf.getData().forPath(path), new TypeReference>() - { - } + { + } ); } }