Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class SegmentReplicantLookup
public static SegmentReplicantLookup make(DruidCluster cluster)
{
final Table<String, String, Integer> segmentsInCluster = HashBasedTable.create();
final Table<String, String, Integer> loadingSegments = HashBasedTable.create();

for (SortedSet<ServerHolder> serversByType : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serversByType) {
Expand All @@ -48,17 +49,29 @@ public static SegmentReplicantLookup make(DruidCluster cluster)
}
segmentsInCluster.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
}

// Also account for queued segments
for (DataSegment segment : serverHolder.getPeon().getSegmentsToLoad()) {
Integer numReplicants = loadingSegments.get(segment.getIdentifier(), server.getTier());
if (numReplicants == null) {
numReplicants = 0;
}
loadingSegments.put(segment.getIdentifier(), server.getTier(), ++numReplicants);
}
}
}

return new SegmentReplicantLookup(segmentsInCluster);
return new SegmentReplicantLookup(segmentsInCluster, loadingSegments);
}

private final Table<String, String, Integer> segmentsInCluster;

private SegmentReplicantLookup(Table<String, String, Integer> segmentsInCluster)
private final Table<String, String, Integer> loadingSegments;

private SegmentReplicantLookup(Table<String, String, Integer> segmentsInCluster, Table<String, String, Integer> loadingSegments)
{
this.segmentsInCluster = segmentsInCluster;
this.loadingSegments = loadingSegments;
}

public Map<String, Integer> getClusterTiers(String segmentId)
Expand All @@ -82,4 +95,30 @@ public int getLoadedReplicants(String segmentId, String tier)
Integer retVal = segmentsInCluster.get(segmentId, tier);
return (retVal == null) ? 0 : retVal;
}

public int getLoadingReplicants(String segmentId, String tier)
{
Integer retVal = loadingSegments.get(segmentId, tier);
return (retVal == null) ? 0 : retVal;
}

public int getLoadingReplicants(String segmentId)
{
Map<String, Integer> allTiers = loadingSegments.row(segmentId);
int retVal = 0;
for (Integer replicants : allTiers.values()) {
retVal += replicants;
}
return retVal;
}

public int getTotalReplicants(String segmentId)
{
return getLoadedReplicants(segmentId) + getLoadingReplicants(segmentId);
}

public int getTotalReplicants(String segmentId, String tier)
{
return getLoadedReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ private void assign(
final CoordinatorStats stats
)
{
// if primary replica already exists
if (!currentReplicants.isEmpty()) {
// if primary replica already exists or is loading
final int loading = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier());
if (!currentReplicants.isEmpty() || loading > 0) {
assignReplicas(params, segment, stats, null);
} else {
final ServerHolder primaryHolderToLoad = assignPrimary(params, segment);
Expand Down Expand Up @@ -169,7 +170,6 @@ private ServerHolder assignPrimary(
if (targetReplicantsInTier <= 0) {
continue;
}

final String tier = entry.getKey();

final List<ServerHolder> holders = getFilteredHolders(
Expand Down Expand Up @@ -228,7 +228,7 @@ private void assignReplicas(
final int numAssigned = assignReplicasForTier(
tier,
entry.getIntValue(),
currentReplicants.getOrDefault(tier, 0),
params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), tier),
params,
createLoadQueueSizeLimitingPredicate(params),
segment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.DruidServer;
Expand Down Expand Up @@ -942,6 +943,8 @@ public void testDropServerActuallyServesSegment()

LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class);
EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce();
EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).anyTimes();

EasyMock.replay(anotherMockPeon);

DruidCluster druidCluster = new DruidCluster(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package io.druid.server.coordinator.rules;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -58,7 +60,9 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -190,6 +194,127 @@ public void testLoad()
EasyMock.verify(throttler, mockPeon, mockBalancerStrategy);
}

@Test
public void testLoadPrimaryAssignDoesNotOverAssign()
{
EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(true).anyTimes();

final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();

LoadRule rule = createLoadRule(ImmutableMap.of(
"hot", 1
));

final DataSegment segment = createDataSegment("foo");

EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.anyTimes();

EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);

DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
null,
1000,
ServerType.HISTORICAL,
"hot",
1
).toImmutableDruidServer(),
mockPeon
), new ServerHolder(
new DruidServer(
"serverHot2",
"hostHot2",
null,
1000,
ServerType.HISTORICAL,
"hot",
1
).toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);

CoordinatorStats stats = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
);


Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));

// ensure multiple runs don't assign primary segment again if at replication count
final LoadQueuePeon loadingPeon = createLoadingPeon(ImmutableList.of(segment));
EasyMock.replay(loadingPeon);

DruidCluster afterLoad = new DruidCluster(
null,
ImmutableMap.of(
"hot",
Stream.of(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
null,
1000,
ServerType.HISTORICAL,
"hot",
1
).toImmutableDruidServer(),
loadingPeon
), new ServerHolder(
new DruidServer(
"serverHot2",
"hostHot2",
null,
1000,
ServerType.HISTORICAL,
"hot",
1
).toImmutableDruidServer(),
mockPeon
)
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
)
);
CoordinatorStats statsAfterLoadPrimary = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(afterLoad)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(afterLoad))
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
);


Assert.assertEquals(0, statsAfterLoadPrimary.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));

EasyMock.verify(throttler, mockPeon, mockBalancerStrategy);
}

@Test
public void testLoadPriority()
{
Expand Down Expand Up @@ -619,4 +744,18 @@ private static LoadQueuePeon createEmptyPeon()

return mockPeon;
}

private static LoadQueuePeon createLoadingPeon(List<DataSegment> segments)
{
final Set<DataSegment> segs = ImmutableSet.copyOf(segments);
final long loadingSize = segs.stream().mapToLong(DataSegment::getSize).sum();

final LoadQueuePeon mockPeon = EasyMock.createMock(LoadQueuePeon.class);
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(segs).anyTimes();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(loadingSize).anyTimes();
EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(segs.size()).anyTimes();

return mockPeon;
}
}