From b8bc536bdced3f9c006efd46ffa8d4cfe47d0536 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 3 Apr 2018 11:22:51 -0700 Subject: [PATCH] Coordinator drop segment selection through cost balancer (#5529) * drop selection through cost balancer * use collections.emptyIterator * add test to ensure does not drop from server with larger loading queue with cost balancer * javadocs and comments to clear things up * random drop for completeness --- .../server/coordinator/BalancerStrategy.java | 46 +++++++++++++++++ .../coordinator/CostBalancerStrategy.java | 49 ++++++++++++++----- .../coordinator/RandomBalancerStrategy.java | 9 ++++ .../server/coordinator/rules/LoadRule.java | 24 ++++++--- .../coordinator/rules/LoadRuleTest.java | 7 ++- 5 files changed, 117 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/BalancerStrategy.java index e654cb44ecf4..ec498f1154a4 100644 --- a/server/src/main/java/io/druid/server/coordinator/BalancerStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/BalancerStrategy.java @@ -21,15 +21,61 @@ import io.druid.timeline.DataSegment; +import java.util.Iterator; import java.util.List; +import java.util.NavigableSet; +/** + * This interface describes the coordinator balancing strategy, which is responsible for making decisions on where + * to place {@link DataSegment}s on historical servers (described by {@link ServerHolder}). The balancing strategy + * is used by {@link io.druid.server.coordinator.rules.LoadRule} to assign and drop segments, and by + * {@link io.druid.server.coordinator.helper.DruidCoordinatorBalancer} to migrate segments between historicals. + */ public interface BalancerStrategy { + /** + * Find the best server to move a {@link DataSegment} to according to the balancing strategy. 
+ * @param proposalSegment segment to move + * @param serverHolders servers to consider as move destinations + * @return The server to move to, or null if no move should be made or no server is suitable + */ ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List serverHolders); + /** + * Find the best server on which to place a {@link DataSegment} replica according to the balancing strategy. + * @param proposalSegment segment to replicate + * @param serverHolders servers to consider as replica holders + * @return The server to replicate to, or null if no suitable server is found + */ ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List serverHolders); + /** + * Pick the best segment to move from one of the supplied set of servers according to the balancing strategy. + * @param serverHolders set of historicals to consider for moving segments + * @return {@link BalancerSegmentHolder} containing segment to move and server it currently resides on + */ BalancerSegmentHolder pickSegmentToMove(List serverHolders); + /** + * Returns an iterator for a set of servers to drop from, ordered by preference of which server to drop from first + * for a given drop strategy. One or more segments may be dropped, depending on how much the segment is + * over-replicated. + * @param toDropSegment segment to drop from one or more servers + * @param serverHolders set of historicals to consider dropping from + * @return Iterator for set of historicals, ordered by drop preference + */ + default Iterator pickServersToDrop(DataSegment toDropSegment, NavigableSet serverHolders) + { + // By default, use the reverse order to get the holders with least available size first. 
+ return serverHolders.descendingIterator(); + } + + /** + * Add balancing strategy stats during the 'balanceTier' operation of + * {@link io.druid.server.coordinator.helper.DruidCoordinatorBalancer} to be included in the coordinator run stats. + * @param tier historical tier being balanced + * @param stats stats object to add balancing strategy stats to + * @param serverHolderList servers in tier being balanced + */ void emitStats(String tier, CoordinatorStats stats, List serverHolderList); } diff --git a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java index c4cd0f7cf882..c5ea85096f84 100644 --- a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java @@ -25,17 +25,20 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; -import io.druid.java.util.emitter.EmittingLogger; - import io.druid.java.util.common.Pair; +import io.druid.java.util.emitter.EmittingLogger; import io.druid.timeline.DataSegment; import org.apache.commons.math3.util.FastMath; import org.joda.time.Interval; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; import java.util.List; -import java.util.concurrent.Callable; +import java.util.NavigableSet; import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; public class CostBalancerStrategy implements BalancerStrategy { @@ -220,6 +223,37 @@ public BalancerSegmentHolder pickSegmentToMove(final List serverHo return sampler.getRandomBalancerSegmentHolder(serverHolders); } + @Override + public Iterator pickServersToDrop(DataSegment toDrop, NavigableSet serverHolders) + { + List>> futures = Lists.newArrayList(); + + for (final ServerHolder server : serverHolders) { + 
futures.add( + exec.submit( + () -> Pair.of(computeCost(toDrop, server, true), server) + ) + ); + } + + final ListenableFuture>> resultsFuture = Futures.allAsList(futures); + + try { + // results is an un-ordered list of a pair consisting of the 'cost' of a segment being on a server and the server + List> results = resultsFuture.get(); + return results.stream() + // Comparator.comparingDouble will order by lowest cost... + // reverse it because we want to drop from the highest cost servers first + .sorted(Comparator.comparingDouble((Pair o) -> o.lhs).reversed()) + .map(x -> x.rhs).collect(Collectors.toList()) + .iterator(); + } + catch (Exception e) { + log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit(); + } + return Collections.emptyIterator(); + } + + /** * Calculates the initial cost of the Druid segment configuration. * @@ -342,14 +376,7 @@ protected Pair chooseBestServer( for (final ServerHolder server : serverHolders) { futures.add( exec.submit( - new Callable>() - { - @Override - public Pair call() throws Exception - { - return Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server); - } - } + () -> Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server) ) ); } diff --git a/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategy.java index 8c2aed3397ab..092811831198 100644 --- a/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategy.java @@ -21,7 +21,10 @@ import io.druid.timeline.DataSegment; +import java.util.Comparator; +import java.util.Iterator; import java.util.List; +import java.util.NavigableSet; import java.util.Random; public class RandomBalancerStrategy implements BalancerStrategy @@ -54,6 +57,12 @@ public BalancerSegmentHolder pickSegmentToMove(List serverHolders) 
return sampler.getRandomBalancerSegmentHolder(serverHolders); } + @Override + public Iterator pickServersToDrop(DataSegment toDropSegment, NavigableSet serverHolders) + { + return serverHolders.stream().sorted(Comparator.comparingDouble(o -> new Random().nextDouble())).iterator(); + } + @Override public void emitStats(String tier, CoordinatorStats stats, List serverHolderList) { diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java index 5af4d822c4b3..c604303d28a3 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java @@ -19,8 +19,9 @@ package io.druid.server.coordinator.rules; -import io.druid.java.util.emitter.EmittingLogger; import io.druid.java.util.common.IAE; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.server.coordinator.BalancerStrategy; import io.druid.server.coordinator.CoordinatorStats; import io.druid.server.coordinator.DruidCluster; import io.druid.server.coordinator.DruidCoordinator; @@ -39,6 +40,7 @@ import java.util.Map; import java.util.NavigableSet; import java.util.Objects; +import java.util.TreeSet; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -209,7 +211,7 @@ private ServerHolder assignPrimary( } /** - * @param stats {@link CoordinatorStats} to accumulate assignment statistics. + * @param stats {@link CoordinatorStats} to accumulate assignment statistics. * @param tierToSkip if not null, this tier will be skipped from doing assignment, use when primary replica was * assigned. 
*/ @@ -320,7 +322,7 @@ private void drop( } else { final int currentReplicantsInTier = entry.getIntValue(); final int numToDrop = currentReplicantsInTier - targetReplicants.getOrDefault(tier, 0); - numDropped = dropForTier(numToDrop, holders, segment); + numDropped = dropForTier(numToDrop, holders, segment, params.getBalancerStrategy()); } stats.addToTieredStat(DROPPED_COUNT, tier, numDropped); @@ -346,13 +348,17 @@ private boolean loadingInProgress(final DruidCluster druidCluster) private static int dropForTier( final int numToDrop, final NavigableSet holdersInTier, - final DataSegment segment + final DataSegment segment, + final BalancerStrategy balancerStrategy ) { int numDropped = 0; - // Use the reverse order to get the holders with least available size first. - final Iterator iterator = holdersInTier.descendingIterator(); + final NavigableSet isServingSubset = + holdersInTier.stream().filter(s -> s.isServingSegment(segment)).collect(Collectors.toCollection(TreeSet::new)); + + final Iterator iterator = balancerStrategy.pickServersToDrop(segment, isServingSubset); + while (numDropped < numToDrop) { if (!iterator.hasNext()) { log.warn("Wtf, holder was null? 
I have no servers serving [%s]?", segment.getIdentifier()); @@ -364,6 +370,12 @@ private static int dropForTier( if (holder.isServingSegment(segment)) { holder.getPeon().dropSegment(segment, null); ++numDropped; + } else { + log.warn( + "Server [%s] is no longer serving segment [%s], skipping drop.", + holder.getServer().getName(), + segment.getIdentifier() + ); } } diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java index 0388318b7262..393caa9c2d11 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java @@ -285,6 +285,9 @@ public void testDrop() throws Exception final LoadQueuePeon mockPeon = createEmptyPeon(); mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expect(mockBalancerStrategy.pickServersToDrop(EasyMock.anyObject(), EasyMock.anyObject())) + .andDelegateTo(balancerStrategy) + .times(2); EasyMock.replay(throttler, mockPeon, mockBalancerStrategy); LoadRule rule = createLoadRule(ImmutableMap.of( @@ -429,7 +432,9 @@ public void testDropWithNonExistentTier() throws Exception final LoadQueuePeon mockPeon = createEmptyPeon(); mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); - + EasyMock.expect(mockBalancerStrategy.pickServersToDrop(EasyMock.anyObject(), EasyMock.anyObject())) + .andDelegateTo(balancerStrategy) + .times(1); EasyMock.replay(throttler, mockPeon, mockBalancerStrategy); LoadRule rule = createLoadRule(ImmutableMap.of(