diff --git a/server/src/main/java/io/druid/client/ImmutableDruidServer.java b/server/src/main/java/io/druid/client/ImmutableDruidServer.java index b43f9d00bbd9..922993dedc0f 100644 --- a/server/src/main/java/io/druid/client/ImmutableDruidServer.java +++ b/server/src/main/java/io/druid/client/ImmutableDruidServer.java @@ -142,11 +142,7 @@ public boolean equals(Object o) ImmutableDruidServer that = (ImmutableDruidServer) o; - if (metadata.equals(that.metadata)) { - return false; - } - - return true; + return metadata.equals(that.metadata); } @Override diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java index 10612d7eb7ee..a25e70f1b83b 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java @@ -93,9 +93,11 @@ private void balanceTier( CoordinatorStats stats ) { - final BalancerStrategy strategy = params.getBalancerStrategy(); - final int maxSegmentsToMove = params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(); + if (params.getAvailableSegments().size() == 0) { + log.info("Metadata segments are not available. Cannot balance."); + return; + } currentlyMovingSegments.computeIfAbsent(tier, t -> new ConcurrentHashMap<>()); if (!currentlyMovingSegments.get(tier).isEmpty()) { @@ -117,33 +119,59 @@ private void balanceTier( numSegments += sourceHolder.getServer().getSegments().size(); } + if (numSegments == 0) { log.info("No segments found. Cannot balance."); return; } + final BalancerStrategy strategy = params.getBalancerStrategy(); + final int maxSegmentsToMove = Math.min(params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(), numSegments); + final int maxIterations = 2 * maxSegmentsToMove; final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue(); long unmoved = 0L; - for (int moved = 0; (moved + unmoved) < maxSegmentsToMove;) { - final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(toMoveFrom); - if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) { - final List toMoveToWithLoadQueueCapacity = + for (int moved = 0, iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) { + final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove(toMoveFrom); + + if (segmentToMoveHolder != null && params.getAvailableSegments().contains(segmentToMoveHolder.getSegment())) { + final DataSegment segmentToMove = segmentToMoveHolder.getSegment(); + final ImmutableDruidServer fromServer = segmentToMoveHolder.getFromServer(); + // we want to leave the server the segment is currently on in the list... + // but filter out replicas that are already serving the segment, and servers with a full load queue + final List toMoveToWithLoadQueueCapacityAndNotServingSegment = toMoveTo.stream() - .filter(s -> maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad) + .filter(s -> s.getServer().equals(fromServer) || + (!s.isServingSegment(segmentToMove) && + (maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad))) .collect(Collectors.toList()); - final ServerHolder destinationHolder = - strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveToWithLoadQueueCapacity); - - if (destinationHolder != null) { - moveSegment(segmentToMove, destinationHolder.getServer(), params); - moved++; + if (toMoveToWithLoadQueueCapacityAndNotServingSegment.size() > 0) { + final ServerHolder destinationHolder = + strategy.findNewSegmentHomeBalancer(segmentToMove, toMoveToWithLoadQueueCapacityAndNotServingSegment); + + if (destinationHolder != null && !destinationHolder.getServer().equals(fromServer)) { + moveSegment(segmentToMoveHolder, destinationHolder.getServer(), params); + moved++; + } else { + log.info("Segment [%s] is 'optimally' placed.", segmentToMove.getIdentifier()); + unmoved++; + } } else { - log.info("Segment [%s] is 'optimally' placed.", segmentToMove.getSegment().getIdentifier()); + log.info( + "No valid movement destinations for segment [%s].", + segmentToMove.getIdentifier() + ); unmoved++; } } + if (iter >= maxIterations) { + log.info( + "Unable to select %d remaining candidate segments out of %d total to balance after %d iterations, ending run.", + (maxSegmentsToMove - moved - unmoved), maxSegmentsToMove, iter + ); + break; + } } if (unmoved == maxSegmentsToMove) {