diff --git a/pom.xml b/pom.xml index 120b2611a129..931ac5600810 100644 --- a/pom.xml +++ b/pom.xml @@ -65,7 +65,7 @@ - 0.27.6 + 0.27.7 2.9.1 9.2.5.v20141112 1.19 diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index bc62bdf0b641..e50280c2e9db 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -27,6 +27,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.inject.Inject; import com.metamx.common.IAE; @@ -78,8 +79,10 @@ import org.joda.time.Duration; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.io.IOException; import java.util.Arrays; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; @@ -94,6 +97,20 @@ public class DruidCoordinator { public static final String COORDINATOR_OWNER_NODE = "_COORDINATOR"; + + public static Comparator SEGMENT_COMPARATOR = Ordering.from(Comparators.intervalsByEndThenStart()) + .onResultOf( + new Function() + { + @Override + public Interval apply(DataSegment segment) + { + return segment.getInterval(); + } + }) + .compound(Ordering.natural()) + .reverse(); + private static final EmittingLogger log = new EmittingLogger(DruidCoordinator.class); private final Object lock = new Object(); private final DruidCoordinatorConfig config; @@ -249,7 +266,8 @@ public CountingMap getSegmentAvailability() return retVal; } - CountingMap getLoadPendingDatasources() { + CountingMap getLoadPendingDatasources() + { final CountingMap retVal = new CountingMap<>(); for (LoadQueuePeon peon : loadManagementPeons.values()) { for (DataSegment segment : peon.getSegmentsToLoad()) { @@ -386,7 +404,7 @@ public void moveSegment( public void execute() { try { - if (curator.checkExists().forPath(toServedSegPath) != null && + if (curator.checkExists().forPath(toServedSegPath) != null && curator.checkExists().forPath(toLoadQueueSegPath) == null && !dropPeon.getSegmentsToDrop().contains(segment)) { dropPeon.dropSegment(segment, callback); @@ -411,7 +429,7 @@ public void execute() public Set getOrderedAvailableDataSegments() { - Set availableSegments = Sets.newTreeSet(Comparators.inverse(DataSegment.bucketMonthComparator())); + Set availableSegments = Sets.newTreeSet(SEGMENT_COMPARATOR); Iterable dataSegments = getAvailableDataSegments(); diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java index cd4eba468bf2..742999ce2439 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java @@ -199,7 +199,7 @@ public static class Builder this.databaseRuleManager = null; this.segmentReplicantLookup = null; this.dataSources = Sets.newHashSet(); - this.availableSegments = Sets.newTreeSet(Comparators.inverse(DataSegment.bucketMonthComparator())); + this.availableSegments = Sets.newTreeSet(DruidCoordinator.SEGMENT_COMPARATOR); this.loadManagementPeons = Maps.newHashMap(); this.replicationManager = null; this.emitter = null; diff --git a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java index e1d287e599aa..ae049e8a68b3 100644 --- a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java +++ b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java @@ -57,8 +57,6 @@ public class LoadQueuePeon private static final int DROP = 0; private static final int LOAD = 1; - private static Comparator segmentComparator = Comparators.inverse(DataSegment.bucketMonthComparator()); - private static void executeCallbacks(List callbacks) { for (LoadPeonCallback callback : callbacks) { @@ -79,10 +77,10 @@ private static void executeCallbacks(List callbacks) private final AtomicInteger failedAssignCount = new AtomicInteger(0); private final ConcurrentSkipListMap segmentsToLoad = new ConcurrentSkipListMap<>( - segmentComparator + DruidCoordinator.SEGMENT_COMPARATOR ); private final ConcurrentSkipListMap segmentsToDrop = new ConcurrentSkipListMap<>( - segmentComparator + DruidCoordinator.SEGMENT_COMPARATOR ); private final Object lock = new Object(); diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index a179eb73e36e..fa06f44b16aa 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -23,7 +23,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; import com.google.common.collect.MapMaker; +import com.google.common.collect.Maps; import com.metamx.common.concurrent.ScheduledExecutorFactory; import io.druid.client.DruidDataSource; import io.druid.client.DruidServer; @@ -61,6 +63,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; @@ -375,4 +378,45 @@ public void childEvent( EasyMock.verify(serverInventoryView); EasyMock.verify(metadataRuleManager); } + + @Test + public void testOrderedAvailableDataSegments() + { + DruidDataSource dataSource = new DruidDataSource("test", new HashMap()); + DataSegment[] segments = new DataSegment[]{ + getSegment("test", new Interval("2016-01-10T03:00:00Z/2016-01-10T04:00:00Z")), + getSegment("test", new Interval("2016-01-11T01:00:00Z/2016-01-11T02:00:00Z")), + getSegment("test", new Interval("2016-01-09T10:00:00Z/2016-01-09T11:00:00Z")), + getSegment("test", new Interval("2016-01-09T10:00:00Z/2016-01-09T12:00:00Z")) + }; + for (DataSegment segment : segments) { + dataSource.addSegment(segment.getIdentifier(), segment); + } + + EasyMock.expect(databaseSegmentManager.getInventory()).andReturn( + ImmutableList.of(dataSource) + ).atLeastOnce(); + EasyMock.replay(databaseSegmentManager); + Set availableSegments = coordinator.getOrderedAvailableDataSegments(); + DataSegment[] expected = new DataSegment[]{ + getSegment("test", new Interval("2016-01-11T01:00:00Z/2016-01-11T02:00:00Z")), + getSegment("test", new Interval("2016-01-10T03:00:00Z/2016-01-10T04:00:00Z")), + getSegment("test", new Interval("2016-01-09T10:00:00Z/2016-01-09T12:00:00Z")), + getSegment("test", new Interval("2016-01-09T10:00:00Z/2016-01-09T11:00:00Z")) + }; + Assert.assertEquals(expected.length, availableSegments.size()); + Assert.assertEquals(expected, availableSegments.toArray()); + EasyMock.verify(databaseSegmentManager); + } + + + private DataSegment getSegment(String dataSource, Interval interval) + { + // Not using EasyMock as it hampers the performance of multithreads. + DataSegment segment = new DataSegment( + dataSource, interval, "dummy_version", Maps.newConcurrentMap(), + Lists.newArrayList(), Lists.newArrayList(), null, 0, 0L + ); + return segment; + } } diff --git a/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTest.java b/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTest.java index 0fb2286123a5..5e3f56a41af8 100644 --- a/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTest.java +++ b/server/src/test/java/io/druid/server/coordinator/LoadQueuePeonTest.java @@ -137,9 +137,27 @@ public DataSegment apply(String intervalStr) final List segmentToLoad = Lists.transform( ImmutableList.of( + "2014-10-27T00:00:00Z/P1D", + "2014-10-29T00:00:00Z/P1M", + "2014-10-31T00:00:00Z/P1D", + "2014-10-30T00:00:00Z/P1D", + "2014-10-28T00:00:00Z/P1D" + ), new Function() + { + @Override + public DataSegment apply(String intervalStr) + { + return dataSegmentWithInterval(intervalStr); + } + } + ); + + // segment with latest interval should be loaded first + final List expectedLoadOrder = Lists.transform( + ImmutableList.of( + "2014-10-29T00:00:00Z/P1M", "2014-10-31T00:00:00Z/P1D", "2014-10-30T00:00:00Z/P1D", - "2014-10-29T00:00:00Z/P1D", "2014-10-28T00:00:00Z/P1D", "2014-10-27T00:00:00Z/P1D" ), new Function() @@ -235,7 +253,7 @@ public void execute() } } - for (DataSegment segment : segmentToLoad) { + for (DataSegment segment : expectedLoadOrder) { String loadRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getIdentifier()); Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignal[requestSignalIdx.get()])); Assert.assertNotNull(curator.checkExists().forPath(loadRequestPath));