diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java index cc266d705df6..b4f7e22411f1 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java @@ -20,9 +20,9 @@ package io.druid.server.coordinator.helper; import com.google.common.collect.Lists; -import io.druid.java.util.emitter.EmittingLogger; import io.druid.client.ImmutableDruidServer; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.emitter.EmittingLogger; import io.druid.server.coordinator.BalancerSegmentHolder; import io.druid.server.coordinator.BalancerStrategy; import io.druid.server.coordinator.CoordinatorStats; @@ -40,6 +40,7 @@ import java.util.NavigableSet; import java.util.SortedSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; /** */ @@ -103,15 +104,16 @@ private void balanceTier( return; } - final List serverHolderList = Lists.newArrayList(servers); + final List toMoveFrom = Lists.newArrayList(servers); + final List toMoveTo = Lists.newArrayList(servers); - if (serverHolderList.size() <= 1) { + if (toMoveTo.size() <= 1) { log.info("[%s]: One or fewer servers found. Cannot balance.", tier); return; } int numSegments = 0; - for (ServerHolder server : serverHolderList) { + for (ServerHolder server : toMoveFrom) { numSegments += server.getServer().getSegments().size(); } @@ -119,20 +121,31 @@ private void balanceTier( log.info("No segments found. Cannot balance."); return; } + + final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue(); long unmoved = 0L; - for (int iter = 0; iter < maxSegmentsToMove; iter++) { - final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList); + for (int moved = 0; (moved + unmoved) < maxSegmentsToMove;) { + final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(toMoveFrom); if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) { - final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList); + final List toMoveToWithLoadQueueCapacity = + toMoveTo.stream() + .filter(s -> maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad) + .collect(Collectors.toList()); - if (holder != null) { - moveSegment(segmentToMove, holder.getServer(), params); + final ServerHolder destinationHolder = + strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveToWithLoadQueueCapacity); + + if (destinationHolder != null) { + moveSegment(segmentToMove, destinationHolder.getServer(), params); + moved++; } else { - ++unmoved; + log.info("Segment [%s] is 'optimally' placed.", segmentToMove.getSegment().getIdentifier()); + unmoved++; } } } + if (unmoved == maxSegmentsToMove) { // Cluster should be alive and constantly adjusting log.info("No good moves found in tier [%s]", tier); @@ -140,7 +153,7 @@ private void balanceTier( stats.addToTieredStat("unmovedCount", tier, unmoved); stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size()); if (params.getCoordinatorDynamicConfig().emitBalancingStats()) { - strategy.emitStats(tier, stats, serverHolderList); + strategy.emitStats(tier, stats, toMoveFrom); } log.info( "[%s]: Segments Moved: [%d] Segments Let Alone: [%d]",