diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 6f18386c1c5e..657b652b466a 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -746,6 +746,7 @@ A sample Coordinator dynamic config JSON object is shown below: "mergeBytesLimit": 100000000, "mergeSegmentsLimit" : 1000, "maxSegmentsToMove": 5, + "percentOfSegmentsToConsiderPerMove": 100, "replicantLifetime": 15, "replicationThrottleLimit": 10, "emitBalancingStats": false, @@ -764,6 +765,7 @@ Issuing a GET request at the same URL will return the spec that is currently in |`mergeBytesLimit`|The maximum total uncompressed size in bytes of segments to merge.|524288000L| |`mergeSegmentsLimit`|The maximum number of segments that can be in a single [append task](../ingestion/tasks.md).|100| |`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5| +|`percentOfSegmentsToConsiderPerMove`|The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move.|100| |`replicantLifetime`|The maximum number of Coordinator runs for a segment to be replicated before we start alerting.|15| |`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10| |`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1| diff --git a/docs/operations/basic-cluster-tuning.md b/docs/operations/basic-cluster-tuning.md index c3413eca1b99..1f7253c23f21 100644 --- a/docs/operations/basic-cluster-tuning.md +++ b/docs/operations/basic-cluster-tuning.md @@ -280,6 +280,23 @@ The heap requirements of the Coordinator scale with the number of servers, segme You can set the Coordinator heap to the same size as your Broker heap, or slightly smaller: both services have to process cluster-wide state and answer API requests about this state. +#### Dynamic Configuration + +`percentOfSegmentsToConsiderPerMove` +* The default value is 100. This means that the Coordinator will consider all segments when it is looking for a segment to move. The Coordinator makes a weighted choice, with segments on Servers with the least capacity being the most likely segments to be moved. + * This weighted selection strategy means that the segments on the servers who have the most available capacity are the least likely to be chosen. + * As the number of segments in the cluster increases, the probability of choosing the Nth segment to move decreases; where N is the last segment considered for moving. + * An admin can use this config to skip consideration of that Nth segment. +* Instead of skipping a precise amount of segments, we skip a percentage of segments in the cluster. + * For example, with the value set to 25, only the first 25% of segments will be considered as a segment that can be moved. This 25% of segments will come from the servers that have the least available capacity. + * In this example, each time the Coordinator looks for a segment to move, it will consider 75% less segments than it did when the configuration was 100. On clusters with hundreds of thousands of segments, this can add up to meaningful coordination time savings. +* General recommendations for this configuration: + * If you are not worried about the amount of time it takes your Coordinator to complete a full coordination cycle, you likely do not need to modify this config. + * If you are frustrated with how long the Coordinator takes to run a full coordination cycle, and you have set the Coordinator dynamic config `maxSegmentsToMove` to a value above 0 (the default is 5), setting this config to a non-default value can help shorten coordination time. + * The recommended starting point value is 66. It represents a meaningful decrease in the percentage of segments considered while also not being too aggressive (You will consider 1/3 fewer segments per move operation with this value). +* The impact that modifying this config will have on your coordination time will be a function of how low you set the config value, the value for `maxSegmentsToMove` and the total number of segments in your cluster. + * If your cluster has a relatively small number of segments, or you choose to move few segments per coordination cycle, there may not be much savings to be had here. + ### Overlord The main performance-related setting on the Overlord is the heap size. diff --git a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java index 889c167c8ef5..db451693674a 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java @@ -63,11 +63,20 @@ public interface BalancerStrategy * NOTE: this should really be handled on a per-segment basis, to properly support * the interval or period-based broadcast rules. For simplicity of the initial * implementation, only forever broadcast rules are supported. + * @param percentOfSegmentsToConsider The percentage of the total number of segments that we will consider when + * choosing which segment to move. {@link CoordinatorDynamicConfig} defines a + * config percentOfSegmentsToConsiderPerMove that will be used as an argument + * for implementations of this method. + * * @return {@link BalancerSegmentHolder} containing segment to move and server it currently resides on, or null if * there are no segments to pick from (i. e. all provided serverHolders are empty). */ @Nullable - BalancerSegmentHolder pickSegmentToMove(List serverHolders, Set broadcastDatasources); + BalancerSegmentHolder pickSegmentToMove( + List serverHolders, + Set broadcastDatasources, + double percentOfSegmentsToConsider + ); /** * Returns an iterator for a set of servers to drop from, ordered by preference of which server to drop from first diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index bafeb73c5083..4415f6a8ce9c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -53,6 +53,7 @@ public class CoordinatorDynamicConfig private final long mergeBytesLimit; private final int mergeSegmentsLimit; private final int maxSegmentsToMove; + private final double percentOfSegmentsToConsiderPerMove; private final int replicantLifetime; private final int replicationThrottleLimit; private final int balancerComputeThreads; @@ -95,6 +96,7 @@ public CoordinatorDynamicConfig( @JsonProperty("mergeBytesLimit") long mergeBytesLimit, @JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit, @JsonProperty("maxSegmentsToMove") int maxSegmentsToMove, + @JsonProperty("percentOfSegmentsToConsiderPerMove") double percentOfSegmentsToConsiderPerMove, @JsonProperty("replicantLifetime") int replicantLifetime, @JsonProperty("replicationThrottleLimit") int replicationThrottleLimit, @JsonProperty("balancerComputeThreads") int balancerComputeThreads, @@ -123,6 +125,13 @@ public CoordinatorDynamicConfig( this.mergeBytesLimit = mergeBytesLimit; this.mergeSegmentsLimit = mergeSegmentsLimit; this.maxSegmentsToMove = maxSegmentsToMove; + + Preconditions.checkArgument( + percentOfSegmentsToConsiderPerMove > 0 && percentOfSegmentsToConsiderPerMove <= 100, + "percentOfSegmentsToConsiderPerMove should be between 1 and 100!" + ); + this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove; + this.replicantLifetime = replicantLifetime; this.replicationThrottleLimit = replicationThrottleLimit; this.balancerComputeThreads = Math.max(balancerComputeThreads, 1); @@ -211,6 +220,12 @@ public int getMaxSegmentsToMove() return maxSegmentsToMove; } + @JsonProperty + public double getPercentOfSegmentsToConsiderPerMove() + { + return percentOfSegmentsToConsiderPerMove; + } + @JsonProperty public int getReplicantLifetime() { @@ -302,6 +317,7 @@ public String toString() ", mergeBytesLimit=" + mergeBytesLimit + ", mergeSegmentsLimit=" + mergeSegmentsLimit + ", maxSegmentsToMove=" + maxSegmentsToMove + + ", percentOfSegmentsToConsiderPerMove=" + percentOfSegmentsToConsiderPerMove + ", replicantLifetime=" + replicantLifetime + ", replicationThrottleLimit=" + replicationThrottleLimit + ", balancerComputeThreads=" + balancerComputeThreads + @@ -341,6 +357,9 @@ public boolean equals(Object o) if (maxSegmentsToMove != that.maxSegmentsToMove) { return false; } + if (percentOfSegmentsToConsiderPerMove != that.percentOfSegmentsToConsiderPerMove) { + return false; + } if (replicantLifetime != that.replicantLifetime) { return false; } @@ -382,6 +401,7 @@ public int hashCode() mergeBytesLimit, mergeSegmentsLimit, maxSegmentsToMove, + percentOfSegmentsToConsiderPerMove, replicantLifetime, replicationThrottleLimit, balancerComputeThreads, @@ -408,6 +428,7 @@ public static class Builder private static final long DEFAULT_MERGE_BYTES_LIMIT = 524_288_000L; private static final int DEFAULT_MERGE_SEGMENTS_LIMIT = 100; private static final int DEFAULT_MAX_SEGMENTS_TO_MOVE = 5; + private static final int DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE = 100; private static final int DEFAULT_REPLICANT_LIFETIME = 15; private static final int DEFAULT_REPLICATION_THROTTLE_LIMIT = 10; private static final int DEFAULT_BALANCER_COMPUTE_THREADS = 1; @@ -421,6 +442,7 @@ public static class Builder private Long mergeBytesLimit; private Integer mergeSegmentsLimit; private Integer maxSegmentsToMove; + private Double percentOfSegmentsToConsiderPerMove; private Integer replicantLifetime; private Integer replicationThrottleLimit; private Boolean emitBalancingStats; @@ -444,6 +466,7 @@ public Builder( @JsonProperty("mergeBytesLimit") @Nullable Long mergeBytesLimit, @JsonProperty("mergeSegmentsLimit") @Nullable Integer mergeSegmentsLimit, @JsonProperty("maxSegmentsToMove") @Nullable Integer maxSegmentsToMove, + @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove, @JsonProperty("replicantLifetime") @Nullable Integer replicantLifetime, @JsonProperty("replicationThrottleLimit") @Nullable Integer replicationThrottleLimit, @JsonProperty("balancerComputeThreads") @Nullable Integer balancerComputeThreads, @@ -463,6 +486,7 @@ public Builder( this.mergeBytesLimit = mergeBytesLimit; this.mergeSegmentsLimit = mergeSegmentsLimit; this.maxSegmentsToMove = maxSegmentsToMove; + this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove; this.replicantLifetime = replicantLifetime; this.replicationThrottleLimit = replicationThrottleLimit; this.balancerComputeThreads = balancerComputeThreads; @@ -500,6 +524,12 @@ public Builder withMaxSegmentsToMove(int maxSegmentsToMove) return this; } + public Builder withPercentOfSegmentsToConsiderPerMove(double percentOfSegmentsToConsiderPerMove) + { + this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove; + return this; + } + public Builder withReplicantLifetime(int replicantLifetime) { this.replicantLifetime = replicantLifetime; @@ -569,6 +599,8 @@ public CoordinatorDynamicConfig build() mergeBytesLimit == null ? DEFAULT_MERGE_BYTES_LIMIT : mergeBytesLimit, mergeSegmentsLimit == null ? DEFAULT_MERGE_SEGMENTS_LIMIT : mergeSegmentsLimit, maxSegmentsToMove == null ? DEFAULT_MAX_SEGMENTS_TO_MOVE : maxSegmentsToMove, + percentOfSegmentsToConsiderPerMove == null ? DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE + : percentOfSegmentsToConsiderPerMove, replicantLifetime == null ? DEFAULT_REPLICANT_LIFETIME : replicantLifetime, replicationThrottleLimit == null ? DEFAULT_REPLICATION_THROTTLE_LIMIT : replicationThrottleLimit, balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS : balancerComputeThreads, @@ -598,6 +630,7 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults) mergeBytesLimit == null ? defaults.getMergeBytesLimit() : mergeBytesLimit, mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() : mergeSegmentsLimit, maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() : maxSegmentsToMove, + percentOfSegmentsToConsiderPerMove == null ? defaults.getPercentOfSegmentsToConsiderPerMove() : percentOfSegmentsToConsiderPerMove, replicantLifetime == null ? defaults.getReplicantLifetime() : replicantLifetime, replicationThrottleLimit == null ? defaults.getReplicationThrottleLimit() : replicationThrottleLimit, balancerComputeThreads == null ? defaults.getBalancerComputeThreads() : balancerComputeThreads, diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java index e5e3cb57b129..ac56544b0730 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java @@ -214,10 +214,15 @@ static double computeJointSegmentsCost(final DataSegment segment, final Iterable @Override public BalancerSegmentHolder pickSegmentToMove( final List serverHolders, - Set broadcastDatasources + Set broadcastDatasources, + double percentOfSegmentsToConsider ) { - return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders, broadcastDatasources); + return ReservoirSegmentSampler.getRandomBalancerSegmentHolder( + serverHolders, + broadcastDatasources, + percentOfSegmentsToConsider + ); } @Override diff --git a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java index de3e46e66f43..8f3b96d67ab2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java @@ -54,9 +54,17 @@ public ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List } @Override - public BalancerSegmentHolder pickSegmentToMove(List serverHolders, Set broadcastDatasources) + public BalancerSegmentHolder pickSegmentToMove( + List serverHolders, + Set broadcastDatasources, + double percentOfSegmentsToConsider + ) { - return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders, broadcastDatasources); + return ReservoirSegmentSampler.getRandomBalancerSegmentHolder( + serverHolders, + broadcastDatasources, + percentOfSegmentsToConsider + ); } @Override diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java index 7181d52e152a..dd43760ec874 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java @@ -19,6 +19,7 @@ package org.apache.druid.server.coordinator; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.timeline.DataSegment; import java.util.List; @@ -28,15 +29,54 @@ final class ReservoirSegmentSampler { + private static final EmittingLogger log = new EmittingLogger(ReservoirSegmentSampler.class); + + /** + * Iterates over segments that live on the candidate servers passed in {@link ServerHolder} and (possibly) picks a + * segment to return to caller in a {@link BalancerSegmentHolder} object. + * + * @param serverHolders List of {@link ServerHolder} objects containing segments who are candidates to be chosen. + * @param broadcastDatasources Set of DataSource names that identify broadcast datasources. We don't want to consider + * segments from these datasources. + * @param percentOfSegmentsToConsider The % of total cluster segments to consider before short-circuiting and + * returning immediately. + * @return + */ static BalancerSegmentHolder getRandomBalancerSegmentHolder( final List serverHolders, - Set broadcastDatasources + Set broadcastDatasources, + double percentOfSegmentsToConsider ) { ServerHolder fromServerHolder = null; DataSegment proposalSegment = null; + int calculatedSegmentLimit = Integer.MAX_VALUE; int numSoFar = 0; + // Reset a bad value of percentOfSegmentsToConsider to 100. We don't allow consideration less than or equal to + // 0% of segments or greater than 100% of segments. + if (percentOfSegmentsToConsider <= 0 || percentOfSegmentsToConsider > 100) { + log.warn("Resetting percentOfSegmentsToConsider to 100 because only values from 1 to 100 are allowed." + + " You Provided [%f]", percentOfSegmentsToConsider); + percentOfSegmentsToConsider = 100; + } + + // Calculate the integer limit for the number of segments to be considered for moving if % is less than 100 + if (percentOfSegmentsToConsider < 100) { + int totalSegments = 0; + for (ServerHolder server : serverHolders) { + totalSegments += server.getServer().getNumSegments(); + } + // If totalSegments are zero, we will assume it is a mistake and move on to iteration without updating + // calculatedSegmentLimit + if (totalSegments != 0) { + calculatedSegmentLimit = (int) Math.ceil((double) totalSegments * (percentOfSegmentsToConsider / 100.0)); + } else { + log.warn("Unable to calculate limit on segments to consider because ServerHolder collection indicates" + + " zero segments existing in the cluster."); + } + } + for (ServerHolder server : serverHolders) { if (!server.getServer().getType().isSegmentReplicationTarget()) { // if the server only handles broadcast segments (which don't need to be rebalanced), we have nothing to do @@ -56,6 +96,19 @@ static BalancerSegmentHolder getRandomBalancerSegmentHolder( proposalSegment = segment; } numSoFar++; + + // We have iterated over the alloted number of segments and will return the currently proposed segment or null + // We will only break out early if we are iterating less than 100% of the total cluster segments + if (percentOfSegmentsToConsider < 100 && numSoFar >= calculatedSegmentLimit) { + log.debug("Breaking out of iteration over potential segments to move because we hit the limit [%f percent] of" + + " segments to consider to move. Segments Iterated: [%d]", percentOfSegmentsToConsider, numSoFar); + break; + } + } + // We have iterated over the alloted number of segments and will return the currently proposed segment or null + // We will only break out early if we are iterating less than 100% of the total cluster segments + if (percentOfSegmentsToConsider < 100 && numSoFar >= calculatedSegmentLimit) { + break; } } if (fromServerHolder != null) { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java index a1c5237ddd15..d1fca19e03be 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java @@ -189,7 +189,8 @@ private Pair balanceServers( for (int iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) { final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove( toMoveFrom, - params.getBroadcastDatasources() + params.getBroadcastDatasources(), + params.getCoordinatorDynamicConfig().getPercentOfSegmentsToConsiderPerMove() ); if (segmentToMoveHolder == null) { log.info("All servers to move segments from are empty, ending run."); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java index f37c92cb1056..26175f613216 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java @@ -232,10 +232,19 @@ public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove() mockCoordinator(coordinator); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, false)), broadcastDatasources)) - .andReturn(new BalancerSegmentHolder(druidServer2, segment3)) - .andReturn(new BalancerSegmentHolder(druidServer2, segment4)); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject())) + EasyMock.expect( + strategy.pickSegmentToMove( + ImmutableList.of( + new ServerHolder(druidServer2, peon2, false) + ), + broadcastDatasources, + 100 + ) + ).andReturn( + new BalancerSegmentHolder(druidServer2, segment3)).andReturn(new BalancerSegmentHolder(druidServer2, segment4) + ); + + EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) .andReturn(new BalancerSegmentHolder(druidServer1, segment1)) .andReturn(new BalancerSegmentHolder(druidServer1, segment2)); @@ -300,7 +309,7 @@ public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMoveWithNoDecommissi mockCoordinator(coordinator); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject())) + EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) .andReturn(new BalancerSegmentHolder(druidServer1, segment1)) .andReturn(new BalancerSegmentHolder(druidServer1, segment2)) .andReturn(new BalancerSegmentHolder(druidServer2, segment3)) @@ -349,7 +358,7 @@ public void testMoveToDecommissioningServer() mockCoordinator(coordinator); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject())) + EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) .andReturn(new BalancerSegmentHolder(druidServer1, segment1)) .anyTimes(); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(() -> { @@ -384,7 +393,7 @@ public void testMoveFromDecommissioningServer() ServerHolder holder2 = new ServerHolder(druidServer2, peon2, false); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject())) + EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) .andReturn(new BalancerSegmentHolder(druidServer1, segment1)) .once(); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())) @@ -527,6 +536,76 @@ public void testRun2() Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0); } + /** + * Testing that the dynamic coordinator config value, percentOfSegmentsToConsiderPerMove, is honored when calling + * out to pickSegmentToMove. This config limits the number of segments that are considered when looking for a segment + * to move. + */ + @Test + public void testThatDynamicConfigIsHonoredWhenPickingSegmentToMove() + { + mockDruidServer(druidServer1, "1", "normal", 50L, 100L, Arrays.asList(segment1, segment2)); + mockDruidServer(druidServer2, "2", "normal", 30L, 100L, Arrays.asList(segment3, segment4)); + mockDruidServer(druidServer3, "3", "normal", 0L, 100L, Collections.emptyList()); + + EasyMock.replay(druidServer4); + + mockCoordinator(coordinator); + + BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); + + // The first call for decommissioning servers + EasyMock.expect( + strategy.pickSegmentToMove( + ImmutableList.of(), + broadcastDatasources, + 40 + ) + ) + .andReturn(null); + + // The second call for the single non decommissioning server move + EasyMock.expect( + strategy.pickSegmentToMove( + ImmutableList.of( + new ServerHolder(druidServer3, peon3, false), + new ServerHolder(druidServer2, peon2, false), + new ServerHolder(druidServer1, peon1, false) + ), + broadcastDatasources, + 40 + ) + ) + .andReturn(new BalancerSegmentHolder(druidServer2, segment3)); + + EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn(new ServerHolder(druidServer3, peon3)) + .anyTimes(); + EasyMock.replay(strategy); + + DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder( + ImmutableList.of(druidServer1, druidServer2, druidServer3), + ImmutableList.of(peon1, peon2, peon3), + ImmutableList.of(false, false, false) + ) + .withDynamicConfigs( + CoordinatorDynamicConfig.builder() + .withMaxSegmentsToMove(1) + .withPercentOfSegmentsToConsiderPerMove(40) + .build() + ) + .withBalancerStrategy(strategy) + .withBroadcastDatasources(broadcastDatasources) + .build(); + + params = new BalanceSegmentsTester(coordinator).run(params); + Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal")); + Assert.assertThat( + peon3.getSegmentsToLoad(), + Matchers.is(Matchers.equalTo(ImmutableSet.of(segment3))) + ); + } + private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder( List druidServers, List peons @@ -637,7 +716,11 @@ public ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, Li } @Override - public BalancerSegmentHolder pickSegmentToMove(List serverHolders, Set broadcastDatasources) + public BalancerSegmentHolder pickSegmentToMove( + List serverHolders, + Set broadcastDatasources, + double percentOfSegmentsToConsider + ) { return pickOrder.get(pickCounter.getAndIncrement() % pickOrder.size()); } @@ -661,9 +744,18 @@ private DruidCoordinatorRuntimeParams setupParamsForDecommissioningMaxPercentOfM // either decommissioning servers list or acitve ones (ie servers list is [2] or [1, 3]) BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, true)), broadcastDatasources)) - .andReturn(new BalancerSegmentHolder(druidServer2, segment2)); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject())) + EasyMock.expect( + strategy.pickSegmentToMove( + ImmutableList.of( + new ServerHolder(druidServer2, peon2, true) + ), + broadcastDatasources, + 100 + ) + ).andReturn( + new BalancerSegmentHolder(druidServer2, segment2) + ); + EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) .andReturn(new BalancerSegmentHolder(druidServer1, segment1)); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(new ServerHolder(druidServer3, peon3)) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java index 8f84ad6e2956..f59f24274038 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java @@ -174,14 +174,74 @@ public void getRandomBalancerSegmentHolderTest() Map segmentCountMap = new HashMap<>(); for (int i = 0; i < iterations; i++) { // due to the pseudo-randomness of this method, we may not select a segment every single time no matter what. - BalancerSegmentHolder balancerSegmentHolder = ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet()); + BalancerSegmentHolder balancerSegmentHolder = ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet(), 100); if (balancerSegmentHolder != null) { segmentCountMap.put(balancerSegmentHolder.getSegment(), 1); } } for (DataSegment segment : segments) { - Assert.assertEquals(segmentCountMap.get(segment), new Integer(1)); + Assert.assertEquals(new Integer(1), segmentCountMap.get(segment)); + } + + EasyMock.verify(druidServer1, druidServer2, druidServer3, druidServer4); + EasyMock.verify(holder1, holder2, holder3, holder4); + } + + /** + * Makes sure that the segment on server4 is never chosen in 5k iterations because it should never have its segment + * checked due to the limit on segment candidates + */ + @Test + public void getRandomBalancerSegmentHolderTestSegmentsToConsiderLimit() + { + int iterations = 5000; + + EasyMock.expect(druidServer1.getType()).andReturn(ServerType.HISTORICAL).times(iterations); + ImmutableDruidServerTests.expectSegments(druidServer1, segments1); + EasyMock.replay(druidServer1); + + EasyMock.expect(druidServer2.getType()).andReturn(ServerType.HISTORICAL).times(iterations); + ImmutableDruidServerTests.expectSegments(druidServer2, segments2); + EasyMock.replay(druidServer2); + + EasyMock.expect(druidServer3.getType()).andReturn(ServerType.HISTORICAL).times(iterations); + ImmutableDruidServerTests.expectSegments(druidServer3, segments3); + EasyMock.replay(druidServer3); + + ImmutableDruidServerTests.expectSegments(druidServer4, segments4); + EasyMock.replay(druidServer4); + + // Have to use anyTimes() because the number of times a segment on a given server is chosen is indetermistic. + EasyMock.expect(holder1.getServer()).andReturn(druidServer1).anyTimes(); + EasyMock.replay(holder1); + EasyMock.expect(holder2.getServer()).andReturn(druidServer2).anyTimes(); + EasyMock.replay(holder2); + EasyMock.expect(holder3.getServer()).andReturn(druidServer3).anyTimes(); + EasyMock.replay(holder3); + // We only run getServer() each time we calculate the limit on segments to consider. Always 5k + EasyMock.expect(holder4.getServer()).andReturn(druidServer4).times(5000); + EasyMock.replay(holder4); + + List holderList = new ArrayList<>(); + holderList.add(holder1); + holderList.add(holder2); + holderList.add(holder3); + holderList.add(holder4); + + Map segmentCountMap = new HashMap<>(); + for (int i = 0; i < iterations; i++) { + segmentCountMap.put( + ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet(), 75).getSegment(), 1 + ); + } + + for (DataSegment segment : segments) { + if (!segment.equals(segment4)) { + Assert.assertEquals(new Integer(1), segmentCountMap.get(segment)); + } else { + Assert.assertNull(segmentCountMap.get(segment)); + } } EasyMock.verify(druidServer1, druidServer2, druidServer3, druidServer4); diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index 0050c0e07398..15c49a2de1af 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -44,6 +44,7 @@ public void testSerde() throws Exception + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" + + " \"percentOfSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -66,16 +67,19 @@ public void testSerde() throws Exception ); ImmutableSet decommissioning = ImmutableSet.of("host1", "host2"); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false); actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false); actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false); actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true); + + actual = CoordinatorDynamicConfig.builder().withPercentOfSegmentsToConsiderPerMove(10).build(actual); + assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true); } @Test @@ -86,6 +90,7 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" + + " \"percentOfSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -105,13 +110,13 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio ); ImmutableSet decommissioning = ImmutableSet.of(); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false); actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false); actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false); } @Test @@ -122,6 +127,7 @@ public void testSerdeWithStringinKillDataSourceWhitelist() throws Exception + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" + + " \"percentOfSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -139,7 +145,91 @@ public void testSerdeWithStringinKillDataSourceWhitelist() throws Exception ), CoordinatorDynamicConfig.class ); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false, 1, ImmutableSet.of(), 0, false); + assertConfig( + actual, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 2, + true, + ImmutableSet.of("test1", "test2"), + false, + 1, + ImmutableSet.of(), + 0, + false + ); + } + + @Test + public void testSerdeCorrectsInvalidBadMaxPercentOfSegmentsToConsiderPerMove() throws Exception + { + try { + String jsonStr = "{\n" + + " \"percentOfSegmentsToConsiderPerMove\": 0\n" + + "}\n"; + + mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); + + Assert.fail("deserialization should fail."); + } + catch (JsonMappingException e) { + Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); + } + + try { + String jsonStr = "{\n" + + " \"percentOfSegmentsToConsiderPerMove\": -100\n" + + "}\n"; + + mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); + + Assert.fail("deserialization should fail."); + } + catch (JsonMappingException e) { + Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); + } + + try { + String jsonStr = "{\n" + + " \"percentOfSegmentsToConsiderPerMove\": 105\n" + + "}\n"; + + mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); + + Assert.fail("deserialization should fail."); + } + catch (JsonMappingException e) { + Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); + } } @Test @@ -150,6 +240,7 @@ public void testSerdeWithKillAllDataSources() throws Exception + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" + + " \"percentOfSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -168,13 +259,14 @@ public void testSerdeWithKillAllDataSources() throws Exception CoordinatorDynamicConfig.class ); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false); //ensure whitelist is empty when killAllDataSources is true try { jsonStr = "{\n" + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n" - + " \"killAllDataSources\": true\n" + + " \"killAllDataSources\": true,\n" + + " \"percentOfSegmentsToConsiderPerMove\": 1\n" + "}\n"; mapper.readValue( jsonStr, @@ -196,6 +288,7 @@ public void testDeserializeWithoutMaxSegmentsInNodeLoadingQueue() throws Excepti + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" + + " \"percentOfSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -213,7 +306,7 @@ public void testDeserializeWithoutMaxSegmentsInNodeLoadingQueue() throws Excepti CoordinatorDynamicConfig.class ); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false); } @Test @@ -221,7 +314,24 @@ public void testBuilderDefaults() { CoordinatorDynamicConfig defaultConfig = CoordinatorDynamicConfig.builder().build(); ImmutableSet emptyList = ImmutableSet.of(); - assertConfig(defaultConfig, 900000, 524288000, 100, 5, 15, 10, 1, false, emptyList, false, 0, emptyList, 70, false); + assertConfig( + defaultConfig, + 900000, + 524288000, + 100, + 5, + 100, + 15, + 10, + 1, + false, + emptyList, + false, + 0, + emptyList, + 70, + false + ); } @Test @@ -235,7 +345,7 @@ public void testUpdate() Assert.assertEquals( current, new CoordinatorDynamicConfig - .Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null) + .Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null) .build(current) ); } @@ -255,6 +365,7 @@ private void assertConfig( long expectedMergeBytesLimit, int expectedMergeSegmentsLimit, int expectedMaxSegmentsToMove, + int expectedPercentOfSegmentsToConsiderPerMove, int expectedReplicantLifetime, int expectedReplicationThrottleLimit, int expectedBalancerComputeThreads, @@ -274,6 +385,7 @@ private void assertConfig( Assert.assertEquals(expectedMergeBytesLimit, config.getMergeBytesLimit()); Assert.assertEquals(expectedMergeSegmentsLimit, config.getMergeSegmentsLimit()); Assert.assertEquals(expectedMaxSegmentsToMove, config.getMaxSegmentsToMove()); + Assert.assertEquals(expectedPercentOfSegmentsToConsiderPerMove, config.getPercentOfSegmentsToConsiderPerMove(), 0); Assert.assertEquals(expectedReplicantLifetime, config.getReplicantLifetime()); Assert.assertEquals(expectedReplicationThrottleLimit, config.getReplicationThrottleLimit()); Assert.assertEquals(expectedBalancerComputeThreads, config.getBalancerComputeThreads()); diff --git a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap index 50f17d44ee65..ff3008c61146 100644 --- a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap +++ b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap @@ -167,6 +167,14 @@ exports[`coordinator dynamic config matches snapshot 1`] = ` "name": "decommissioningMaxPercentOfMaxSegmentsToMove", "type": "number", }, + Object { + "defaultValue": 100, + "info": + The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move. + , + "name": "percentOfSegmentsToConsiderPerMove", + "type": "number", + }, Object { "defaultValue": false, "info": diff --git a/web-console/src/druid-models/coordinator-dynamic-config.tsx b/web-console/src/druid-models/coordinator-dynamic-config.tsx index b5621ce31f8a..f2fa7d0c8ec9 100644 --- a/web-console/src/druid-models/coordinator-dynamic-config.tsx +++ b/web-console/src/druid-models/coordinator-dynamic-config.tsx @@ -193,6 +193,23 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field[ ), }, + { + name: 'percentOfSegmentsToConsiderPerMove', + type: 'number', + defaultValue: 100, + info: ( + <> + The percentage of the total number of segments in the cluster that are considered every time + a segment needs to be selected for a move. Druid orders servers by available capacity + ascending (the least available capacity first) and then iterates over the servers. For each + server, Druid iterates over the segments on the server, considering them for moving. The + default config of 100% means that every segment on every server is a candidate to be moved. + This should make sense for most small to medium-sized clusters. However, an admin may find + it preferable to drop this value lower if they don't think that it is worthwhile to consider + every single segment in the cluster each time it is looking for a segment to move. + + ), + }, { name: 'pauseCoordination', type: 'boolean',