Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,61 @@

import io.druid.timeline.DataSegment;

import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;

/**
 * This interface describes the coordinator balancing strategy, which is responsible for making decisions on where
 * to place {@link DataSegment}s on historical servers (described by {@link ServerHolder}). The balancing strategy
 * is used by {@link io.druid.server.coordinator.rules.LoadRule} to assign and drop segments, and by
 * {@link io.druid.server.coordinator.helper.DruidCoordinatorBalancer} to migrate segments between historicals.
 */
public interface BalancerStrategy
{
  /**
   * Find the best server to move a {@link DataSegment} to according to the balancing strategy.
   *
   * @param proposalSegment segment to move
   * @param serverHolders servers to consider as move destinations
   * @return The server to move to, or null if no move should be made or no server is suitable
   */
  ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List<ServerHolder> serverHolders);

  /**
   * Find the best server on which to place a {@link DataSegment} replica according to the balancing strategy.
   *
   * @param proposalSegment segment to replicate
   * @param serverHolders servers to consider as replica holders
   * @return The server to replicate to, or null if no suitable server is found
   */
  ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List<ServerHolder> serverHolders);

  /**
   * Pick the best segment to move from one of the supplied set of servers according to the balancing strategy.
   *
   * @param serverHolders set of historicals to consider for moving segments
   * @return {@link BalancerSegmentHolder} containing the segment to move and the server it currently resides on
   */
  BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders);

  /**
   * Returns an iterator for a set of servers to drop from, ordered by preference of which server to drop from first
   * for a given drop strategy. One or more segments may be dropped, depending on how much the segment is
   * over-replicated.
   *
   * @param toDropSegment segment to drop from one or more servers
   * @param serverHolders set of historicals to consider dropping from
   * @return Iterator over the set of historicals, ordered by drop preference
   */
  default Iterator<ServerHolder> pickServersToDrop(DataSegment toDropSegment, NavigableSet<ServerHolder> serverHolders)
  {
    // By default, use the reverse order to get the holders with least available size first.
    return serverHolders.descendingIterator();
  }

  /**
   * Add balancing-strategy-specific stats during the 'balanceTier' operation of
   * {@link io.druid.server.coordinator.helper.DruidCoordinatorBalancer}, to be included in the coordinator run stats.
   *
   * @param tier historical tier being balanced
   * @param stats stats object to add balancing strategy stats to
   * @param serverHolderList servers in the tier being balanced
   */
  void emitStats(String tier, CoordinatorStats stats, List<ServerHolder> serverHolderList);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,20 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.druid.java.util.emitter.EmittingLogger;

import io.druid.java.util.common.Pair;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.timeline.DataSegment;
import org.apache.commons.math3.util.FastMath;
import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.NavigableSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

public class CostBalancerStrategy implements BalancerStrategy
{
Expand Down Expand Up @@ -220,6 +223,37 @@ public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHo
return sampler.getRandomBalancerSegmentHolder(serverHolders);
}

@Override
public Iterator<ServerHolder> pickServersToDrop(DataSegment toDrop, NavigableSet<ServerHolder> serverHolders)
{
  // Compute, in parallel on the strategy executor, the cost of the segment being on each candidate server.
  // Servers where the segment is most expensive to keep are preferred as drop targets.
  List<ListenableFuture<Pair<Double, ServerHolder>>> futures = new ArrayList<>();

  for (final ServerHolder server : serverHolders) {
    futures.add(
        exec.submit(
            () -> Pair.of(computeCost(toDrop, server, true), server)
        )
    );
  }

  final ListenableFuture<List<Pair<Double, ServerHolder>>> resultsFuture = Futures.allAsList(futures);

  try {
    // results is an un-ordered list of a pair consisting of the 'cost' of a segment being on a server and the server
    List<Pair<Double, ServerHolder>> results = resultsFuture.get();
    return results.stream()
                  // Comparator.comparingDouble would order by lowest cost first...
                  // reverse it because we want to drop from the highest cost servers first
                  .sorted(Comparator.comparingDouble((Pair<Double, ServerHolder> o) -> o.lhs).reversed())
                  .map(x -> x.rhs).collect(Collectors.toList())
                  .iterator();
  }
  catch (Exception e) {
    // Restore the interrupt flag so callers can still observe the interruption.
    if (e instanceof InterruptedException) {
      Thread.currentThread().interrupt();
    }
    log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit();
  }
  // On failure, report no drop candidates rather than an arbitrary ordering.
  return Collections.emptyIterator();
}

