From 6dd45337abbaa2c8e35c4c5cf6cd4d1a682679eb Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 9 Jul 2018 13:30:53 -0700 Subject: [PATCH 1/6] this will fix it --- .../coordinator/helper/DruidCoordinatorBalancer.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java index 10612d7eb7ee..05f4fbb6b804 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java @@ -94,7 +94,6 @@ private void balanceTier( ) { final BalancerStrategy strategy = params.getBalancerStrategy(); - final int maxSegmentsToMove = params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(); currentlyMovingSegments.computeIfAbsent(tier, t -> new ConcurrentHashMap<>()); @@ -122,9 +121,12 @@ private void balanceTier( return; } + final int maxSegmentsToMove = Math.min(params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(), numSegments); + final int maxIterations = 2 * maxSegmentsToMove; + final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue(); long unmoved = 0L; - for (int moved = 0; (moved + unmoved) < maxSegmentsToMove;) { + for (int moved = 0, iter = 0; (moved + unmoved) < maxSegmentsToMove; iter++) { final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(toMoveFrom); if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) { @@ -144,6 +146,10 @@ private void balanceTier( unmoved++; } } + if (iter >= maxIterations) { + log.info("Unable to select %d remaining candidate segments out of %d total to balance after %d iterations, ending run.", (maxSegmentsToMove - moved - unmoved), maxSegmentsToMove, iter); + break; + } } if (unmoved == maxSegmentsToMove) { From 0253fd7b96e543a4961b23c6978e77fab41c37eb Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 9 Jul 2018 14:42:05 -0700 Subject: [PATCH 2/6] filter destinations to not consider servers already serving segment --- .../helper/DruidCoordinatorBalancer.java | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java index 05f4fbb6b804..b3893cd3df40 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java @@ -126,23 +126,33 @@ private void balanceTier( final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue(); long unmoved = 0L; - for (int moved = 0, iter = 0; (moved + unmoved) < maxSegmentsToMove; iter++) { + + for (int moved = 0, iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) { final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(toMoveFrom); if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) { - final List toMoveToWithLoadQueueCapacity = + + final List toMoveToWithLoadQueueCapacityAndNotServingSegment = toMoveTo.stream() - .filter(s -> maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad) + .filter(s -> (maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad) && !s.isServingSegment(segmentToMove.getSegment())) .collect(Collectors.toList()); - final ServerHolder destinationHolder = - strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveToWithLoadQueueCapacity); - - if (destinationHolder != null) { - moveSegment(segmentToMove, destinationHolder.getServer(), params); - moved++; + if (toMoveToWithLoadQueueCapacityAndNotServingSegment.size() > 0) { + final ServerHolder destinationHolder = + strategy.findNewSegmentHomeBalancer( + segmentToMove.getSegment(), + toMoveToWithLoadQueueCapacityAndNotServingSegment + ); + + if (destinationHolder != null) { + moveSegment(segmentToMove, destinationHolder.getServer(), params); + moved++; + } else { + log.info("Segment [%s] is 'optimally' placed.", segmentToMove.getSegment().getIdentifier()); + unmoved++; + } } else { - log.info("Segment [%s] is 'optimally' placed.", segmentToMove.getSegment().getIdentifier()); + log.info("No valid movement destinations for segment [%s].", segmentToMove.getSegment().getIdentifier()); unmoved++; } } From b3bcfde9b67bc0708945a82e1c673ffa2177ab5d Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 9 Jul 2018 17:52:24 -0700 Subject: [PATCH 3/6] fix it --- .../helper/DruidCoordinatorBalancer.java | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java index b3893cd3df40..4d71566f4d0e 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java @@ -132,9 +132,17 @@ private void balanceTier( if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) { + // we want to leave the server the segment is currently on in the list... + // but filter out replicas that are already serving the segment, and servers with a full load queue final List toMoveToWithLoadQueueCapacityAndNotServingSegment = toMoveTo.stream() - .filter(s -> (maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad) && !s.isServingSegment(segmentToMove.getSegment())) + .filter(s -> + s.getServer().equals(segmentToMove.getFromServer()) || + ( + (maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad) && + !s.isServingSegment(segmentToMove.getSegment()) + ) + ) .collect(Collectors.toList()); if (toMoveToWithLoadQueueCapacityAndNotServingSegment.size() > 0) { @@ -144,7 +152,7 @@ private void balanceTier( toMoveToWithLoadQueueCapacityAndNotServingSegment ); - if (destinationHolder != null) { + if (destinationHolder != null && !destinationHolder.getServer().equals(segmentToMove.getFromServer())) { moveSegment(segmentToMove, destinationHolder.getServer(), params); moved++; } else { @@ -152,12 +160,18 @@ private void balanceTier( unmoved++; } } else { - log.info("No valid movement destinations for segment [%s].", segmentToMove.getSegment().getIdentifier()); + log.info( + "No valid movement destinations for segment [%s].", + segmentToMove.getSegment().getIdentifier() + ); unmoved++; } } if (iter >= maxIterations) { - log.info("Unable to select %d remaining candidate segments out of %d total to balance after %d iterations, ending run.", (maxSegmentsToMove - moved - unmoved), maxSegmentsToMove, iter); + log.info( + "Unable to select %d remaining candidate segments out of %d total to balance after %d iterations, ending run.", + (maxSegmentsToMove - moved - unmoved), maxSegmentsToMove, iter + ); break; } } From a0c2714fda80ebcc9f371050b80d7d57268ff44d Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 10 Jul 2018 03:32:04 -0700 Subject: [PATCH 4/6] cleanup --- .../helper/DruidCoordinatorBalancer.java | 30 ++++++++----------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java index 4d71566f4d0e..d83cd9b921e0 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java @@ -128,41 +128,35 @@ private void balanceTier( long unmoved = 0L; for (int moved = 0, iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) { - final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(toMoveFrom); - - if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) { + final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove(toMoveFrom); + if (segmentToMoveHolder != null && params.getAvailableSegments().contains(segmentToMoveHolder.getSegment())) { + final DataSegment segmentToMove = segmentToMoveHolder.getSegment(); + final ImmutableDruidServer fromServer = segmentToMoveHolder.getFromServer(); // we want to leave the server the segment is currently on in the list... // but filter out replicas that are already serving the segment, and servers with a full load queue final List toMoveToWithLoadQueueCapacityAndNotServingSegment = toMoveTo.stream() - .filter(s -> - s.getServer().equals(segmentToMove.getFromServer()) || - ( - (maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad) && - !s.isServingSegment(segmentToMove.getSegment()) - ) - ) + .filter(s -> s.getServer().equals(fromServer) || + (!s.isServingSegment(segmentToMove) && + (maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad))) .collect(Collectors.toList()); if (toMoveToWithLoadQueueCapacityAndNotServingSegment.size() > 0) { final ServerHolder destinationHolder = - strategy.findNewSegmentHomeBalancer( - segmentToMove.getSegment(), - toMoveToWithLoadQueueCapacityAndNotServingSegment - ); + strategy.findNewSegmentHomeBalancer(segmentToMove, toMoveToWithLoadQueueCapacityAndNotServingSegment); - if (destinationHolder != null && !destinationHolder.getServer().equals(segmentToMove.getFromServer())) { - moveSegment(segmentToMove, destinationHolder.getServer(), params); + if (destinationHolder != null && !destinationHolder.getServer().equals(fromServer)) { + moveSegment(segmentToMoveHolder, destinationHolder.getServer(), params); moved++; } else { - log.info("Segment [%s] is 'optimally' placed.", segmentToMove.getSegment().getIdentifier()); + log.info("Segment [%s] is 'optimally' placed.", segmentToMove.getIdentifier()); unmoved++; } } else { log.info( "No valid movement destinations for segment [%s].", - segmentToMove.getSegment().getIdentifier() + segmentToMove.getIdentifier() ); unmoved++; } From 8e84ed014f3cafa72300b3f332feff6e2d1a9400 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 10 Jul 2018 04:13:22 -0700 Subject: [PATCH 5/6] fix opposite day in ImmutableDruidServer.equals --- .../main/java/io/druid/client/ImmutableDruidServer.java | 2 +- .../coordinator/helper/DruidCoordinatorBalancer.java | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/io/druid/client/ImmutableDruidServer.java b/server/src/main/java/io/druid/client/ImmutableDruidServer.java index b43f9d00bbd9..76b7ab9da022 100644 --- a/server/src/main/java/io/druid/client/ImmutableDruidServer.java +++ b/server/src/main/java/io/druid/client/ImmutableDruidServer.java @@ -142,7 +142,7 @@ public boolean equals(Object o) ImmutableDruidServer that = (ImmutableDruidServer) o; - if (metadata.equals(that.metadata)) { + if (!metadata.equals(that.metadata)) { return false; } diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java index d83cd9b921e0..a25e70f1b83b 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java @@ -93,8 +93,11 @@ private void balanceTier( CoordinatorStats stats ) { - final BalancerStrategy strategy = params.getBalancerStrategy(); + if (params.getAvailableSegments().size() == 0) { + log.info("Metadata segments are not available. Cannot balance."); + return; + } currentlyMovingSegments.computeIfAbsent(tier, t -> new ConcurrentHashMap<>()); if (!currentlyMovingSegments.get(tier).isEmpty()) { @@ -116,14 +119,15 @@ private void balanceTier( numSegments += sourceHolder.getServer().getSegments().size(); } + if (numSegments == 0) { log.info("No segments found. Cannot balance."); return; } + final BalancerStrategy strategy = params.getBalancerStrategy(); final int maxSegmentsToMove = Math.min(params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(), numSegments); final int maxIterations = 2 * maxSegmentsToMove; - final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue(); long unmoved = 0L; From 7b509d2289eb9360192a81a3a67174bbea3d4964 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 10 Jul 2018 04:39:25 -0700 Subject: [PATCH 6/6] simplify --- .../src/main/java/io/druid/client/ImmutableDruidServer.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/server/src/main/java/io/druid/client/ImmutableDruidServer.java b/server/src/main/java/io/druid/client/ImmutableDruidServer.java index 76b7ab9da022..922993dedc0f 100644 --- a/server/src/main/java/io/druid/client/ImmutableDruidServer.java +++ b/server/src/main/java/io/druid/client/ImmutableDruidServer.java @@ -142,11 +142,7 @@ public boolean equals(Object o) ImmutableDruidServer that = (ImmutableDruidServer) o; - if (!metadata.equals(that.metadata)) { - return false; - } - - return true; + return metadata.equals(that.metadata); } @Override