diff --git a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java index ab4696905154..d47fe5c36594 100644 --- a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java +++ b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java @@ -36,7 +36,6 @@ import org.joda.time.DateTime; import java.io.IOException; -import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; @@ -87,36 +86,41 @@ public void announceSegment(DataSegment segment) throws IOException } synchronized (lock) { - // create new batch - if (availableZNodes.isEmpty()) { - SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath()); - availableZNode.addSegment(segment); - - log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath()); - announcer.announce(availableZNode.getPath(), availableZNode.getBytes()); - segmentLookup.put(segment, availableZNode); - availableZNodes.add(availableZNode); - } else { // update existing batch - Iterator iter = availableZNodes.iterator(); - boolean done = false; - while (iter.hasNext() && !done) { - SegmentZNode availableZNode = iter.next(); + // try to update existing batch + for (SegmentZNode availableZNode : availableZNodes) { if (availableZNode.getBytes().length + newBytesLen < config.getMaxBytesPerNode()) { - availableZNode.addSegment(segment); - - log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath()); - announcer.update(availableZNode.getPath(), availableZNode.getBytes()); - segmentLookup.put(segment, availableZNode); - - if (availableZNode.getCount() >= config.getSegmentsPerNode()) { - availableZNodes.remove(availableZNode); - } - - done = true; + SegmentZNode announced = announceSegmentTo(segment, availableZNode); + if (announced.getCount() >= config.getSegmentsPerNode()) { + availableZNodes.remove(availableZNode); } + return; } } + // create new batch + SegmentZNode announced = announceSegmentTo(segment, null); + availableZNodes.add(announced); + } + } + + private SegmentZNode announceSegmentTo(DataSegment segment, SegmentZNode segmentNode) + { + SegmentZNode availableNode = segmentNode == null ? new SegmentZNode(makeServedSegmentPath()) : segmentNode; + availableNode.addSegment(segment); + + log.info( + "Announcing segment[%s] at %s path[%s]", + segment.getIdentifier(), + segmentNode == null ? "new" : "existing", + availableNode.getPath() + ); + if (segmentNode == null) { + announcer.announce(availableNode.getPath(), availableNode.getBytes()); + } else { + announcer.update(availableNode.getPath(), availableNode.getBytes()); } + segmentLookup.put(segment, availableNode); + + return availableNode; } @Override @@ -186,9 +190,17 @@ public void unannounceSegments(Iterable segments) throws IOExceptio } } - private String makeServedSegmentPath(){ + private String makeServedSegmentPath() + { // server.getName() is already in the zk path - return makeServedSegmentPath(UUIDUtils.generateUuid(server.getHost(), server.getType(), server.getTier(), new DateTime().toString())); + return makeServedSegmentPath( + UUIDUtils.generateUuid( + server.getHost(), + server.getType(), + server.getTier(), + new DateTime().toString() + ) + ); } private String makeServedSegmentPath(String zNode) @@ -231,8 +243,8 @@ public Set getSegments() try { return jsonMapper.readValue( bytes, new TypeReference>() - { - } + { + } ); } catch (Exception e) { diff --git a/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java b/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java index 26d13af6baf4..08024ed7d1ad 100644 --- a/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java +++ b/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java @@ -47,6 +47,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; /** */ @@ -64,6 +65,8 @@ public class BatchDataSegmentAnnouncerTest private BatchDataSegmentAnnouncer segmentAnnouncer; private Set testSegments; + private final AtomicInteger maxBytesPerNode = new AtomicInteger(512 * 1024); + @Before public void setUp() throws Exception { @@ -104,6 +107,12 @@ public int getSegmentsPerNode() { return 50; } + + @Override + public long getMaxBytesPerNode() + { + return maxBytesPerNode.get(); + } }, new ZkPathsConfig() { @@ -168,6 +177,34 @@ public void testSingleAnnounce() throws Exception Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty()); } + @Test + public void testSingleAnnounceManyTimes() throws Exception + { + int prevMax = maxBytesPerNode.get(); + maxBytesPerNode.set(2048); + // each segment is about 317 bytes long and that makes 2048 / 317 = 6 segments included per node + // so 100 segments makes (100 / 6) + 1 = 17 nodes + try { + for (DataSegment segment : testSegments) { + segmentAnnouncer.announceSegment(segment); + } + } + finally { + maxBytesPerNode.set(prevMax); + } + + List zNodes = cf.getChildren().forPath(testSegmentsPath); + Assert.assertEquals(17, zNodes.size()); + + Set segments = Sets.newHashSet(testSegments); + for (String zNode : zNodes) { + for (DataSegment segment : segmentReader.read(joiner.join(testSegmentsPath, zNode))) { + Assert.assertTrue("Invalid segment " + segment, segments.remove(segment)); + } + } + Assert.assertTrue("Failed to find segments " + segments, segments.isEmpty()); + } + @Test public void testBatchAnnounce() throws Exception { @@ -175,7 +212,7 @@ public void testBatchAnnounce() throws Exception List zNodes = cf.getChildren().forPath(testSegmentsPath); - Assert.assertTrue(zNodes.size() == 2); + Assert.assertEquals(2, zNodes.size()); Set allSegments = Sets.newHashSet(); for (String zNode : zNodes) { @@ -192,7 +229,7 @@ public void testBatchAnnounce() throws Exception public void testMultipleBatchAnnounce() throws Exception { for (int i = 0; i < 10; i++) { - testBatchAnnounce(); + testBatchAnnounce(); } } @@ -227,8 +264,8 @@ public Set read(String path) if (cf.checkExists().forPath(path) != null) { return jsonMapper.readValue( cf.getData().forPath(path), new TypeReference>() - { - } + { + } ); } }