Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
</scm>

<properties>
<metamx.java-util.version>0.27.6</metamx.java-util.version>
<metamx.java-util.version>0.27.7</metamx.java-util.version>
<apache.curator.version>2.9.1</apache.curator.version>
<jetty.version>9.2.5.v20141112</jetty.version>
<jersey.version>1.19</jersey.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.IAE;
Expand Down Expand Up @@ -78,8 +79,10 @@
import org.joda.time.Duration;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -94,6 +97,20 @@
public class DruidCoordinator
{
public static final String COORDINATOR_OWNER_NODE = "_COORDINATOR";

public static Comparator<DataSegment> SEGMENT_COMPARATOR = Ordering.from(Comparators.intervalsByEndThenStart())
.onResultOf(
new Function<DataSegment, Interval>()
{
@Override
public Interval apply(DataSegment segment)
{
return segment.getInterval();
}
})
.compound(Ordering.<DataSegment>natural())
.reverse();

private static final EmittingLogger log = new EmittingLogger(DruidCoordinator.class);
private final Object lock = new Object();
private final DruidCoordinatorConfig config;
Expand Down Expand Up @@ -249,7 +266,8 @@ public CountingMap<String> getSegmentAvailability()
return retVal;
}

CountingMap<String> getLoadPendingDatasources() {
CountingMap<String> getLoadPendingDatasources()
{
final CountingMap<String> retVal = new CountingMap<>();
for (LoadQueuePeon peon : loadManagementPeons.values()) {
for (DataSegment segment : peon.getSegmentsToLoad()) {
Expand Down Expand Up @@ -386,7 +404,7 @@ public void moveSegment(
public void execute()
{
try {
if (curator.checkExists().forPath(toServedSegPath) != null &&
if (curator.checkExists().forPath(toServedSegPath) != null &&
curator.checkExists().forPath(toLoadQueueSegPath) == null &&
!dropPeon.getSegmentsToDrop().contains(segment)) {
dropPeon.dropSegment(segment, callback);
Expand All @@ -411,7 +429,7 @@ public void execute()

public Set<DataSegment> getOrderedAvailableDataSegments()
{
Set<DataSegment> availableSegments = Sets.newTreeSet(Comparators.inverse(DataSegment.bucketMonthComparator()));
Set<DataSegment> availableSegments = Sets.newTreeSet(SEGMENT_COMPARATOR);

Iterable<DataSegment> dataSegments = getAvailableDataSegments();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public static class Builder
this.databaseRuleManager = null;
this.segmentReplicantLookup = null;
this.dataSources = Sets.newHashSet();
this.availableSegments = Sets.newTreeSet(Comparators.inverse(DataSegment.bucketMonthComparator()));
this.availableSegments = Sets.newTreeSet(DruidCoordinator.SEGMENT_COMPARATOR);
this.loadManagementPeons = Maps.newHashMap();
this.replicationManager = null;
this.emitter = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ public class LoadQueuePeon
private static final int DROP = 0;
private static final int LOAD = 1;

private static Comparator<DataSegment> segmentComparator = Comparators.inverse(DataSegment.bucketMonthComparator());

private static void executeCallbacks(List<LoadPeonCallback> callbacks)
{
for (LoadPeonCallback callback : callbacks) {
Expand All @@ -79,10 +77,10 @@ private static void executeCallbacks(List<LoadPeonCallback> callbacks)
private final AtomicInteger failedAssignCount = new AtomicInteger(0);

private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
segmentComparator
DruidCoordinator.SEGMENT_COMPARATOR
);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
segmentComparator
DruidCoordinator.SEGMENT_COMPARATOR
);

private final Object lock = new Object();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
Expand Down Expand Up @@ -61,6 +63,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -375,4 +378,45 @@ public void childEvent(
EasyMock.verify(serverInventoryView);
EasyMock.verify(metadataRuleManager);
}

@Test
public void testOrderedAvailableDataSegments()
{
DruidDataSource dataSource = new DruidDataSource("test", new HashMap());
DataSegment[] segments = new DataSegment[]{
getSegment("test", new Interval("2016-01-10T03:00:00Z/2016-01-10T04:00:00Z")),
getSegment("test", new Interval("2016-01-11T01:00:00Z/2016-01-11T02:00:00Z")),
getSegment("test", new Interval("2016-01-09T10:00:00Z/2016-01-09T11:00:00Z")),
getSegment("test", new Interval("2016-01-09T10:00:00Z/2016-01-09T12:00:00Z"))
};
for (DataSegment segment : segments) {
dataSource.addSegment(segment.getIdentifier(), segment);
}

EasyMock.expect(databaseSegmentManager.getInventory()).andReturn(
ImmutableList.of(dataSource)
).atLeastOnce();
EasyMock.replay(databaseSegmentManager);
Set<DataSegment> availableSegments = coordinator.getOrderedAvailableDataSegments();
DataSegment[] expected = new DataSegment[]{
getSegment("test", new Interval("2016-01-11T01:00:00Z/2016-01-11T02:00:00Z")),
getSegment("test", new Interval("2016-01-10T03:00:00Z/2016-01-10T04:00:00Z")),
getSegment("test", new Interval("2016-01-09T10:00:00Z/2016-01-09T12:00:00Z")),
getSegment("test", new Interval("2016-01-09T10:00:00Z/2016-01-09T11:00:00Z"))
};
Assert.assertEquals(expected.length, availableSegments.size());
Assert.assertEquals(expected, availableSegments.toArray());
EasyMock.verify(databaseSegmentManager);
}


private DataSegment getSegment(String dataSource, Interval interval)
{
// Not using EasyMock as it hampers the performance of multithreads.
DataSegment segment = new DataSegment(
dataSource, interval, "dummy_version", Maps.<String, Object>newConcurrentMap(),
Lists.<String>newArrayList(), Lists.<String>newArrayList(), null, 0, 0L
);
return segment;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,27 @@ public DataSegment apply(String intervalStr)

final List<DataSegment> segmentToLoad = Lists.transform(
ImmutableList.<String>of(
"2014-10-27T00:00:00Z/P1D",
"2014-10-29T00:00:00Z/P1M",
"2014-10-31T00:00:00Z/P1D",
"2014-10-30T00:00:00Z/P1D",
"2014-10-28T00:00:00Z/P1D"
), new Function<String, DataSegment>()
{
@Override
public DataSegment apply(String intervalStr)
{
return dataSegmentWithInterval(intervalStr);
}
}
);

// segment with latest interval should be loaded first
final List<DataSegment> expectedLoadOrder = Lists.transform(
ImmutableList.<String>of(
"2014-10-29T00:00:00Z/P1M",
"2014-10-31T00:00:00Z/P1D",
"2014-10-30T00:00:00Z/P1D",
"2014-10-29T00:00:00Z/P1D",
"2014-10-28T00:00:00Z/P1D",
"2014-10-27T00:00:00Z/P1D"
), new Function<String, DataSegment>()
Expand Down Expand Up @@ -235,7 +253,7 @@ public void execute()
}
}

for (DataSegment segment : segmentToLoad) {
for (DataSegment segment : expectedLoadOrder) {
String loadRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getIdentifier());
Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignal[requestSignalIdx.get()]));
Assert.assertNotNull(curator.checkExists().forPath(loadRequestPath));
Expand Down