From dbcad63622c6caf3c29435d8cff2c8cc77b725d4 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 6 Oct 2017 16:49:24 -0700 Subject: [PATCH 1/3] Only consider loaded replicants when computing replication status. This affects the computation of segment/underReplicated/count and segment/unavailable/count, as well as the loadstatus?simple and loadstatus?full APIs. I'm not sure why they currently consider segments in the load queues, but it would make more sense to me if they only considered segments that are actually loaded. --- .../server/coordinator/DruidCoordinator.java | 4 +- .../coordinator/SegmentReplicantLookup.java | 51 +------------------ 2 files changed, 4 insertions(+), 51 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 95d3d0cd1a71..53cafa8c8bc2 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -248,7 +248,7 @@ public Map getLoadManagementPeons() ((LoadRule) rule) .getTieredReplicants() .forEach((final String tier, final Integer ruleReplicants) -> { - int currentReplicants = segmentReplicantLookup.getTotalReplicants(segment.getIdentifier(), tier); + int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getIdentifier(), tier); retVal .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>()) .addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0)); @@ -268,7 +268,7 @@ public Object2LongMap getSegmentAvailability() } for (DataSegment segment : getAvailableDataSegments()) { - if (segmentReplicantLookup.getTotalReplicants(segment.getIdentifier()) == 0) { + if (segmentReplicantLookup.getLoadedReplicants(segment.getIdentifier()) == 0) { retVal.addTo(segment.getDataSource(), 1); } else { retVal.addTo(segment.getDataSource(), 0); diff --git a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java index f23f1199e86e..a713c8d181c1 100644 --- a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java +++ b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java @@ -36,7 +36,6 @@ public class SegmentReplicantLookup public static SegmentReplicantLookup make(DruidCluster cluster) { final Table segmentsInCluster = HashBasedTable.create(); - final Table loadingSegments = HashBasedTable.create(); for (SortedSet serversByType : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serversByType) { @@ -49,31 +48,17 @@ public static SegmentReplicantLookup make(DruidCluster cluster) } segmentsInCluster.put(segment.getIdentifier(), server.getTier(), ++numReplicants); } - - // Also account for queued segments - for (DataSegment segment : serverHolder.getPeon().getSegmentsToLoad()) { - Integer numReplicants = loadingSegments.get(segment.getIdentifier(), server.getTier()); - if (numReplicants == null) { - numReplicants = 0; - } - loadingSegments.put(segment.getIdentifier(), server.getTier(), ++numReplicants); - } } } - return new SegmentReplicantLookup(segmentsInCluster, loadingSegments); + return new SegmentReplicantLookup(segmentsInCluster); } private final Table segmentsInCluster; - private final Table loadingSegments; - private SegmentReplicantLookup( - Table segmentsInCluster, - Table loadingSegments - ) + private SegmentReplicantLookup(Table segmentsInCluster) { this.segmentsInCluster = segmentsInCluster; - this.loadingSegments = loadingSegments; } public Map getClusterTiers(String segmentId) @@ -82,12 +67,6 @@ public Map getClusterTiers(String segmentId) return (retVal == null) ? Maps.newHashMap() : retVal; } - public Map getLoadingTiers(String segmentId) - { - Map retVal = loadingSegments.row(segmentId); - return (retVal == null) ? Maps.newHashMap() : retVal; - } - public int getLoadedReplicants(String segmentId) { Map allTiers = segmentsInCluster.row(segmentId); @@ -103,30 +82,4 @@ public int getLoadedReplicants(String segmentId, String tier) Integer retVal = segmentsInCluster.get(segmentId, tier); return (retVal == null) ? 0 : retVal; } - - public int getLoadingReplicants(String segmentId, String tier) - { - Integer retVal = loadingSegments.get(segmentId, tier); - return (retVal == null) ? 0 : retVal; - } - - public int getLoadingReplicants(String segmentId) - { - Map allTiers = loadingSegments.row(segmentId); - int retVal = 0; - for (Integer replicants : allTiers.values()) { - retVal += replicants; - } - return retVal; - } - - public int getTotalReplicants(String segmentId) - { - return getLoadedReplicants(segmentId) + getLoadingReplicants(segmentId); - } - - public int getTotalReplicants(String segmentId, String tier) - { - return getLoadedReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier); - } } From c340388da900b52ac914520b9047a549c682f2da Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 9 Oct 2017 17:53:47 -0700 Subject: [PATCH 2/3] Fix tests. --- .../server/coordinator/DruidCoordinatorRuleRunnerTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index 7357c9cdf722..146b7d90b94b 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -942,7 +942,6 @@ public void testDropServerActuallyServesSegment() throws Exception mockEmptyPeon(); LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class); - EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce(); EasyMock.replay(anotherMockPeon); @@ -1411,8 +1410,8 @@ private void mockCoordinator() private void mockEmptyPeon() { - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes(); + EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(new HashSet<>()).anyTimes(); + EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(new HashSet<>()).anyTimes(); EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes(); EasyMock.replay(mockPeon); From c5b644361ec0f86c4adbe988b39987ca27d30602 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 10 Oct 2017 08:01:50 -0700 Subject: [PATCH 3/3] Fix imports. --- .../druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index 146b7d90b94b..3ef19d174831 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.metamx.emitter.EmittingLogger;