Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,35 +87,45 @@ public void announceSegment(DataSegment segment) throws IOException
}

synchronized (lock) {
// create new batch
if (availableZNodes.isEmpty()) {
SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath());
availableZNode.addSegment(segment);

log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode);
availableZNodes.add(availableZNode);
} else { // update existing batch
Iterator<SegmentZNode> iter = availableZNodes.iterator();
boolean done = false;
while (iter.hasNext() && !done) {
SegmentZNode availableZNode = iter.next();
if (availableZNode.getBytes().length + newBytesLen < config.getMaxBytesPerNode()) {
availableZNode.addSegment(segment);

log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
if (!availableZNodes.isEmpty()) {
// update existing batch
Iterator<SegmentZNode> iter = availableZNodes.iterator();
while (iter.hasNext() && !done) {
SegmentZNode availableZNode = iter.next();
if (availableZNode.getBytes().length + newBytesLen < config.getMaxBytesPerNode()) {
availableZNode.addSegment(segment);

log.info("Announcing segment[%s] at existing path[%s]", segment.getIdentifier(), availableZNode.getPath());
announcer.update(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode);

if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
availableZNodes.remove(availableZNode);
}

done = true;
} else {
// We could have kept the znode around for later use, however we remove it since segment announcements should
// have similar size unless there are significant schema changes. Removing the znode reduces the number of
// znodes that would be scanned at each announcement.
availableZNodes.remove(availableZNode);
}
}
}

if (!done) {
assert (availableZNodes.isEmpty());
// create new batch

SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath());
availableZNode.addSegment(segment);

log.info("Announcing segment[%s] at new path[%s]", segment.getIdentifier(),
availableZNode.getPath());
announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode);
availableZNodes.add(availableZNode);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
*/
Expand All @@ -64,6 +65,8 @@ public class BatchDataSegmentAnnouncerTest
private BatchDataSegmentAnnouncer segmentAnnouncer;
private Set<DataSegment> testSegments;

private final AtomicInteger maxBytesPerNode = new AtomicInteger(512 * 1024);

@Before
public void setUp() throws Exception
{
Expand Down Expand Up @@ -104,6 +107,12 @@ public int getSegmentsPerNode()
{
return 50;
}

@Override
public long getMaxBytesPerNode()
{
return maxBytesPerNode.get();
}
},
new ZkPathsConfig()
{
Expand Down Expand Up @@ -168,14 +177,42 @@ public void testSingleAnnounce() throws Exception
Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
}

@Test
public void testSingleAnnounceManyTimes() throws Exception
{
int prevMax = maxBytesPerNode.get();
maxBytesPerNode.set(2048);
// each segment is about 317 bytes long and that makes 2048 / 317 = 6 segments included per node
// so 100 segments makes (100 / 6) + 1 = 17 nodes
try {
for (DataSegment segment : testSegments) {
segmentAnnouncer.announceSegment(segment);
}
}
finally {
maxBytesPerNode.set(prevMax);
}

List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);
Assert.assertEquals(17, zNodes.size());

Set<DataSegment> segments = Sets.newHashSet(testSegments);
for (String zNode : zNodes) {
for (DataSegment segment : segmentReader.read(joiner.join(testSegmentsPath, zNode))) {
Assert.assertTrue("Invalid segment " + segment, segments.remove(segment));
}
}
Assert.assertTrue("Failed to find segments " + segments, segments.isEmpty());
}

@Test
public void testBatchAnnounce() throws Exception
{
segmentAnnouncer.announceSegments(testSegments);

List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);

Assert.assertTrue(zNodes.size() == 2);
Assert.assertEquals(2, zNodes.size());

Set<DataSegment> allSegments = Sets.newHashSet();
for (String zNode : zNodes) {
Expand All @@ -192,7 +229,7 @@ public void testBatchAnnounce() throws Exception
public void testMultipleBatchAnnounce() throws Exception
{
for (int i = 0; i < 10; i++) {
testBatchAnnounce();
testBatchAnnounce();
}
}

Expand Down Expand Up @@ -227,8 +264,8 @@ public Set<DataSegment> read(String path)
if (cf.checkExists().forPath(path) != null) {
return jsonMapper.readValue(
cf.getData().forPath(path), new TypeReference<Set<DataSegment>>()
{
}
{
}
);
}
}
Expand Down