Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 5 additions & 35 deletions server/src/test/java/io/druid/client/BrokerServerViewTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.Pair;
Expand All @@ -47,8 +46,6 @@
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.PartitionHolder;
import io.druid.timeline.partition.SingleElementPartitionChunk;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.After;
Expand All @@ -65,8 +62,6 @@ public class BrokerServerViewTest extends CuratorTestBase
{
private final ObjectMapper jsonMapper;
private final ZkPathsConfig zkPathsConfig;
private final String announcementsPath;
private final String inventoryPath;

private CountDownLatch segmentViewInitLatch;
private CountDownLatch segmentAddedLatch;
Expand All @@ -79,8 +74,6 @@ public BrokerServerViewTest()
{
jsonMapper = new DefaultObjectMapper();
zkPathsConfig = new ZkPathsConfig();
announcementsPath = zkPathsConfig.getAnnouncementsPath();
inventoryPath = zkPathsConfig.getLiveSegmentsPath();
}

@Before
Expand Down Expand Up @@ -111,7 +104,7 @@ public void testSingleServerAddedRemovedSegment() throws Exception
setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);

final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1");
announceSegmentForServer(druidServer, segment);
announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));

Expand All @@ -137,7 +130,7 @@ public void testSingleServerAddedRemovedSegment() throws Exception
Assert.assertEquals(segment, selector.getSegment());
Assert.assertEquals(druidServer, selector.pick().getServer());

unannounceSegmentForServer(druidServer, segment);
unannounceSegmentForServer(druidServer, segment, zkPathsConfig);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));

Assert.assertEquals(
Expand Down Expand Up @@ -199,7 +192,7 @@ public DataSegment apply(Pair<String, String> input)
);

for (int i = 0; i < 5; ++i) {
announceSegmentForServer(druidServers.get(i), segments.get(i));
announceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig, jsonMapper);
}
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
Expand All @@ -219,7 +212,7 @@ public DataSegment apply(Pair<String, String> input)
);

// unannounce the segment created by dataSegmentWithIntervalAndVersion("2011-04-01/2011-04-09", "v2")
unannounceSegmentForServer(druidServers.get(2), segments.get(2));
unannounceSegmentForServer(druidServers.get(2), segments.get(2), zkPathsConfig);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));

// renew segmentRemovedLatch since we still have 4 segments to unannounce
Expand All @@ -244,7 +237,7 @@ public DataSegment apply(Pair<String, String> input)
for (int i = 0; i < 5; ++i) {
// skip the one that was previously unannounced
if (i != 2) {
unannounceSegmentForServer(druidServers.get(i), segments.get(i));
unannounceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig);
}
}
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
Expand All @@ -255,29 +248,6 @@ public DataSegment apply(Pair<String, String> input)
);
}

private void announceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception
{
curator.create()
.compressed()
.withMode(CreateMode.EPHEMERAL)
.forPath(
ZKPaths.makePath(ZKPaths.makePath(inventoryPath, druidServer.getHost()), segment.getIdentifier()),
jsonMapper.writeValueAsBytes(
ImmutableSet.<DataSegment>of(segment)
)
);
}

private void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception
{
curator.delete().guaranteed().forPath(
ZKPaths.makePath(
ZKPaths.makePath(inventoryPath, druidServer.getHost()),
segment.getIdentifier()
)
);
}

private Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> createExpected(
String intervalStr,
String version,
Expand Down
33 changes: 10 additions & 23 deletions server/src/test/java/io/druid/client/CoordinatorServerViewTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.Pair;
Expand All @@ -38,7 +37,6 @@
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.PartitionHolder;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
Expand All @@ -54,7 +52,6 @@ public class CoordinatorServerViewTest extends CuratorTestBase
{
private final ObjectMapper jsonMapper;
private final ZkPathsConfig zkPathsConfig;
private final String announcementsPath;
private final String inventoryPath;

private CountDownLatch segmentViewInitLatch;
Expand All @@ -68,7 +65,6 @@ public CoordinatorServerViewTest()
{
jsonMapper = new DefaultObjectMapper();
zkPathsConfig = new ZkPathsConfig();
announcementsPath = zkPathsConfig.getAnnouncementsPath();
inventoryPath = zkPathsConfig.getLiveSegmentsPath();
}

Expand Down Expand Up @@ -100,7 +96,7 @@ public void testSingleServerAddedRemovedSegment() throws Exception
setupZNodeForServer(druidServer, zkPathsConfig, jsonMapper);

final DataSegment segment = dataSegmentWithIntervalAndVersion("2014-10-20T00:00:00Z/P1D", "v1");
announceSegmentForServer(druidServer, segment);
announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));

Expand All @@ -122,7 +118,10 @@ public void testSingleServerAddedRemovedSegment() throws Exception

SegmentLoadInfo segmentLoadInfo = actualPartitionHolder.iterator().next().getObject();
Assert.assertFalse(segmentLoadInfo.isEmpty());
Assert.assertEquals(druidServer.getMetadata(), Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers()));
Assert.assertEquals(
druidServer.getMetadata(),
Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers())
);

unannounceSegmentForServer(druidServer, segment);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
Expand Down Expand Up @@ -186,7 +185,7 @@ public DataSegment apply(Pair<String, String> input)
);

for (int i = 0; i < 5; ++i) {
announceSegmentForServer(druidServers.get(i), segments.get(i));
announceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig, jsonMapper);
}
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
Expand Down Expand Up @@ -242,19 +241,6 @@ public DataSegment apply(Pair<String, String> input)
);
}

private void announceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception
{
curator.create()
.compressed()
.withMode(CreateMode.EPHEMERAL)
.forPath(
ZKPaths.makePath(ZKPaths.makePath(inventoryPath, druidServer.getHost()), segment.getIdentifier()),
jsonMapper.writeValueAsBytes(
ImmutableSet.<DataSegment>of(segment)
)
);
}

