Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public Map<String, LoadQueuePeon> getLoadManagementPeons()
((LoadRule) rule)
.getTieredReplicants()
.forEach((final String tier, final Integer ruleReplicants) -> {
int currentReplicants = segmentReplicantLookup.getTotalReplicants(segment.getIdentifier(), tier);
int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getIdentifier(), tier);
retVal
.computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>())
.addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0));
Expand All @@ -268,7 +268,7 @@ public Object2LongMap<String> getSegmentAvailability()
}

for (DataSegment segment : getAvailableDataSegments()) {
if (segmentReplicantLookup.getTotalReplicants(segment.getIdentifier()) == 0) {
if (segmentReplicantLookup.getLoadedReplicants(segment.getIdentifier()) == 0) {
retVal.addTo(segment.getDataSource(), 1);
} else {
retVal.addTo(segment.getDataSource(), 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public class SegmentReplicantLookup
public static SegmentReplicantLookup make(DruidCluster cluster)
{
final Table<String, String, Integer> segmentsInCluster = HashBasedTable.create();
final Table<String, String, Integer> loadingSegments = HashBasedTable.create();

for (SortedSet<ServerHolder> serversByType : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serversByType) {
Expand All @@ -49,31 +48,17 @@ public static SegmentReplicantLookup make(DruidCluster cluster)
}
segmentsInCluster.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
}

// Also account for queued segments
for (DataSegment segment : serverHolder.getPeon().getSegmentsToLoad()) {
Integer numReplicants = loadingSegments.get(segment.getIdentifier(), server.getTier());
if (numReplicants == null) {
numReplicants = 0;
}
loadingSegments.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
}
}
}

return new SegmentReplicantLookup(segmentsInCluster, loadingSegments);
return new SegmentReplicantLookup(segmentsInCluster);
}

private final Table<String, String, Integer> segmentsInCluster;
private final Table<String, String, Integer> loadingSegments;

private SegmentReplicantLookup(
Table<String, String, Integer> segmentsInCluster,
Table<String, String, Integer> loadingSegments
)
private SegmentReplicantLookup(Table<String, String, Integer> segmentsInCluster)
{
this.segmentsInCluster = segmentsInCluster;
this.loadingSegments = loadingSegments;
}

public Map<String, Integer> getClusterTiers(String segmentId)
Expand All @@ -82,12 +67,6 @@ public Map<String, Integer> getClusterTiers(String segmentId)
return (retVal == null) ? Maps.<String, Integer>newHashMap() : retVal;
}

public Map<String, Integer> getLoadingTiers(String segmentId)
{
Map<String, Integer> retVal = loadingSegments.row(segmentId);
return (retVal == null) ? Maps.<String, Integer>newHashMap() : retVal;
}

public int getLoadedReplicants(String segmentId)
{
Map<String, Integer> allTiers = segmentsInCluster.row(segmentId);
Expand All @@ -103,30 +82,4 @@ public int getLoadedReplicants(String segmentId, String tier)
Integer retVal = segmentsInCluster.get(segmentId, tier);
return (retVal == null) ? 0 : retVal;
}

public int getLoadingReplicants(String segmentId, String tier)
{
Integer retVal = loadingSegments.get(segmentId, tier);
return (retVal == null) ? 0 : retVal;
}

public int getLoadingReplicants(String segmentId)
{
Map<String, Integer> allTiers = loadingSegments.row(segmentId);
int retVal = 0;
for (Integer replicants : allTiers.values()) {
retVal += replicants;
}
return retVal;
}

public int getTotalReplicants(String segmentId)
{
return getLoadedReplicants(segmentId) + getLoadingReplicants(segmentId);
}

public int getTotalReplicants(String segmentId, String tier)
{
return getLoadedReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.emitter.EmittingLogger;
Expand Down Expand Up @@ -942,7 +941,6 @@ public void testDropServerActuallyServesSegment() throws Exception
mockEmptyPeon();

LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class);
EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce();
EasyMock.replay(anotherMockPeon);

Expand Down Expand Up @@ -1411,8 +1409,8 @@ private void mockCoordinator()

private void mockEmptyPeon()
{
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(new HashSet<>()).anyTimes();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(new HashSet<>()).anyTimes();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(0).anyTimes();
EasyMock.replay(mockPeon);
Expand Down