Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,7 @@ public boolean equals(Object o)

ImmutableDruidServer that = (ImmutableDruidServer) o;

if (metadata.equals(that.metadata)) {
return false;
}

return true;
return metadata.equals(that.metadata);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,11 @@ private void balanceTier(
CoordinatorStats stats
)
{
final BalancerStrategy strategy = params.getBalancerStrategy();
final int maxSegmentsToMove = params.getCoordinatorDynamicConfig().getMaxSegmentsToMove();

if (params.getAvailableSegments().size() == 0) {
log.info("Metadata segments are not available. Cannot balance.");
return;
}
currentlyMovingSegments.computeIfAbsent(tier, t -> new ConcurrentHashMap<>());

if (!currentlyMovingSegments.get(tier).isEmpty()) {
Expand All @@ -117,33 +119,59 @@ private void balanceTier(
numSegments += sourceHolder.getServer().getSegments().size();
}


if (numSegments == 0) {
log.info("No segments found. Cannot balance.");
return;
}

final BalancerStrategy strategy = params.getBalancerStrategy();
final int maxSegmentsToMove = Math.min(params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(), numSegments);
final int maxIterations = 2 * maxSegmentsToMove;
final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
long unmoved = 0L;
for (int moved = 0; (moved + unmoved) < maxSegmentsToMove;) {
final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(toMoveFrom);

if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) {
final List<ServerHolder> toMoveToWithLoadQueueCapacity =
for (int moved = 0, iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) {
final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove(toMoveFrom);

if (segmentToMoveHolder != null && params.getAvailableSegments().contains(segmentToMoveHolder.getSegment())) {
final DataSegment segmentToMove = segmentToMoveHolder.getSegment();
final ImmutableDruidServer fromServer = segmentToMoveHolder.getFromServer();
// we want to leave the server the segment is currently on in the list...
// but filter out replicas that are already serving the segment, and servers with a full load queue
final List<ServerHolder> toMoveToWithLoadQueueCapacityAndNotServingSegment =
toMoveTo.stream()
.filter(s -> maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad)
.filter(s -> s.getServer().equals(fromServer) ||
(!s.isServingSegment(segmentToMove) &&
(maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad)))
.collect(Collectors.toList());

final ServerHolder destinationHolder =
strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveToWithLoadQueueCapacity);

if (destinationHolder != null) {
moveSegment(segmentToMove, destinationHolder.getServer(), params);
moved++;
if (toMoveToWithLoadQueueCapacityAndNotServingSegment.size() > 0) {
final ServerHolder destinationHolder =
strategy.findNewSegmentHomeBalancer(segmentToMove, toMoveToWithLoadQueueCapacityAndNotServingSegment);

if (destinationHolder != null && !destinationHolder.getServer().equals(fromServer)) {
moveSegment(segmentToMoveHolder, destinationHolder.getServer(), params);
moved++;
} else {
log.info("Segment [%s] is 'optimally' placed.", segmentToMove.getIdentifier());
unmoved++;
}
} else {
log.info("Segment [%s] is 'optimally' placed.", segmentToMove.getSegment().getIdentifier());
log.info(
"No valid movement destinations for segment [%s].",
segmentToMove.getIdentifier()
);
unmoved++;
}
}
if (iter >= maxIterations) {
log.info(
"Unable to select %d remaining candidate segments out of %d total to balance after %d iterations, ending run.",
(maxSegmentsToMove - moved - unmoved), maxSegmentsToMove, iter
);
break;
}
}

if (unmoved == maxSegmentsToMove) {
Expand Down