private void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment) throws Exception
{
curator.delete().guaranteed().forPath(
Expand Down Expand Up @@ -283,7 +269,7 @@ private void assertValues(

for (int i = 0; i < expected.size(); ++i) {
Pair<Interval, Pair<String, Pair<DruidServer, DataSegment>>> expectedPair = expected.get(i);
TimelineObjectHolder<String,SegmentLoadInfo> actualTimelineObjectHolder = actual.get(i);
TimelineObjectHolder<String, SegmentLoadInfo> actualTimelineObjectHolder = actual.get(i);

Assert.assertEquals(expectedPair.lhs, actualTimelineObjectHolder.getInterval());
Assert.assertEquals(expectedPair.rhs.lhs, actualTimelineObjectHolder.getVersion());
Expand All @@ -292,9 +278,10 @@ private void assertValues(
Assert.assertTrue(actualPartitionHolder.isComplete());
Assert.assertEquals(1, Iterables.size(actualPartitionHolder));

SegmentLoadInfo segmentLoadInfo = actualPartitionHolder.iterator().next().getObject();
SegmentLoadInfo segmentLoadInfo = actualPartitionHolder.iterator().next().getObject();
Assert.assertFalse(segmentLoadInfo.isEmpty());
Assert.assertEquals(expectedPair.rhs.rhs.lhs.getMetadata(),Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers()));
Assert.assertEquals(expectedPair.rhs.rhs.lhs.getMetadata(),
Iterables.getOnlyElement(segmentLoadInfo.toImmutableSegmentLoadInfo().getServers()));
}
}

Expand Down
110 changes: 89 additions & 21 deletions server/src/test/java/io/druid/curator/CuratorTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,20 @@
package io.druid.curator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.guava.CloseQuietly;
import io.druid.client.DruidServer;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;

/**
*/
Expand All @@ -54,40 +59,103 @@ protected void setupServerAndCurator() throws Exception
}

protected void setupZNodeForServer(DruidServer server, ZkPathsConfig zkPathsConfig, ObjectMapper jsonMapper)
throws Exception
{
final String announcementsPath = zkPathsConfig.getAnnouncementsPath();
final String inventoryPath = zkPathsConfig.getLiveSegmentsPath();

final String zNodePathAnnounce = ZKPaths.makePath(announcementsPath, server.getHost());
final String zNodePathSegment = ZKPaths.makePath(inventoryPath, server.getHost());
try {
curator.create()
.creatingParentsIfNeeded()
.forPath(
ZKPaths.makePath(announcementsPath, server.getHost()),
jsonMapper.writeValueAsBytes(server.getMetadata())
);
curator.create()
.creatingParentsIfNeeded()
.forPath(ZKPaths.makePath(inventoryPath, server.getHost()));
}
catch (KeeperException.NodeExistsException e) {
/*
* For some reason, Travis build sometimes fails here because of
* org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists, though it should never
* happen because zookeeper should be in a clean state for each run of tests.
* Address issue: https://github.com/druid-io/druid/issues/1512
*/
try {
curator.setData()
.forPath(
ZKPaths.makePath(announcementsPath, server.getHost()),
jsonMapper.writeValueAsBytes(server.getMetadata())
);
curator.setData()
.forPath(ZKPaths.makePath(inventoryPath, server.getHost()));
}
catch (Exception e1) {
Throwables.propagate(e1);
}
}
catch (Exception e) {
Throwables.propagate(e);
}
}

protected void announceSegmentForServer(
DruidServer druidServer,
DataSegment segment,
ZkPathsConfig zkPathsConfig,
ObjectMapper jsonMapper
)
{
final String segmentAnnouncementPath = ZKPaths.makePath(ZKPaths.makePath(
zkPathsConfig.getLiveSegmentsPath(),
druidServer.getHost()
), segment.getIdentifier());

/*
* Explicitly check whether the zNodes we are about to create exist or not,
* if exist, delete them to make sure we have a clean state on zookeeper.
* Address issue: https://github.com/druid-io/druid/issues/1512
*/
if (curator.checkExists().forPath(zNodePathAnnounce) != null) {
curator.delete().guaranteed().forPath(zNodePathAnnounce);
try {
curator.create()
.compressed()
.withMode(CreateMode.EPHEMERAL)
.forPath(
segmentAnnouncementPath,
jsonMapper.writeValueAsBytes(
ImmutableSet.<DataSegment>of(segment)
)
);
}
if (curator.checkExists().forPath(zNodePathSegment) != null) {
curator.delete().guaranteed().forPath(zNodePathSegment);
catch (KeeperException.NodeExistsException e) {
try {
curator.setData()
.forPath(
segmentAnnouncementPath,
jsonMapper.writeValueAsBytes(ImmutableSet.<DataSegment>of(segment))
);
}
catch (Exception e1) {
Throwables.propagate(e1);
}
}
catch (Exception e) {
Throwables.propagate(e);
}
}

curator.create()
.creatingParentsIfNeeded()
.forPath(
ZKPaths.makePath(announcementsPath, server.getHost()),
jsonMapper.writeValueAsBytes(server.getMetadata())
);
curator.create()
.creatingParentsIfNeeded()
.forPath(ZKPaths.makePath(inventoryPath, server.getHost()));
protected void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment, ZkPathsConfig zkPathsConfig)
throws Exception
{
curator.delete().guaranteed().forPath(
ZKPaths.makePath(
ZKPaths.makePath(zkPathsConfig.getLiveSegmentsPath(), druidServer.getHost()),
segment.getIdentifier()
)
);
}

protected void tearDownServerAndCurator()
{
CloseQuietly.close(curator);
CloseQuietly.close(server);
}

}

//Build at Tue Dec 22 21:30:00 CST 2015