/**
* Calculates the initial cost of the Druid segment configuration.
*
Expand Down Expand Up @@ -342,14 +376,7 @@ protected Pair<Double, ServerHolder> chooseBestServer(
for (final ServerHolder server : serverHolders) {
futures.add(
exec.submit(
new Callable<Pair<Double, ServerHolder>>()
{
@Override
public Pair<Double, ServerHolder> call() throws Exception
{
return Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server);
}
}
() -> Pair.of(computeCost(proposalSegment, server, includeCurrentServer), server)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@

import io.druid.timeline.DataSegment;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Random;

public class RandomBalancerStrategy implements BalancerStrategy
Expand Down Expand Up @@ -54,6 +57,12 @@ public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders)
return sampler.getRandomBalancerSegmentHolder(serverHolders);
}

@Override
public Iterator<ServerHolder> pickServersToDrop(DataSegment toDropSegment, NavigableSet<ServerHolder> serverHolders)
{
  // Shuffle a copy of the server set to produce a uniformly random drop order.
  // (Sorting with a comparator whose key is a fresh random number on every extraction
  // violates the Comparator contract — sort may throw "Comparison method violates its
  // general contract!" — and does not yield a uniform permutation.)
  final List<ServerHolder> shuffled = new ArrayList<>(serverHolders);
  Collections.shuffle(shuffled);
  return shuffled.iterator();
}

@Override
public void emitStats(String tier, CoordinatorStats stats, List<ServerHolder> serverHolderList)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

package io.druid.server.coordinator.rules;

import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.common.IAE;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinator;
Expand All @@ -39,6 +40,7 @@
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -209,7 +211,7 @@ private ServerHolder assignPrimary(
}

/**
* @param stats {@link CoordinatorStats} to accumulate assignment statistics.
* @param stats {@link CoordinatorStats} to accumulate assignment statistics.
* @param tierToSkip if not null, this tier will be skipped from doing assignment, use when primary replica was
* assigned.
*/
Expand Down Expand Up @@ -320,7 +322,7 @@ private void drop(
} else {
final int currentReplicantsInTier = entry.getIntValue();
final int numToDrop = currentReplicantsInTier - targetReplicants.getOrDefault(tier, 0);
numDropped = dropForTier(numToDrop, holders, segment);
numDropped = dropForTier(numToDrop, holders, segment, params.getBalancerStrategy());
}

stats.addToTieredStat(DROPPED_COUNT, tier, numDropped);
Expand All @@ -346,13 +348,17 @@ private boolean loadingInProgress(final DruidCluster druidCluster)
private static int dropForTier(
final int numToDrop,
final NavigableSet<ServerHolder> holdersInTier,
final DataSegment segment
final DataSegment segment,
final BalancerStrategy balancerStrategy
)
{
int numDropped = 0;

// Use the reverse order to get the holders with least available size first.
final Iterator<ServerHolder> iterator = holdersInTier.descendingIterator();
final NavigableSet<ServerHolder> isServingSubset =
holdersInTier.stream().filter(s -> s.isServingSegment(segment)).collect(Collectors.toCollection(TreeSet::new));

final Iterator<ServerHolder> iterator = balancerStrategy.pickServersToDrop(segment, isServingSubset);

while (numDropped < numToDrop) {
if (!iterator.hasNext()) {
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
Expand All @@ -364,6 +370,12 @@ private static int dropForTier(
if (holder.isServingSegment(segment)) {
holder.getPeon().dropSegment(segment, null);
++numDropped;
} else {
log.warn(
"Server [%s] is no longer serving segment [%s], skipping drop.",
holder.getServer().getName(),
segment.getIdentifier()
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ public void testDrop() throws Exception
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockBalancerStrategy.pickServersToDrop(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(2);
EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);

LoadRule rule = createLoadRule(ImmutableMap.of(
Expand Down Expand Up @@ -429,7 +432,9 @@ public void testDropWithNonExistentTier() throws Exception
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();

EasyMock.expect(mockBalancerStrategy.pickServersToDrop(EasyMock.anyObject(), EasyMock.anyObject()))
.andDelegateTo(balancerStrategy)
.times(1);
EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);

LoadRule rule = createLoadRule(ImmutableMap.of(
Expand Down