Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.joda.time.DateTime;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
Expand Down Expand Up @@ -87,36 +86,41 @@ public void announceSegment(DataSegment segment) throws IOException
}

synchronized (lock) {
// create new batch
if (availableZNodes.isEmpty()) {
SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath());
availableZNode.addSegment(segment);

log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode);
availableZNodes.add(availableZNode);
} else { // update existing batch
Iterator<SegmentZNode> iter = availableZNodes.iterator();
boolean done = false;
while (iter.hasNext() && !done) {
SegmentZNode availableZNode = iter.next();
// try to update existing batch
for (SegmentZNode availableZNode : availableZNodes) {
if (availableZNode.getBytes().length + newBytesLen < config.getMaxBytesPerNode()) {
availableZNode.addSegment(segment);

log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
announcer.update(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode);

if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
availableZNodes.remove(availableZNode);
}

done = true;
SegmentZNode announced = announceSegmentTo(segment, availableZNode);
if (announced.getCount() >= config.getSegmentsPerNode()) {
availableZNodes.remove(availableZNode);
}
return;
}
}
// create new batch
SegmentZNode announced = announceSegmentTo(segment, null);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor nit : It will probably be cleaner to create a new SegmentZNode and pass it, instead of the announceSegmentTo creating a new node

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm using null to decide announce or update in announceSegmentTo().

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make a comment as such in the javadoc for the method please?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, +1 for method javadoc.

availableZNodes.add(announced);
}
}

private SegmentZNode announceSegmentTo(DataSegment segment, SegmentZNode segmentNode)
{
SegmentZNode availableNode = segmentNode == null ? new SegmentZNode(makeServedSegmentPath()) : segmentNode;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's a lot of branching based on segmentNode == null in this method – we might as well have two separate methods

availableNode.addSegment(segment);

log.info(
"Announcing segment[%s] at %s path[%s]",
segment.getIdentifier(),
segmentNode == null ? "new" : "existing",
availableNode.getPath()
);
if (segmentNode == null) {
announcer.announce(availableNode.getPath(), availableNode.getBytes());
} else {
announcer.update(availableNode.getPath(), availableNode.getBytes());
}
segmentLookup.put(segment, availableNode);

return availableNode;
}

@Override
Expand Down Expand Up @@ -186,9 +190,17 @@ public void unannounceSegments(Iterable<DataSegment> segments) throws IOExceptio
}
}

private String makeServedSegmentPath(){
private String makeServedSegmentPath()
{
// server.getName() is already in the zk path
return makeServedSegmentPath(UUIDUtils.generateUuid(server.getHost(), server.getType(), server.getTier(), new DateTime().toString()));
return makeServedSegmentPath(
UUIDUtils.generateUuid(
server.getHost(),
server.getType(),
server.getTier(),
new DateTime().toString()
)
);
}

private String makeServedSegmentPath(String zNode)
Expand Down Expand Up @@ -231,8 +243,8 @@ public Set<DataSegment> getSegments()
try {
return jsonMapper.readValue(
bytes, new TypeReference<Set<DataSegment>>()
{
}
{
}
);
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
*/
Expand All @@ -64,6 +65,8 @@ public class BatchDataSegmentAnnouncerTest
private BatchDataSegmentAnnouncer segmentAnnouncer;
private Set<DataSegment> testSegments;

private final AtomicInteger maxBytesPerNode = new AtomicInteger(512 * 1024);

@Before
public void setUp() throws Exception
{
Expand Down Expand Up @@ -104,6 +107,12 @@ public int getSegmentsPerNode()
{
return 50;
}

@Override
public long getMaxBytesPerNode()
{
return maxBytesPerNode.get();
}
},
new ZkPathsConfig()
{
Expand Down Expand Up @@ -168,14 +177,42 @@ public void testSingleAnnounce() throws Exception
Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
}

@Test
public void testSingleAnnounceManyTimes() throws Exception
{
int prevMax = maxBytesPerNode.get();
maxBytesPerNode.set(2048);
// each segment is about 317 bytes long and that makes 2048 / 317 = 6 segments included per node
// so 100 segments makes (100 / 6) + 1 = 17 nodes
try {
for (DataSegment segment : testSegments) {
segmentAnnouncer.announceSegment(segment);
}
}
finally {
maxBytesPerNode.set(prevMax);
}

List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);
Assert.assertEquals(17, zNodes.size());

Set<DataSegment> segments = Sets.newHashSet(testSegments);
for (String zNode : zNodes) {
for (DataSegment segment : segmentReader.read(joiner.join(testSegmentsPath, zNode))) {
Assert.assertTrue("Invalid segment " + segment, segments.remove(segment));
}
}
Assert.assertTrue("Failed to find segments " + segments, segments.isEmpty());
}

@Test
public void testBatchAnnounce() throws Exception
{
segmentAnnouncer.announceSegments(testSegments);

List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);

Assert.assertTrue(zNodes.size() == 2);
Assert.assertEquals(2, zNodes.size());

Set<DataSegment> allSegments = Sets.newHashSet();
for (String zNode : zNodes) {
Expand All @@ -192,7 +229,7 @@ public void testBatchAnnounce() throws Exception
public void testMultipleBatchAnnounce() throws Exception
{
for (int i = 0; i < 10; i++) {
testBatchAnnounce();
testBatchAnnounce();
}
}

Expand Down Expand Up @@ -227,8 +264,8 @@ public Set<DataSegment> read(String path)
if (cf.checkExists().forPath(path) != null) {
return jsonMapper.readValue(
cf.getData().forPath(path), new TypeReference<Set<DataSegment>>()
{
}
{
}
);
}
}
Expand Down