diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 95d3d0cd1a71..53cafa8c8bc2 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -248,7 +248,7 @@ public Map getLoadManagementPeons() ((LoadRule) rule) .getTieredReplicants() .forEach((final String tier, final Integer ruleReplicants) -> { - int currentReplicants = segmentReplicantLookup.getTotalReplicants(segment.getIdentifier(), tier); + int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getIdentifier(), tier); retVal .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>()) .addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0)); @@ -268,7 +268,7 @@ public Object2LongMap getSegmentAvailability() } for (DataSegment segment : getAvailableDataSegments()) { - if (segmentReplicantLookup.getTotalReplicants(segment.getIdentifier()) == 0) { + if (segmentReplicantLookup.getLoadedReplicants(segment.getIdentifier()) == 0) { retVal.addTo(segment.getDataSource(), 1); } else { retVal.addTo(segment.getDataSource(), 0); diff --git a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java index f23f1199e86e..a713c8d181c1 100644 --- a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java +++ b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java @@ -36,7 +36,6 @@ public class SegmentReplicantLookup public static SegmentReplicantLookup make(DruidCluster cluster) { final Table segmentsInCluster = HashBasedTable.create(); - final Table loadingSegments = HashBasedTable.create(); for (SortedSet serversByType : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serversByType) { @@ -49,31 +48,17 @@ public static SegmentReplicantLookup make(DruidCluster cluster) } segmentsInCluster.put(segment.getIdentifier(), server.getTier(), ++numReplicants); } - - // Also account for queued segments - for (DataSegment segment : serverHolder.getPeon().getSegmentsToLoad()) { - Integer numReplicants = loadingSegments.get(segment.getIdentifier(), server.getTier()); - if (numReplicants == null) { - numReplicants = 0; - } - loadingSegments.put(segment.getIdentifier(), server.getTier(), ++numReplicants); - } } } - return new SegmentReplicantLookup(segmentsInCluster, loadingSegments); + return new SegmentReplicantLookup(segmentsInCluster); } private final Table segmentsInCluster; - private final Table loadingSegments; - private SegmentReplicantLookup( - Table segmentsInCluster, - Table loadingSegments - ) + private SegmentReplicantLookup(Table segmentsInCluster) { this.segmentsInCluster = segmentsInCluster; - this.loadingSegments = loadingSegments; } public Map getClusterTiers(String segmentId) @@ -82,12 +67,6 @@ public Map getClusterTiers(String segmentId) return (retVal == null) ? Maps.newHashMap() : retVal; } - public Map getLoadingTiers(String segmentId) - { - Map retVal = loadingSegments.row(segmentId); - return (retVal == null) ? Maps.newHashMap() : retVal; - } - public int getLoadedReplicants(String segmentId) { Map allTiers = segmentsInCluster.row(segmentId); @@ -103,30 +82,4 @@ public int getLoadedReplicants(String segmentId, String tier) Integer retVal = segmentsInCluster.get(segmentId, tier); return (retVal == null) ? 0 : retVal; } - - public int getLoadingReplicants(String segmentId, String tier) - { - Integer retVal = loadingSegments.get(segmentId, tier); - return (retVal == null) ? 0 : retVal; - } - - public int getLoadingReplicants(String segmentId) - { - Map allTiers = loadingSegments.row(segmentId); - int retVal = 0; - for (Integer replicants : allTiers.values()) { - retVal += replicants; - } - return retVal; - } - - public int getTotalReplicants(String segmentId) - { - return getLoadedReplicants(segmentId) + getLoadingReplicants(segmentId); - } - - public int getTotalReplicants(String segmentId, String tier) - { - return getLoadedReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier); - } } diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index 7357c9cdf722..3ef19d174831 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.metamx.emitter.EmittingLogger; @@ -942,7 +941,6 @@ public void testDropServerActuallyServesSegment() throws Exception mockEmptyPeon(); LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class); - EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce(); EasyMock.replay(anotherMockPeon); @@ -1411,8 +1409,8 @@ private void mockCoordinator() private void mockEmptyPeon() { - EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).atLeastOnce(); - EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes(); + EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(new HashSet<>()).anyTimes(); + EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(new HashSet<>()).anyTimes(); EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce(); EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes(); EasyMock.replay(mockPeon);