From 614a96bb4ec9f52a2654e0ea39f769a759aedcb3 Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Thu, 13 Aug 2020 15:31:53 -0500 Subject: [PATCH 01/28] dynamic coord config adding more balancing control add new dynamic coordinator config, maxSegmentsToConsiderPerMove. This config caps the number of segments that are iterated over when selecting a segment to move. The default value combined with current balancing strategies will still iterate over all provided segments. However, setting this value to something > 0 will cap the number of segments visited. This could make sense in cases where a cluster has a very large number of segments and the admins prefer less iterations vs a thorough consideration of all segments provided. --- docs/configuration/index.md | 2 + .../server/coordinator/BalancerStrategy.java | 13 +- .../coordinator/CoordinatorDynamicConfig.java | 31 +++++ .../coordinator/CostBalancerStrategy.java | 9 +- .../coordinator/RandomBalancerStrategy.java | 12 +- .../coordinator/ReservoirSegmentSampler.java | 30 ++++- .../coordinator/duty/BalanceSegments.java | 3 +- .../coordinator/BalanceSegmentsTest.java | 44 +++++-- .../ReservoirSegmentSamplerTest.java | 67 ++++++++++- .../http/CoordinatorDynamicConfigTest.java | 111 +++++++++++++++--- 10 files changed, 287 insertions(+), 35 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index b02c467141a1..5767af5164d2 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -737,6 +737,7 @@ A sample Coordinator dynamic config JSON object is shown below: "mergeBytesLimit": 100000000, "mergeSegmentsLimit" : 1000, "maxSegmentsToMove": 5, + "maxSegmentsToConsiderPerMove": 2147483647, "replicantLifetime": 15, "replicationThrottleLimit": 10, "emitBalancingStats": false, @@ -755,6 +756,7 @@ Issuing a GET request at the same URL will return the spec that is currently in |`mergeBytesLimit`|The maximum total uncompressed size in bytes of segments to merge.|524288000L| 
|`mergeSegmentsLimit`|The maximum number of segments that can be in a single [append task](../ingestion/tasks.md).|100| |`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5| +|`maxSegmentsToConsiderPerMove`|The maximum number of segments to consider when picking a segment to move. For a cluster with a very large number of segments, choosing to set this to a non-default value may make sense for an experienced admin. A value of `<= 0` will be reset to the default automatically.|Integer.MAX_VALUE| |`replicantLifetime`|The maximum number of Coordinator runs for a segment to be replicated before we start alerting.|15| |`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10| |`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1| diff --git a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java index 889c167c8ef5..37ad86febb96 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java @@ -63,11 +63,22 @@ public interface BalancerStrategy * NOTE: this should really be handled on a per-segment basis, to properly support * the interval or period-based broadcast rules. For simplicity of the initial * implementation, only forever broadcast rules are supported. + * @param numberOfSegmentsToConsider The number of segments to consider when choosing which segment to move. + * Implementations of this interface need to be cognizant of the + * {@link CoordinatorDynamicConfig} default value for maxSegmentsToConsiderPerMove. 
+ * If use of an implementation is going to potentially leverage this value, the + * implementation must ensure proper functionality with the potentially provided + * values, especially the default value. + * * @return {@link BalancerSegmentHolder} containing segment to move and server it currently resides on, or null if * there are no segments to pick from (i. e. all provided serverHolders are empty). */ @Nullable - BalancerSegmentHolder pickSegmentToMove(List serverHolders, Set broadcastDatasources); + BalancerSegmentHolder pickSegmentToMove( + List serverHolders, + Set broadcastDatasources, + int numberOfSegmentsToConsider + ); /** * Returns an iterator for a set of servers to drop from, ordered by preference of which server to drop from first diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index bafeb73c5083..7330d705bfc2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -53,6 +53,7 @@ public class CoordinatorDynamicConfig private final long mergeBytesLimit; private final int mergeSegmentsLimit; private final int maxSegmentsToMove; + private final int maxSegmentsToConsiderPerMove; private final int replicantLifetime; private final int replicationThrottleLimit; private final int balancerComputeThreads; @@ -95,6 +96,7 @@ public CoordinatorDynamicConfig( @JsonProperty("mergeBytesLimit") long mergeBytesLimit, @JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit, @JsonProperty("maxSegmentsToMove") int maxSegmentsToMove, + @JsonProperty("maxSegmentsToConsiderPerMove") int maxSegmentsToConsiderPerMove, @JsonProperty("replicantLifetime") int replicantLifetime, @JsonProperty("replicationThrottleLimit") int replicationThrottleLimit, @JsonProperty("balancerComputeThreads") int 
balancerComputeThreads, @@ -123,6 +125,12 @@ public CoordinatorDynamicConfig( this.mergeBytesLimit = mergeBytesLimit; this.mergeSegmentsLimit = mergeSegmentsLimit; this.maxSegmentsToMove = maxSegmentsToMove; + // This helps with ease of migration to the new config, but could confuse users. Docs explicitly state this value must be > 0 + if (maxSegmentsToConsiderPerMove <= 0) { + this.maxSegmentsToConsiderPerMove = Integer.MAX_VALUE; + } else { + this.maxSegmentsToConsiderPerMove = maxSegmentsToConsiderPerMove; + } this.replicantLifetime = replicantLifetime; this.replicationThrottleLimit = replicationThrottleLimit; this.balancerComputeThreads = Math.max(balancerComputeThreads, 1); @@ -211,6 +219,12 @@ public int getMaxSegmentsToMove() return maxSegmentsToMove; } + @JsonProperty + public int getMaxSegmentsToConsiderPerMove() + { + return maxSegmentsToConsiderPerMove; + } + @JsonProperty public int getReplicantLifetime() { @@ -302,6 +316,7 @@ public String toString() ", mergeBytesLimit=" + mergeBytesLimit + ", mergeSegmentsLimit=" + mergeSegmentsLimit + ", maxSegmentsToMove=" + maxSegmentsToMove + + ", maxSegmentsToConsiderPerMove=" + maxSegmentsToConsiderPerMove + ", replicantLifetime=" + replicantLifetime + ", replicationThrottleLimit=" + replicationThrottleLimit + ", balancerComputeThreads=" + balancerComputeThreads + @@ -341,6 +356,9 @@ public boolean equals(Object o) if (maxSegmentsToMove != that.maxSegmentsToMove) { return false; } + if (maxSegmentsToConsiderPerMove != that.maxSegmentsToConsiderPerMove) { + return false; + } if (replicantLifetime != that.replicantLifetime) { return false; } @@ -382,6 +400,7 @@ public int hashCode() mergeBytesLimit, mergeSegmentsLimit, maxSegmentsToMove, + maxSegmentsToConsiderPerMove, replicantLifetime, replicationThrottleLimit, balancerComputeThreads, @@ -408,6 +427,7 @@ public static class Builder private static final long DEFAULT_MERGE_BYTES_LIMIT = 524_288_000L; private static final int DEFAULT_MERGE_SEGMENTS_LIMIT = 100; 
private static final int DEFAULT_MAX_SEGMENTS_TO_MOVE = 5; + private static final int DEFAULT_MAX_SEGMENTS_TO_CONSIDER_PER_MOVE = Integer.MAX_VALUE; private static final int DEFAULT_REPLICANT_LIFETIME = 15; private static final int DEFAULT_REPLICATION_THROTTLE_LIMIT = 10; private static final int DEFAULT_BALANCER_COMPUTE_THREADS = 1; @@ -421,6 +441,7 @@ public static class Builder private Long mergeBytesLimit; private Integer mergeSegmentsLimit; private Integer maxSegmentsToMove; + private Integer maxSegmentsToConsiderPerMove; private Integer replicantLifetime; private Integer replicationThrottleLimit; private Boolean emitBalancingStats; @@ -444,6 +465,7 @@ public Builder( @JsonProperty("mergeBytesLimit") @Nullable Long mergeBytesLimit, @JsonProperty("mergeSegmentsLimit") @Nullable Integer mergeSegmentsLimit, @JsonProperty("maxSegmentsToMove") @Nullable Integer maxSegmentsToMove, + @JsonProperty("maxSegmentsToConsiderPerMove") @Nullable Integer maxSegmentsToConsiderPerMove, @JsonProperty("replicantLifetime") @Nullable Integer replicantLifetime, @JsonProperty("replicationThrottleLimit") @Nullable Integer replicationThrottleLimit, @JsonProperty("balancerComputeThreads") @Nullable Integer balancerComputeThreads, @@ -463,6 +485,7 @@ public Builder( this.mergeBytesLimit = mergeBytesLimit; this.mergeSegmentsLimit = mergeSegmentsLimit; this.maxSegmentsToMove = maxSegmentsToMove; + this.maxSegmentsToConsiderPerMove = maxSegmentsToConsiderPerMove; this.replicantLifetime = replicantLifetime; this.replicationThrottleLimit = replicationThrottleLimit; this.balancerComputeThreads = balancerComputeThreads; @@ -500,6 +523,12 @@ public Builder withMaxSegmentsToMove(int maxSegmentsToMove) return this; } + public Builder withMaxSegmentsToConsiderPerMove(int maxSegmentsToConsiderPerMove) + { + this.maxSegmentsToConsiderPerMove = maxSegmentsToConsiderPerMove; + return this; + } + public Builder withReplicantLifetime(int replicantLifetime) { this.replicantLifetime = replicantLifetime; 
@@ -569,6 +598,7 @@ public CoordinatorDynamicConfig build() mergeBytesLimit == null ? DEFAULT_MERGE_BYTES_LIMIT : mergeBytesLimit, mergeSegmentsLimit == null ? DEFAULT_MERGE_SEGMENTS_LIMIT : mergeSegmentsLimit, maxSegmentsToMove == null ? DEFAULT_MAX_SEGMENTS_TO_MOVE : maxSegmentsToMove, + maxSegmentsToConsiderPerMove == null ? DEFAULT_MAX_SEGMENTS_TO_CONSIDER_PER_MOVE : maxSegmentsToConsiderPerMove, replicantLifetime == null ? DEFAULT_REPLICANT_LIFETIME : replicantLifetime, replicationThrottleLimit == null ? DEFAULT_REPLICATION_THROTTLE_LIMIT : replicationThrottleLimit, balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS : balancerComputeThreads, @@ -598,6 +628,7 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults) mergeBytesLimit == null ? defaults.getMergeBytesLimit() : mergeBytesLimit, mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() : mergeSegmentsLimit, maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() : maxSegmentsToMove, + maxSegmentsToConsiderPerMove == null ? defaults.getMaxSegmentsToConsiderPerMove() : maxSegmentsToConsiderPerMove, replicantLifetime == null ? defaults.getReplicantLifetime() : replicantLifetime, replicationThrottleLimit == null ? defaults.getReplicationThrottleLimit() : replicationThrottleLimit, balancerComputeThreads == null ? 
defaults.getBalancerComputeThreads() : balancerComputeThreads, diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java index e5e3cb57b129..c383d44d9879 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java @@ -214,10 +214,15 @@ static double computeJointSegmentsCost(final DataSegment segment, final Iterable @Override public BalancerSegmentHolder pickSegmentToMove( final List serverHolders, - Set broadcastDatasources + Set broadcastDatasources, + int numberOfSegmentsToConsider ) { - return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders, broadcastDatasources); + return ReservoirSegmentSampler.getRandomBalancerSegmentHolder( + serverHolders, + broadcastDatasources, + numberOfSegmentsToConsider + ); } @Override diff --git a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java index de3e46e66f43..43cc2232f644 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java @@ -54,9 +54,17 @@ public ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List } @Override - public BalancerSegmentHolder pickSegmentToMove(List serverHolders, Set broadcastDatasources) + public BalancerSegmentHolder pickSegmentToMove( + List serverHolders, + Set broadcastDatasources, + int numberOfSegmentsToConsider + ) { - return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders, broadcastDatasources); + return ReservoirSegmentSampler.getRandomBalancerSegmentHolder( + serverHolders, + broadcastDatasources, + numberOfSegmentsToConsider + ); } 
@Override diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java index 7181d52e152a..7c5f9a05f47b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java @@ -19,6 +19,7 @@ package org.apache.druid.server.coordinator; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.timeline.DataSegment; import java.util.List; @@ -28,9 +29,23 @@ final class ReservoirSegmentSampler { + private static final EmittingLogger log = new EmittingLogger(ReservoirSegmentSampler.class); + + /** + * Iterates over segments that live on the candidate servers passed in {@link ServerHolder} and (possibly) picks a + * segment to return to caller in a {@link BalancerSegmentHolder} object. + * + * @param serverHolders List of {@link ServerHolder} objects containing segments who are candidates to be chosen. + * @param broadcastDatasources Set of DataSource names that identify broadcast datasources. We don't want to consider + * segments from these datasources. + * @param numberOfSegmentsToConsider A limit on the number of segments to consider before short-circuiting and + * returning immediately. 
+ * @return + */ static BalancerSegmentHolder getRandomBalancerSegmentHolder( final List serverHolders, - Set broadcastDatasources + Set broadcastDatasources, + int numberOfSegmentsToConsider ) { ServerHolder fromServerHolder = null; @@ -56,6 +71,19 @@ static BalancerSegmentHolder getRandomBalancerSegmentHolder( proposalSegment = segment; } numSoFar++; + + // We have iterated over the alloted number of segments and will return the currently proposed segment or null + if (numSoFar >= numberOfSegmentsToConsider) { + log.debug( + "Breaking out of iteration over potential segments to move because the limit for " + + "numberOfSegmentsToCondider [%d] has been hit", numberOfSegmentsToConsider + ); + break; + } + } + // We have iterated over the alloted number of segments and will return the currently proposed segment or null + if (numSoFar >= numberOfSegmentsToConsider) { + break; } } if (fromServerHolder != null) { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java index a1c5237ddd15..7803e1c2466a 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java @@ -189,7 +189,8 @@ private Pair balanceServers( for (int iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) { final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove( toMoveFrom, - params.getBroadcastDatasources() + params.getBroadcastDatasources(), + params.getCoordinatorDynamicConfig().getMaxSegmentsToConsiderPerMove() ); if (segmentToMoveHolder == null) { log.info("All servers to move segments from are empty, ending run."); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java index f37c92cb1056..329c02e113a2 100644 
--- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java @@ -232,10 +232,19 @@ public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove() mockCoordinator(coordinator); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, false)), broadcastDatasources)) - .andReturn(new BalancerSegmentHolder(druidServer2, segment3)) - .andReturn(new BalancerSegmentHolder(druidServer2, segment4)); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject())) + EasyMock.expect( + strategy.pickSegmentToMove( + ImmutableList.of( + new ServerHolder(druidServer2, peon2, false) + ), + broadcastDatasources, + Integer.MAX_VALUE + ) + ).andReturn( + new BalancerSegmentHolder(druidServer2, segment3)).andReturn(new BalancerSegmentHolder(druidServer2, segment4) + ); + + EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) .andReturn(new BalancerSegmentHolder(druidServer1, segment1)) .andReturn(new BalancerSegmentHolder(druidServer1, segment2)); @@ -300,7 +309,7 @@ public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMoveWithNoDecommissi mockCoordinator(coordinator); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject())) + EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) .andReturn(new BalancerSegmentHolder(druidServer1, segment1)) .andReturn(new BalancerSegmentHolder(druidServer1, segment2)) .andReturn(new BalancerSegmentHolder(druidServer2, segment3)) @@ -349,7 +358,7 @@ public void testMoveToDecommissioningServer() mockCoordinator(coordinator); BalancerStrategy strategy = 
EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject())) + EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) .andReturn(new BalancerSegmentHolder(druidServer1, segment1)) .anyTimes(); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(() -> { @@ -384,7 +393,7 @@ public void testMoveFromDecommissioningServer() ServerHolder holder2 = new ServerHolder(druidServer2, peon2, false); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject())) + EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) .andReturn(new BalancerSegmentHolder(druidServer1, segment1)) .once(); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())) @@ -637,7 +646,11 @@ public ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, Li } @Override - public BalancerSegmentHolder pickSegmentToMove(List serverHolders, Set broadcastDatasources) + public BalancerSegmentHolder pickSegmentToMove( + List serverHolders, + Set broadcastDatasources, + int numberOfSegmentsToConsider + ) { return pickOrder.get(pickCounter.getAndIncrement() % pickOrder.size()); } @@ -661,9 +674,18 @@ private DruidCoordinatorRuntimeParams setupParamsForDecommissioningMaxPercentOfM // either decommissioning servers list or active ones (ie servers list is [2] or [1, 3]) BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, true)), broadcastDatasources)) - .andReturn(new BalancerSegmentHolder(druidServer2, segment2)); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject())) + 
EasyMock.expect( + strategy.pickSegmentToMove( + ImmutableList.of( + new ServerHolder(druidServer2, peon2, true) + ), + broadcastDatasources, + Integer.MAX_VALUE + ) + ).andReturn( + new BalancerSegmentHolder(druidServer2, segment2) + ); + EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) .andReturn(new BalancerSegmentHolder(druidServer1, segment1)); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(new ServerHolder(druidServer3, peon3)) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java index 8aef2f2e10fb..c4498f94e5df 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java @@ -191,11 +191,74 @@ public void getRandomBalancerSegmentHolderTest() Map segmentCountMap = new HashMap<>(); for (int i = 0; i < 5000; i++) { - segmentCountMap.put(ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet()).getSegment(), 1); + segmentCountMap.put(ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet(), Integer.MAX_VALUE).getSegment(), 1); } for (DataSegment segment : segments) { - Assert.assertEquals(segmentCountMap.get(segment), new Integer(1)); + Assert.assertEquals(new Integer(1), segmentCountMap.get(segment)); + } + } + + /** + * Makes sure that the segment on server4 is never chosen in 5k iterations because it should never have its segment + * checked due to the limit on segment candidates + */ + @Test + public void getRandomBalancerSegmentHolderTestSegmentsToConsiderLimit() + { + EasyMock.expect(druidServer1.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce(); + 
EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce(); + EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce(); + EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce(); + ImmutableDruidServerTests.expectSegments(druidServer1, segments1); + EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.replay(druidServer1); + + EasyMock.expect(druidServer2.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce(); + EasyMock.expect(druidServer2.getName()).andReturn("2").atLeastOnce(); + EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes(); + EasyMock.expect(druidServer2.getCurrSize()).andReturn(30L).atLeastOnce(); + EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce(); + ImmutableDruidServerTests.expectSegments(druidServer2, segments2); + EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.replay(druidServer2); + + EasyMock.expect(druidServer3.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce(); + EasyMock.expect(druidServer3.getName()).andReturn("3").atLeastOnce(); + EasyMock.expect(druidServer3.getTier()).andReturn("normal").anyTimes(); + EasyMock.expect(druidServer3.getCurrSize()).andReturn(30L).atLeastOnce(); + EasyMock.expect(druidServer3.getMaxSize()).andReturn(100L).atLeastOnce(); + ImmutableDruidServerTests.expectSegments(druidServer3, segments3); + EasyMock.expect(druidServer3.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.replay(druidServer3); + + EasyMock.expect(holder1.getServer()).andReturn(druidServer1).anyTimes(); + EasyMock.replay(holder1); + EasyMock.expect(holder2.getServer()).andReturn(druidServer2).anyTimes(); + EasyMock.replay(holder2); + + EasyMock.expect(holder3.getServer()).andReturn(druidServer3).anyTimes(); + EasyMock.replay(holder3); + + List holderList = new ArrayList<>(); + holderList.add(holder1); + holderList.add(holder2); + 
holderList.add(holder3); + holderList.add(holder4); + + Map segmentCountMap = new HashMap<>(); + for (int i = 0; i < 5000; i++) { + segmentCountMap.put( + ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet(), 3).getSegment(), 1) + ; + } + + for (DataSegment segment : segments) { + if (!segment.equals(segment4)) { + Assert.assertEquals(new Integer(1), segmentCountMap.get(segment)); + } else { + Assert.assertNull(segmentCountMap.get(segment)); + } } } } diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index 0050c0e07398..51dd259974f8 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -44,6 +44,7 @@ public void testSerde() throws Exception + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" + + " \"maxSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -66,16 +67,19 @@ public void testSerde() throws Exception ); ImmutableSet decommissioning = ImmutableSet.of("host1", "host2"); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, 1); actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false, 1); actual = 
CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, 1); actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, 1); + + actual = CoordinatorDynamicConfig.builder().withMaxSegmentsToConsiderPerMove(10).build(actual); + assertConfig(actual, 1, 1, 1, 1, 10, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, 1); } @Test @@ -86,6 +90,7 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" + + " \"maxSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -105,13 +110,13 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio ); ImmutableSet decommissioning = ImmutableSet.of(); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false, 1); actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false, 1); actual = 
CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, 1); } @Test @@ -122,6 +127,7 @@ public void testSerdeWithStringinKillDataSourceWhitelist() throws Exception + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" + + " \"maxSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -139,7 +145,60 @@ public void testSerdeWithStringinKillDataSourceWhitelist() throws Exception ), CoordinatorDynamicConfig.class ); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of("test1", "test2"), false, 1, ImmutableSet.of(), 0, false); + assertConfig( + actual, + 1, + 1, + 1, + 1, + 1, + 1, + 2, + true, + ImmutableSet.of("test1", "test2"), + false, + 1, + ImmutableSet.of(), + 0, + false, + 1 + ); + } + + @Test + public void testSerdeCorrectsInvalidBadMaxSegmentsToConsiderPerMove() throws Exception + { + String jsonStr = "{\n" + + " \"maxSegmentsToConsiderPerMove\": 0\n" + + "}\n"; + + CoordinatorDynamicConfig actual = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); + + Assert.assertEquals(Integer.MAX_VALUE, actual.getMaxSegmentsToConsiderPerMove()); + + jsonStr = "{\n" + + " \"maxSegmentsToConsiderPerMove\": -100\n" + + "}\n"; + + actual = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); + + Assert.assertEquals(Integer.MAX_VALUE, actual.getMaxSegmentsToConsiderPerMove()); } @Test @@ -150,6 +209,7 @@ public void testSerdeWithKillAllDataSources() throws 
Exception + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" + + " \"maxSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -168,13 +228,14 @@ public void testSerdeWithKillAllDataSources() throws Exception CoordinatorDynamicConfig.class ); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false, 1); //ensure whitelist is empty when killAllDataSources is true try { jsonStr = "{\n" + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n" - + " \"killAllDataSources\": true\n" + + " \"killAllDataSources\": true,\n" + + " \"maxSegmentsToConsiderPerMove\": 1\n" + "}\n"; mapper.readValue( jsonStr, @@ -196,6 +257,7 @@ public void testDeserializeWithoutMaxSegmentsInNodeLoadingQueue() throws Excepti + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" + + " \"maxSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -213,7 +275,7 @@ public void testDeserializeWithoutMaxSegmentsInNodeLoadingQueue() throws Excepti CoordinatorDynamicConfig.class ); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false, 1); } @Test @@ -221,7 +283,24 @@ public void testBuilderDefaults() { CoordinatorDynamicConfig defaultConfig = CoordinatorDynamicConfig.builder().build(); ImmutableSet emptyList = ImmutableSet.of(); - assertConfig(defaultConfig, 900000, 524288000, 100, 5, 15, 10, 1, false, emptyList, false, 0, emptyList, 70, false); + assertConfig( + defaultConfig, + 900000, + 524288000, + 100, + 5, + 
Integer.MAX_VALUE, + 10, + 1, + false, + emptyList, + false, + 0, + emptyList, + 70, + false, + 15 + ); } @Test @@ -235,7 +314,7 @@ public void testUpdate() Assert.assertEquals( current, new CoordinatorDynamicConfig - .Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null) + .Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null) .build(current) ); } @@ -255,7 +334,7 @@ private void assertConfig( long expectedMergeBytesLimit, int expectedMergeSegmentsLimit, int expectedMaxSegmentsToMove, - int expectedReplicantLifetime, + int expectedMaxSegmentsToConsiderPerMove, int expectedReplicationThrottleLimit, int expectedBalancerComputeThreads, boolean expectedEmitingBalancingStats, @@ -264,7 +343,8 @@ private void assertConfig( int expectedMaxSegmentsInNodeLoadingQueue, Set decommissioningNodes, int decommissioningMaxPercentOfMaxSegmentsToMove, - boolean pauseCoordination + boolean pauseCoordination, + int expectedReplicantLifetime ) { Assert.assertEquals( @@ -274,6 +354,7 @@ private void assertConfig( Assert.assertEquals(expectedMergeBytesLimit, config.getMergeBytesLimit()); Assert.assertEquals(expectedMergeSegmentsLimit, config.getMergeSegmentsLimit()); Assert.assertEquals(expectedMaxSegmentsToMove, config.getMaxSegmentsToMove()); + Assert.assertEquals(expectedMaxSegmentsToConsiderPerMove, config.getMaxSegmentsToConsiderPerMove()); Assert.assertEquals(expectedReplicantLifetime, config.getReplicantLifetime()); Assert.assertEquals(expectedReplicationThrottleLimit, config.getReplicationThrottleLimit()); Assert.assertEquals(expectedBalancerComputeThreads, config.getBalancerComputeThreads()); From f7c53cd91368d070f4fe9a6d5c95dc1045db75dc Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Fri, 14 Aug 2020 18:02:22 -0500 Subject: [PATCH 02/28] fix checkstyle failure --- .../druid/server/coordinator/ReservoirSegmentSamplerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 
deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java index c4498f94e5df..790598739408 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java @@ -249,8 +249,8 @@ public void getRandomBalancerSegmentHolderTestSegmentsToConsiderLimit() Map segmentCountMap = new HashMap<>(); for (int i = 0; i < 5000; i++) { segmentCountMap.put( - ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet(), 3).getSegment(), 1) - ; + ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet(), 3).getSegment(), 1 + ); } for (DataSegment segment : segments) { From e1d47653b9ec74e97ed64a3ea57687d9ca9bc404 Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Wed, 2 Sep 2020 15:35:40 -0500 Subject: [PATCH 03/28] Make doc more detailed for admin to understand when/why to use new config --- docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 5767af5164d2..1ce0584cd29b 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -756,7 +756,7 @@ Issuing a GET request at the same URL will return the spec that is currently in |`mergeBytesLimit`|The maximum total uncompressed size in bytes of segments to merge.|524288000L| |`mergeSegmentsLimit`|The maximum number of segments that can be in a single [append task](../ingestion/tasks.md).|100| |`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5| -|`maxSegmentsToConsiderPerMove`|The maximum number of segments to consider when picking a segment to move. 
For a cluster with a very large number of segments, choosing to set this to a non-default value may make sense for an experienced admin. A value of `<= 0` will be reset to the default automatically.|Integer.MAX_VALUE| +|`maxSegmentsToConsiderPerMove`|The maximum number of segments to consider when picking a segment to move. For a cluster with a very large number of segments, choosing to set this to a non-default value may make sense for an experienced admin. A value of `<= 0` will be reset to the default automatically. Currently, when picking a segment to move, the coordinator will iterate over all segments on each server. The coordinator starts its iterations on servers who have the least available capacity and finishes on the server with the most available capacity. Each segment considered for moving is more likely to be chosen to move compard to the segments considered after it. If you wanted to consider moving segments from only the top N servers in terms of segment cache consumption, you'd multiply N * (approximate number of segments per server in your cluster) and set this config to that value. As your cluster grows in both number of segments and nodes, you may need to revisit this value if using non-default value. Setting this config to a non-default value can save compute if you determine that you don't need every segment to be considered for every move.|Integer.MAX_VALUE| |`replicantLifetime`|The maximum number of Coordinator runs for a segment to be replicated before we start alerting.|15| |`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10| |`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. 
Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1| From 0ff7c5aefd947ed20e289fc8e0309aa77628c943 Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Thu, 3 Sep 2020 09:41:11 -0500 Subject: [PATCH 04/28] refactor PR to use a % of segments instead of raw number --- .../server/coordinator/BalancerStrategy.java | 12 +++--- .../coordinator/CoordinatorDynamicConfig.java | 37 ++++++++++--------- .../coordinator/CostBalancerStrategy.java | 4 +- .../coordinator/RandomBalancerStrategy.java | 4 +- .../coordinator/ReservoirSegmentSampler.java | 30 +++++++++++---- .../coordinator/duty/BalanceSegments.java | 2 +- .../coordinator/BalanceSegmentsTest.java | 6 +-- .../ReservoirSegmentSamplerTest.java | 11 ++++-- .../http/CoordinatorDynamicConfigTest.java | 30 +++++++-------- 9 files changed, 77 insertions(+), 59 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java index 37ad86febb96..e988e2383721 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java @@ -63,12 +63,10 @@ public interface BalancerStrategy * NOTE: this should really be handled on a per-segment basis, to properly support * the interval or period-based broadcast rules. For simplicity of the initial * implementation, only forever broadcast rules are supported. - * @param numberOfSegmentsToConsider The number of segments to consider when choosing which segment to move. - * Implementations of this interface need to be cognizant of the - * {@link CoordinatorDynamicConfig} default value for maxSegmentsToConsiderPerMove. - * If use of an implementation is going to potentially leverage this value, the - * implementation must ensure proper functionality with the potentially provided - * values, especially the default value. 
+ * @param percentOfSegmentsToConsider The percentage of the total number of segments that we will consider when + * choosing which segment to move. {@link CoordinatorDynamicConfig} defines a + * config percentOfSegmentsToConsiderPerMove that will be used as an argument + * for implementations of this method. * * @return {@link BalancerSegmentHolder} containing segment to move and server it currently resides on, or null if * there are no segments to pick from (i. e. all provided serverHolders are empty). @@ -77,7 +75,7 @@ public interface BalancerStrategy BalancerSegmentHolder pickSegmentToMove( List serverHolders, Set broadcastDatasources, - int numberOfSegmentsToConsider + int percentOfSegmentsToConsider ); /** diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index 7330d705bfc2..d21babbc87a3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -53,7 +53,7 @@ public class CoordinatorDynamicConfig private final long mergeBytesLimit; private final int mergeSegmentsLimit; private final int maxSegmentsToMove; - private final int maxSegmentsToConsiderPerMove; + private final int percentOfSegmentsToConsiderPerMove; private final int replicantLifetime; private final int replicationThrottleLimit; private final int balancerComputeThreads; @@ -96,7 +96,7 @@ public CoordinatorDynamicConfig( @JsonProperty("mergeBytesLimit") long mergeBytesLimit, @JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit, @JsonProperty("maxSegmentsToMove") int maxSegmentsToMove, - @JsonProperty("maxSegmentsToConsiderPerMove") int maxSegmentsToConsiderPerMove, + @JsonProperty("percentOfSegmentsToConsiderPerMove") int percentOfSegmentsToConsiderPerMove, @JsonProperty("replicantLifetime") int replicantLifetime, 
@JsonProperty("replicationThrottleLimit") int replicationThrottleLimit, @JsonProperty("balancerComputeThreads") int balancerComputeThreads, @@ -126,10 +126,10 @@ public CoordinatorDynamicConfig( this.mergeSegmentsLimit = mergeSegmentsLimit; this.maxSegmentsToMove = maxSegmentsToMove; // This helps with ease of migration to the new config, but could confuse users. Docs explicitly state this value must be > 0 - if (maxSegmentsToConsiderPerMove <= 0) { - this.maxSegmentsToConsiderPerMove = Integer.MAX_VALUE; + if (percentOfSegmentsToConsiderPerMove <= 0 || percentOfSegmentsToConsiderPerMove > 100) { + this.percentOfSegmentsToConsiderPerMove = 100; } else { - this.maxSegmentsToConsiderPerMove = maxSegmentsToConsiderPerMove; + this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove; } this.replicantLifetime = replicantLifetime; this.replicationThrottleLimit = replicationThrottleLimit; @@ -220,9 +220,9 @@ public int getMaxSegmentsToMove() } @JsonProperty - public int getMaxSegmentsToConsiderPerMove() + public int getPercentOfSegmentsToConsiderPerMove() { - return maxSegmentsToConsiderPerMove; + return percentOfSegmentsToConsiderPerMove; } @JsonProperty @@ -316,7 +316,7 @@ public String toString() ", mergeBytesLimit=" + mergeBytesLimit + ", mergeSegmentsLimit=" + mergeSegmentsLimit + ", maxSegmentsToMove=" + maxSegmentsToMove + - ", maxSegmentsToConsiderPerMove=" + maxSegmentsToConsiderPerMove + + ", percentOfSegmentsToConsiderPerMove=" + percentOfSegmentsToConsiderPerMove + ", replicantLifetime=" + replicantLifetime + ", replicationThrottleLimit=" + replicationThrottleLimit + ", balancerComputeThreads=" + balancerComputeThreads + @@ -356,7 +356,7 @@ public boolean equals(Object o) if (maxSegmentsToMove != that.maxSegmentsToMove) { return false; } - if (maxSegmentsToConsiderPerMove != that.maxSegmentsToConsiderPerMove) { + if (percentOfSegmentsToConsiderPerMove != that.percentOfSegmentsToConsiderPerMove) { return false; } if (replicantLifetime != 
that.replicantLifetime) { @@ -400,7 +400,7 @@ public int hashCode() mergeBytesLimit, mergeSegmentsLimit, maxSegmentsToMove, - maxSegmentsToConsiderPerMove, + percentOfSegmentsToConsiderPerMove, replicantLifetime, replicationThrottleLimit, balancerComputeThreads, @@ -427,7 +427,7 @@ public static class Builder private static final long DEFAULT_MERGE_BYTES_LIMIT = 524_288_000L; private static final int DEFAULT_MERGE_SEGMENTS_LIMIT = 100; private static final int DEFAULT_MAX_SEGMENTS_TO_MOVE = 5; - private static final int DEFAULT_MAX_SEGMENTS_TO_CONSIDER_PER_MOVE = Integer.MAX_VALUE; + private static final int DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE = 100; private static final int DEFAULT_REPLICANT_LIFETIME = 15; private static final int DEFAULT_REPLICATION_THROTTLE_LIMIT = 10; private static final int DEFAULT_BALANCER_COMPUTE_THREADS = 1; @@ -441,7 +441,7 @@ public static class Builder private Long mergeBytesLimit; private Integer mergeSegmentsLimit; private Integer maxSegmentsToMove; - private Integer maxSegmentsToConsiderPerMove; + private Integer percentOfSegmentsToConsiderPerMove; private Integer replicantLifetime; private Integer replicationThrottleLimit; private Boolean emitBalancingStats; @@ -465,7 +465,7 @@ public Builder( @JsonProperty("mergeBytesLimit") @Nullable Long mergeBytesLimit, @JsonProperty("mergeSegmentsLimit") @Nullable Integer mergeSegmentsLimit, @JsonProperty("maxSegmentsToMove") @Nullable Integer maxSegmentsToMove, - @JsonProperty("maxSegmentsToConsiderPerMove") @Nullable Integer maxSegmentsToConsiderPerMove, + @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Integer percentOfSegmentsToConsiderPerMove, @JsonProperty("replicantLifetime") @Nullable Integer replicantLifetime, @JsonProperty("replicationThrottleLimit") @Nullable Integer replicationThrottleLimit, @JsonProperty("balancerComputeThreads") @Nullable Integer balancerComputeThreads, @@ -485,7 +485,7 @@ public Builder( this.mergeBytesLimit = mergeBytesLimit; 
this.mergeSegmentsLimit = mergeSegmentsLimit; this.maxSegmentsToMove = maxSegmentsToMove; - this.maxSegmentsToConsiderPerMove = maxSegmentsToConsiderPerMove; + this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove; this.replicantLifetime = replicantLifetime; this.replicationThrottleLimit = replicationThrottleLimit; this.balancerComputeThreads = balancerComputeThreads; @@ -523,9 +523,9 @@ public Builder withMaxSegmentsToMove(int maxSegmentsToMove) return this; } - public Builder withMaxSegmentsToConsiderPerMove(int maxSegmentsToConsiderPerMove) + public Builder withPercentOfSegmentsToConsiderPerMove(int percentOfSegmentsToConsiderPerMove) { - this.maxSegmentsToConsiderPerMove = maxSegmentsToConsiderPerMove; + this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove; return this; } @@ -598,7 +598,8 @@ public CoordinatorDynamicConfig build() mergeBytesLimit == null ? DEFAULT_MERGE_BYTES_LIMIT : mergeBytesLimit, mergeSegmentsLimit == null ? DEFAULT_MERGE_SEGMENTS_LIMIT : mergeSegmentsLimit, maxSegmentsToMove == null ? DEFAULT_MAX_SEGMENTS_TO_MOVE : maxSegmentsToMove, - maxSegmentsToConsiderPerMove == null ? DEFAULT_MAX_SEGMENTS_TO_CONSIDER_PER_MOVE : maxSegmentsToConsiderPerMove, + percentOfSegmentsToConsiderPerMove == null ? DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE + : percentOfSegmentsToConsiderPerMove, replicantLifetime == null ? DEFAULT_REPLICANT_LIFETIME : replicantLifetime, replicationThrottleLimit == null ? DEFAULT_REPLICATION_THROTTLE_LIMIT : replicationThrottleLimit, balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS : balancerComputeThreads, @@ -628,7 +629,7 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults) mergeBytesLimit == null ? defaults.getMergeBytesLimit() : mergeBytesLimit, mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() : mergeSegmentsLimit, maxSegmentsToMove == null ? 
defaults.getMaxSegmentsToMove() : maxSegmentsToMove, - maxSegmentsToConsiderPerMove == null ? defaults.getMaxSegmentsToConsiderPerMove() : maxSegmentsToConsiderPerMove, + percentOfSegmentsToConsiderPerMove == null ? defaults.getPercentOfSegmentsToConsiderPerMove() : percentOfSegmentsToConsiderPerMove, replicantLifetime == null ? defaults.getReplicantLifetime() : replicantLifetime, replicationThrottleLimit == null ? defaults.getReplicationThrottleLimit() : replicationThrottleLimit, balancerComputeThreads == null ? defaults.getBalancerComputeThreads() : balancerComputeThreads, diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java index c383d44d9879..0142b266d902 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java @@ -215,13 +215,13 @@ static double computeJointSegmentsCost(final DataSegment segment, final Iterable public BalancerSegmentHolder pickSegmentToMove( final List serverHolders, Set broadcastDatasources, - int numberOfSegmentsToConsider + int percentOfSegmentsToConsider ) { return ReservoirSegmentSampler.getRandomBalancerSegmentHolder( serverHolders, broadcastDatasources, - numberOfSegmentsToConsider + percentOfSegmentsToConsider ); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java index 43cc2232f644..839bff4b7e23 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java @@ -57,13 +57,13 @@ public ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List public BalancerSegmentHolder pickSegmentToMove( List serverHolders, Set 
broadcastDatasources, - int numberOfSegmentsToConsider + int percentOfSegmentsToConsider ) { return ReservoirSegmentSampler.getRandomBalancerSegmentHolder( serverHolders, broadcastDatasources, - numberOfSegmentsToConsider + percentOfSegmentsToConsider ); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java index 7c5f9a05f47b..7d1dd27ef325 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java @@ -38,20 +38,34 @@ final class ReservoirSegmentSampler * @param serverHolders List of {@link ServerHolder} objects containing segments who are candidates to be chosen. * @param broadcastDatasources Set of DataSource names that identify broadcast datasources. We don't want to consider * segments from these datasources. - * @param numberOfSegmentsToConsider A limit on the number of segments to consider before short-circuiting and + * @param percentOfSegmentsToConsider The % of total cluster segments to consider before short-circuiting and * returning immediately. 
* @return */ static BalancerSegmentHolder getRandomBalancerSegmentHolder( final List serverHolders, Set broadcastDatasources, - int numberOfSegmentsToConsider + int percentOfSegmentsToConsider ) { ServerHolder fromServerHolder = null; DataSegment proposalSegment = null; + int calculatedSegmentLimit = Integer.MAX_VALUE; int numSoFar = 0; + // Calculate the integer limit for the number of segments to be considered for moving if % is less than 100 + if (percentOfSegmentsToConsider < 100) { + int totalSegments = 0; + for (ServerHolder server : serverHolders) { + totalSegments += server.getServer().getNumSegments(); + } + // If totalSegments are zero, we will assume it is a mistake and move on to iteration without updating + // calculatedSegmentLimit + if (totalSegments != 0) { + calculatedSegmentLimit = (int) Math.ceil((double) totalSegments * ((double) percentOfSegmentsToConsider / (double) 100)); + } + } + for (ServerHolder server : serverHolders) { if (!server.getServer().getType().isSegmentReplicationTarget()) { // if the server only handles broadcast segments (which don't need to be rebalanced), we have nothing to do @@ -73,16 +87,16 @@ static BalancerSegmentHolder getRandomBalancerSegmentHolder( numSoFar++; // We have iterated over the alloted number of segments and will return the currently proposed segment or null - if (numSoFar >= numberOfSegmentsToConsider) { - log.debug( - "Breaking out of iteration over potential segments to move because the limit for " + - "numberOfSegmentsToCondider [%d] has been hit", numberOfSegmentsToConsider - ); + // We will only break out early if we are iterating less than 100% of the total cluster segments + if (percentOfSegmentsToConsider != 100 && numSoFar >= calculatedSegmentLimit) { + log.debug("Breaking out of iteration over potential segments to move because we hit the limit [%d percent] of" + + " segments to consider to move. 
Segments Iterated: [%s]", percentOfSegmentsToConsider, numSoFar); break; } } // We have iterated over the alloted number of segments and will return the currently proposed segment or null - if (numSoFar >= numberOfSegmentsToConsider) { + // We will only break out early if we are iterating less than 100% of the total cluster segments + if (percentOfSegmentsToConsider != 100 && numSoFar >= calculatedSegmentLimit) { break; } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java index 7803e1c2466a..d1fca19e03be 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java @@ -190,7 +190,7 @@ private Pair balanceServers( final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove( toMoveFrom, params.getBroadcastDatasources(), - params.getCoordinatorDynamicConfig().getMaxSegmentsToConsiderPerMove() + params.getCoordinatorDynamicConfig().getPercentOfSegmentsToConsiderPerMove() ); if (segmentToMoveHolder == null) { log.info("All servers to move segments from are empty, ending run."); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java index 329c02e113a2..c57e140d5112 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java @@ -238,7 +238,7 @@ public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove() new ServerHolder(druidServer2, peon2, false) ), broadcastDatasources, - Integer.MAX_VALUE + 100 ) ).andReturn( new BalancerSegmentHolder(druidServer2, segment3)).andReturn(new BalancerSegmentHolder(druidServer2, segment4) @@ -649,7 +649,7 @@ public 
ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, Li public BalancerSegmentHolder pickSegmentToMove( List serverHolders, Set broadcastDatasources, - int numberOfSegmentsToConsider + int percentOfSegmentsToConsider ) { return pickOrder.get(pickCounter.getAndIncrement() % pickOrder.size()); @@ -680,7 +680,7 @@ private DruidCoordinatorRuntimeParams setupParamsForDecommissioningMaxPercentOfM new ServerHolder(druidServer2, peon2, true) ), broadcastDatasources, - Integer.MAX_VALUE + 100 ) ).andReturn( new BalancerSegmentHolder(druidServer2, segment2) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java index 790598739408..1c824b5bb242 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java @@ -191,7 +191,7 @@ public void getRandomBalancerSegmentHolderTest() Map segmentCountMap = new HashMap<>(); for (int i = 0; i < 5000; i++) { - segmentCountMap.put(ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet(), Integer.MAX_VALUE).getSegment(), 1); + segmentCountMap.put(ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet(), 100).getSegment(), 1); } for (DataSegment segment : segments) { @@ -232,13 +232,18 @@ public void getRandomBalancerSegmentHolderTestSegmentsToConsiderLimit() EasyMock.expect(druidServer3.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); EasyMock.replay(druidServer3); + ImmutableDruidServerTests.expectSegments(druidServer4, segments4); + EasyMock.replay(druidServer4); + EasyMock.expect(holder1.getServer()).andReturn(druidServer1).anyTimes(); EasyMock.replay(holder1); EasyMock.expect(holder2.getServer()).andReturn(druidServer2).anyTimes(); EasyMock.replay(holder2); - 
EasyMock.expect(holder3.getServer()).andReturn(druidServer3).anyTimes(); EasyMock.replay(holder3); + // We only run getServer() each time we calculate the limit on segments to consider. Always 5k + EasyMock.expect(holder4.getServer()).andReturn(druidServer4).times(5000); + EasyMock.replay(holder4); List holderList = new ArrayList<>(); holderList.add(holder1); @@ -249,7 +254,7 @@ public void getRandomBalancerSegmentHolderTestSegmentsToConsiderLimit() Map segmentCountMap = new HashMap<>(); for (int i = 0; i < 5000; i++) { segmentCountMap.put( - ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet(), 3).getSegment(), 1 + ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet(), 75).getSegment(), 1 ); } diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index 51dd259974f8..75df37062d2c 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -44,7 +44,7 @@ public void testSerde() throws Exception + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" - + " \"maxSegmentsToConsiderPerMove\": 1,\n" + + " \"maxPercentOfSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -78,7 +78,7 @@ public void testSerde() throws Exception actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual); assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, 1); - actual = CoordinatorDynamicConfig.builder().withMaxSegmentsToConsiderPerMove(10).build(actual); + actual = 
CoordinatorDynamicConfig.builder().withPercentOfSegmentsToConsiderPerMove(10).build(actual); assertConfig(actual, 1, 1, 1, 1, 10, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, 1); } @@ -90,7 +90,7 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" - + " \"maxSegmentsToConsiderPerMove\": 1,\n" + + " \"maxPercentOfSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -127,7 +127,7 @@ public void testSerdeWithStringinKillDataSourceWhitelist() throws Exception + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" - + " \"maxSegmentsToConsiderPerMove\": 1,\n" + + " \"maxPercentOfSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -166,10 +166,10 @@ public void testSerdeWithStringinKillDataSourceWhitelist() throws Exception } @Test - public void testSerdeCorrectsInvalidBadMaxSegmentsToConsiderPerMove() throws Exception + public void testSerdeCorrectsInvalidBadMaxPercentOfSegmentsToConsiderPerMove() throws Exception { String jsonStr = "{\n" - + " \"maxSegmentsToConsiderPerMove\": 0\n" + + " \"maxPercentOfSegmentsToConsiderPerMove\": 0\n" + "}\n"; CoordinatorDynamicConfig actual = mapper.readValue( @@ -182,10 +182,10 @@ public void testSerdeCorrectsInvalidBadMaxSegmentsToConsiderPerMove() throws Exc CoordinatorDynamicConfig.class ); - Assert.assertEquals(Integer.MAX_VALUE, actual.getMaxSegmentsToConsiderPerMove()); + Assert.assertEquals(100, actual.getPercentOfSegmentsToConsiderPerMove()); jsonStr = "{\n" - + " \"maxSegmentsToConsiderPerMove\": -100\n" + + " \"maxPercentOfSegmentsToConsiderPerMove\": -100\n" + "}\n"; actual = mapper.readValue( @@ -198,7 +198,7 @@ public void 
testSerdeCorrectsInvalidBadMaxSegmentsToConsiderPerMove() throws Exc CoordinatorDynamicConfig.class ); - Assert.assertEquals(Integer.MAX_VALUE, actual.getMaxSegmentsToConsiderPerMove()); + Assert.assertEquals(100, actual.getPercentOfSegmentsToConsiderPerMove()); } @Test @@ -209,7 +209,7 @@ public void testSerdeWithKillAllDataSources() throws Exception + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" - + " \"maxSegmentsToConsiderPerMove\": 1,\n" + + " \"maxPercentOfSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -235,7 +235,7 @@ public void testSerdeWithKillAllDataSources() throws Exception jsonStr = "{\n" + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n" + " \"killAllDataSources\": true,\n" - + " \"maxSegmentsToConsiderPerMove\": 1\n" + + " \"maxPercentOfSegmentsToConsiderPerMove\": 1\n" + "}\n"; mapper.readValue( jsonStr, @@ -257,7 +257,7 @@ public void testDeserializeWithoutMaxSegmentsInNodeLoadingQueue() throws Excepti + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" - + " \"maxSegmentsToConsiderPerMove\": 1,\n" + + " \"maxPercentOfSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -289,7 +289,7 @@ public void testBuilderDefaults() 524288000, 100, 5, - Integer.MAX_VALUE, + 100, 10, 1, false, @@ -334,7 +334,7 @@ private void assertConfig( long expectedMergeBytesLimit, int expectedMergeSegmentsLimit, int expectedMaxSegmentsToMove, - int expectedMaxSegmentsToConsiderPerMove, + int expectedMaxPercentOfSegmentsToConsiderPerMove, int expectedReplicationThrottleLimit, int expectedBalancerComputeThreads, boolean expectedEmitingBalancingStats, @@ -354,7 +354,7 @@ private void assertConfig( Assert.assertEquals(expectedMergeBytesLimit, config.getMergeBytesLimit()); 
Assert.assertEquals(expectedMergeSegmentsLimit, config.getMergeSegmentsLimit()); Assert.assertEquals(expectedMaxSegmentsToMove, config.getMaxSegmentsToMove()); - Assert.assertEquals(expectedMaxSegmentsToConsiderPerMove, config.getMaxSegmentsToConsiderPerMove()); + Assert.assertEquals(expectedMaxPercentOfSegmentsToConsiderPerMove, config.getPercentOfSegmentsToConsiderPerMove()); Assert.assertEquals(expectedReplicantLifetime, config.getReplicantLifetime()); Assert.assertEquals(expectedReplicationThrottleLimit, config.getReplicationThrottleLimit()); Assert.assertEquals(expectedBalancerComputeThreads, config.getBalancerComputeThreads()); From f3993e34b2464c83c17ff15f9a50a3132bc0fbd6 Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Thu, 3 Sep 2020 10:01:59 -0500 Subject: [PATCH 05/28] update the docs --- docs/configuration/index.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 1ce0584cd29b..f8339c3618ab 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -737,7 +737,7 @@ A sample Coordinator dynamic config JSON object is shown below: "mergeBytesLimit": 100000000, "mergeSegmentsLimit" : 1000, "maxSegmentsToMove": 5, - "maxSegmentsToConsiderPerMove": 2147483647, + "percentOfSegmentsToConsiderPerMove": 100, "replicantLifetime": 15, "replicationThrottleLimit": 10, "emitBalancingStats": false, @@ -756,7 +756,8 @@ Issuing a GET request at the same URL will return the spec that is currently in |`mergeBytesLimit`|The maximum total uncompressed size in bytes of segments to merge.|524288000L| |`mergeSegmentsLimit`|The maximum number of segments that can be in a single [append task](../ingestion/tasks.md).|100| |`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5| -|`maxSegmentsToConsiderPerMove`|The maximum number of segments to consider when picking a segment to move. 
For a cluster with a very large number of segments, choosing to set this to a non-default value may make sense for an experienced admin. A value of `<= 0` will be reset to the default automatically. Currently, when picking a segment to move, the coordinator will iterate over all segments on each server. The coordinator starts its iterations on servers who have the least available capacity and finishes on the server with the most available capacity. Each segment considered for moving is more likely to be chosen to move compard to the segments considered after it. If you wanted to consider moving segments from only the top N servers in terms of segment cache consumption, you'd multiply N * (approximate number of segments per server in your cluster) and set this config to that value. As your cluster grows in both number of segments and nodes, you may need to revisit this value if using non-default value. Setting this config to a non-default value can save compute if you determine that you don't need every segment to be considered for every move.|Integer.MAX_VALUE| +|`percentOfSegmentsToConsiderPerMover`|The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (least available capacity first) and then iterates over the servers considering their segments for moving. The default config of 100% should make sense for most small to medium sized clusters. An admin for a cluster with 1MM segments could choose to only consider 500k of these segments for each move operation by overrriding this config to 50. +|`percentOfSegmentsToConsiderPerMove`|The maximum number of segments to consider when picking a segment to move. For a cluster with a very large number of segments, choosing to set this to a non-default value may make sense for an experienced admin. A value of `<= 0` will be reset to the default automatically. 
Currently, when picking a segment to move, the coordinator will iterate over all segments on each server. The coordinator starts its iterations on servers who have the least available capacity and finishes on the server with the most available capacity. Each segment considered for moving is more likely to be chosen to move compard to the segments considered after it. If you wanted to consider moving segments from only the top N servers in terms of segment cache consumption, you'd multiply N * (approximate number of segments per server in your cluster) and set this config to that value. As your cluster grows in both number of segments and nodes, you may need to revisit this value if using non-default value. Setting this config to a non-default value can save compute if you determine that you don't need every segment to be considered for every move.|Integer.MAX_VALUE| |`replicantLifetime`|The maximum number of Coordinator runs for a segment to be replicated before we start alerting.|15| |`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10| |`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. 
Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1| From 3b042135fb261eced2ce09cc127ce568aa305e2b Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Thu, 3 Sep 2020 10:02:30 -0500 Subject: [PATCH 06/28] remove bad doc line --- docs/configuration/index.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index f8339c3618ab..f7123da56dd7 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -757,7 +757,6 @@ Issuing a GET request at the same URL will return the spec that is currently in |`mergeSegmentsLimit`|The maximum number of segments that can be in a single [append task](../ingestion/tasks.md).|100| |`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5| |`percentOfSegmentsToConsiderPerMover`|The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (least available capacity first) and then iterates over the servers considering their segments for moving. The default config of 100% should make sense for most small to medium sized clusters. An admin for a cluster with 1MM segments could choose to only consider 500k of these segments for each move operation by overrriding this config to 50. -|`percentOfSegmentsToConsiderPerMove`|The maximum number of segments to consider when picking a segment to move. For a cluster with a very large number of segments, choosing to set this to a non-default value may make sense for an experienced admin. A value of `<= 0` will be reset to the default automatically. Currently, when picking a segment to move, the coordinator will iterate over all segments on each server. The coordinator starts its iterations on servers who have the least available capacity and finishes on the server with the most available capacity. 
Each segment considered for moving is more likely to be chosen to move compard to the segments considered after it. If you wanted to consider moving segments from only the top N servers in terms of segment cache consumption, you'd multiply N * (approximate number of segments per server in your cluster) and set this config to that value. As your cluster grows in both number of segments and nodes, you may need to revisit this value if using non-default value. Setting this config to a non-default value can save compute if you determine that you don't need every segment to be considered for every move.|Integer.MAX_VALUE| |`replicantLifetime`|The maximum number of Coordinator runs for a segment to be replicated before we start alerting.|15| |`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10| |`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1| From d69897e47cea1e4671dc3b7780e93b9419b85244 Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Thu, 3 Sep 2020 10:03:26 -0500 Subject: [PATCH 07/28] fix typo in name of new dynamic config --- docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index f7123da56dd7..6bf3ba61cf8e 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -756,7 +756,7 @@ Issuing a GET request at the same URL will return the spec that is currently in |`mergeBytesLimit`|The maximum total uncompressed size in bytes of segments to merge.|524288000L| |`mergeSegmentsLimit`|The maximum number of segments that can be in a single [append task](../ingestion/tasks.md).|100| |`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5| -|`percentOfSegmentsToConsiderPerMover`|The percentage of the total number of 
segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (least available capacity first) and then iterates over the servers considering their segments for moving. The default config of 100% should make sense for most small to medium sized clusters. An admin for a cluster with 1MM segments could choose to only consider 500k of these segments for each move operation by overrriding this config to 50. +|`percentOfSegmentsToConsiderPerMove`|The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (least available capacity first) and then iterates over the servers considering their segments for moving. The default config of 100% should make sense for most small to medium sized clusters. An admin for a cluster with 1MM segments could choose to only consider 500k of these segments for each move operation by overrriding this config to 50. |`replicantLifetime`|The maximum number of Coordinator runs for a segment to be replicated before we start alerting.|15| |`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10| |`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. 
Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1| From 213599aaedf106581959acb9c2143499fad95062 Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Thu, 3 Sep 2020 10:11:02 -0500 Subject: [PATCH 08/28] update RservoirSegmentSampler to gracefully deal with values > 100% --- .../druid/server/coordinator/ReservoirSegmentSampler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java index 7d1dd27ef325..a4863b643ef3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java @@ -88,7 +88,7 @@ static BalancerSegmentHolder getRandomBalancerSegmentHolder( // We have iterated over the alloted number of segments and will return the currently proposed segment or null // We will only break out early if we are iterating less than 100% of the total cluster segments - if (percentOfSegmentsToConsider != 100 && numSoFar >= calculatedSegmentLimit) { + if (percentOfSegmentsToConsider < 100 && numSoFar >= calculatedSegmentLimit) { log.debug("Breaking out of iteration over potential segments to move because we hit the limit [%d percent] of" + " segments to consider to move. 
Segments Iterated: [%s]", percentOfSegmentsToConsider, numSoFar); break; @@ -96,7 +96,7 @@ static BalancerSegmentHolder getRandomBalancerSegmentHolder( } // We have iterated over the alloted number of segments and will return the currently proposed segment or null // We will only break out early if we are iterating less than 100% of the total cluster segments - if (percentOfSegmentsToConsider != 100 && numSoFar >= calculatedSegmentLimit) { + if (percentOfSegmentsToConsider < 100 && numSoFar >= calculatedSegmentLimit) { break; } } From 8e5a1df98cd935979fc503222dc6e286fc431cf4 Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Thu, 3 Sep 2020 13:16:49 -0500 Subject: [PATCH 09/28] add handler for <= 0 in ReservoirSegmentSampler --- .../druid/server/coordinator/ReservoirSegmentSampler.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java index a4863b643ef3..3289f0f2cc14 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java @@ -53,6 +53,13 @@ static BalancerSegmentHolder getRandomBalancerSegmentHolder( int calculatedSegmentLimit = Integer.MAX_VALUE; int numSoFar = 0; + // Reset a bad value of percentOfSegmentsToConsider to 100. We don't allow consideration of 0% or less of segments. + if (percentOfSegmentsToConsider <= 0) { + log.debug("Resetting percentOfSegmentsToConsider to 100 because only values from 1 to 100 are allowed." 
+ + "You Provided [%d]", percentOfSegmentsToConsider); + percentOfSegmentsToConsider = 100; + } + // Calculate the integer limit for the number of segments to be considered for moving if % is less than 100 if (percentOfSegmentsToConsider < 100) { int totalSegments = 0; From b3f680dfc1dcf0418b6c8cc8d910e2211779f10e Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Thu, 3 Sep 2020 13:17:05 -0500 Subject: [PATCH 10/28] fixup CoordinatorDynamicConfigTest naming and argument ordering --- .../http/CoordinatorDynamicConfigTest.java | 52 +++++++++---------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index 75df37062d2c..572f25b46363 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -44,7 +44,7 @@ public void testSerde() throws Exception + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" - + " \"maxPercentOfSegmentsToConsiderPerMove\": 1,\n" + + " \"percentOfSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -67,19 +67,19 @@ public void testSerde() throws Exception ); ImmutableSet decommissioning = ImmutableSet.of("host1", "host2"); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, 1); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false); actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, 
false, 1); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false); actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, 1); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false); actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, 1); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true); actual = CoordinatorDynamicConfig.builder().withPercentOfSegmentsToConsiderPerMove(10).build(actual); - assertConfig(actual, 1, 1, 1, 1, 10, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, 1); + assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true); } @Test @@ -90,7 +90,7 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" - + " \"maxPercentOfSegmentsToConsiderPerMove\": 1,\n" + + " \"percentOfSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -110,13 +110,13 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio ); ImmutableSet decommissioning = ImmutableSet.of(); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false, 1); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false); actual = 
CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false, 1); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false); actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, 1); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false); } @Test @@ -127,7 +127,7 @@ public void testSerdeWithStringinKillDataSourceWhitelist() throws Exception + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" - + " \"maxPercentOfSegmentsToConsiderPerMove\": 1,\n" + + " \"percentOfSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -153,6 +153,7 @@ public void testSerdeWithStringinKillDataSourceWhitelist() throws Exception 1, 1, 1, + 1, 2, true, ImmutableSet.of("test1", "test2"), @@ -160,8 +161,7 @@ public void testSerdeWithStringinKillDataSourceWhitelist() throws Exception 1, ImmutableSet.of(), 0, - false, - 1 + false ); } @@ -169,7 +169,7 @@ public void testSerdeWithStringinKillDataSourceWhitelist() throws Exception public void testSerdeCorrectsInvalidBadMaxPercentOfSegmentsToConsiderPerMove() throws Exception { String jsonStr = "{\n" - + " \"maxPercentOfSegmentsToConsiderPerMove\": 0\n" + + " \"percentOfSegmentsToConsiderPerMove\": 0\n" + "}\n"; CoordinatorDynamicConfig actual = mapper.readValue( @@ -185,7 +185,7 @@ public void testSerdeCorrectsInvalidBadMaxPercentOfSegmentsToConsiderPerMove() t Assert.assertEquals(100, actual.getPercentOfSegmentsToConsiderPerMove()); jsonStr = "{\n" - + " 
\"maxPercentOfSegmentsToConsiderPerMove\": -100\n" + + " \"percentOfSegmentsToConsiderPerMove\": -100\n" + "}\n"; actual = mapper.readValue( @@ -209,7 +209,7 @@ public void testSerdeWithKillAllDataSources() throws Exception + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" - + " \"maxPercentOfSegmentsToConsiderPerMove\": 1,\n" + + " \"percentOfSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -228,14 +228,14 @@ public void testSerdeWithKillAllDataSources() throws Exception CoordinatorDynamicConfig.class ); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false, 1); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false); //ensure whitelist is empty when killAllDataSources is true try { jsonStr = "{\n" + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n" + " \"killAllDataSources\": true,\n" - + " \"maxPercentOfSegmentsToConsiderPerMove\": 1\n" + + " \"percentOfSegmentsToConsiderPerMove\": 1\n" + "}\n"; mapper.readValue( jsonStr, @@ -257,7 +257,7 @@ public void testDeserializeWithoutMaxSegmentsInNodeLoadingQueue() throws Excepti + " \"mergeBytesLimit\": 1,\n" + " \"mergeSegmentsLimit\" : 1,\n" + " \"maxSegmentsToMove\": 1,\n" - + " \"maxPercentOfSegmentsToConsiderPerMove\": 1,\n" + + " \"percentOfSegmentsToConsiderPerMove\": 1,\n" + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" @@ -275,7 +275,7 @@ public void testDeserializeWithoutMaxSegmentsInNodeLoadingQueue() throws Excepti CoordinatorDynamicConfig.class ); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false, 1); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false); } @Test @@ -290,6 
+290,7 @@ public void testBuilderDefaults() 100, 5, 100, + 15, 10, 1, false, @@ -298,8 +299,7 @@ public void testBuilderDefaults() 0, emptyList, 70, - false, - 15 + false ); } @@ -334,7 +334,8 @@ private void assertConfig( long expectedMergeBytesLimit, int expectedMergeSegmentsLimit, int expectedMaxSegmentsToMove, - int expectedMaxPercentOfSegmentsToConsiderPerMove, + int expectedPercentOfSegmentsToConsiderPerMove, + int expectedReplicantLifetime, int expectedReplicationThrottleLimit, int expectedBalancerComputeThreads, boolean expectedEmitingBalancingStats, @@ -343,8 +344,7 @@ private void assertConfig( int expectedMaxSegmentsInNodeLoadingQueue, Set decommissioningNodes, int decommissioningMaxPercentOfMaxSegmentsToMove, - boolean pauseCoordination, - int expectedReplicantLifetime + boolean pauseCoordination ) { Assert.assertEquals( @@ -354,7 +354,7 @@ private void assertConfig( Assert.assertEquals(expectedMergeBytesLimit, config.getMergeBytesLimit()); Assert.assertEquals(expectedMergeSegmentsLimit, config.getMergeSegmentsLimit()); Assert.assertEquals(expectedMaxSegmentsToMove, config.getMaxSegmentsToMove()); - Assert.assertEquals(expectedMaxPercentOfSegmentsToConsiderPerMove, config.getPercentOfSegmentsToConsiderPerMove()); + Assert.assertEquals(expectedPercentOfSegmentsToConsiderPerMove, config.getPercentOfSegmentsToConsiderPerMove()); Assert.assertEquals(expectedReplicantLifetime, config.getReplicantLifetime()); Assert.assertEquals(expectedReplicationThrottleLimit, config.getReplicationThrottleLimit()); Assert.assertEquals(expectedBalancerComputeThreads, config.getBalancerComputeThreads()); From a8a7c4277f4a1654a319bdf2f9668578cb260f05 Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Tue, 8 Sep 2020 12:18:13 -0500 Subject: [PATCH 11/28] fix items in docs after spellcheck flags --- docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 
6bf3ba61cf8e..fc3a8e6dd57d 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -756,7 +756,7 @@ Issuing a GET request at the same URL will return the spec that is currently in |`mergeBytesLimit`|The maximum total uncompressed size in bytes of segments to merge.|524288000L| |`mergeSegmentsLimit`|The maximum number of segments that can be in a single [append task](../ingestion/tasks.md).|100| |`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5| -|`percentOfSegmentsToConsiderPerMove`|The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (least available capacity first) and then iterates over the servers considering their segments for moving. The default config of 100% should make sense for most small to medium sized clusters. An admin for a cluster with 1MM segments could choose to only consider 500k of these segments for each move operation by overrriding this config to 50. +|`percentOfSegmentsToConsiderPerMove`|The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (least available capacity first) and then iterates over the servers considering their segments for moving. The default config of 100% should make sense for most small to medium sized clusters. An admin for a cluster with 1,000,000 segments could choose to only consider 500,000 of these segments for each move operation by overriding this config to 50. |`replicantLifetime`|The maximum number of Coordinator runs for a segment to be replicated before we start alerting.|15| |`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10| |`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. 
Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1| From fb20cb2cad1efee921acbe44091255d8487b3b4c Mon Sep 17 00:00:00 2001 From: Lucas Capistrant Date: Tue, 8 Sep 2020 16:04:32 -0500 Subject: [PATCH 12/28] Fix lgtm flag on missing space in string literal --- .../druid/server/coordinator/ReservoirSegmentSampler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java index 3289f0f2cc14..6a4e0cca986a 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java @@ -56,7 +56,7 @@ static BalancerSegmentHolder getRandomBalancerSegmentHolder( // Reset a bad value of percentOfSegmentsToConsider to 100. We don't allow consideration of 0% or less of segments. if (percentOfSegmentsToConsider <= 0) { log.debug("Resetting percentOfSegmentsToConsider to 100 because only values from 1 to 100 are allowed." 
- + "You Provided [%d]", percentOfSegmentsToConsider); + + " You Provided [%d]", percentOfSegmentsToConsider); percentOfSegmentsToConsider = 100; } From 85660be45588b0ac53dc3534faa2fc1937188ed6 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Thu, 29 Oct 2020 09:00:30 -0500 Subject: [PATCH 13/28] improve documentation for new config --- docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 42de7f697998..134df616d4ba 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -765,7 +765,7 @@ Issuing a GET request at the same URL will return the spec that is currently in |`mergeBytesLimit`|The maximum total uncompressed size in bytes of segments to merge.|524288000L| |`mergeSegmentsLimit`|The maximum number of segments that can be in a single [append task](../ingestion/tasks.md).|100| |`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5| -|`percentOfSegmentsToConsiderPerMove`|The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (least available capacity first) and then iterates over the servers considering their segments for moving. The default config of 100% should make sense for most small to medium sized clusters. An admin for a cluster with 1,000,000 segments could choose to only consider 500,000 of these segments for each move operation by overriding this config to 50. +|`percentOfSegmentsToConsiderPerMove`|The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. 
The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move.| |`replicantLifetime`|The maximum number of Coordinator runs for a segment to be replicated before we start alerting.|15| |`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10| |`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1| From f899ade562426c903aa92cb4f0e1a8f8a7a6a23d Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Fri, 30 Oct 2020 15:53:05 -0500 Subject: [PATCH 14/28] Add default value to config docs and add advice in cluster tuning doc --- docs/configuration/index.md | 2 +- docs/operations/basic-cluster-tuning.md | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 134df616d4ba..46766c5a16f6 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -765,7 +765,7 @@ Issuing a GET request at the same URL will return the spec that is currently in |`mergeBytesLimit`|The maximum total uncompressed size in bytes of segments to merge.|524288000L| |`mergeSegmentsLimit`|The maximum number of segments that can be in a single [append task](../ingestion/tasks.md).|100| |`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5| -|`percentOfSegmentsToConsiderPerMove`|The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. 
Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move.| +|`percentOfSegmentsToConsiderPerMove`|The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move.|100| |`replicantLifetime`|The maximum number of Coordinator runs for a segment to be replicated before we start alerting.|15| |`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10| |`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. 
Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1| diff --git a/docs/operations/basic-cluster-tuning.md b/docs/operations/basic-cluster-tuning.md index 5bfc00242cc3..897d4351f451 100644 --- a/docs/operations/basic-cluster-tuning.md +++ b/docs/operations/basic-cluster-tuning.md @@ -280,6 +280,23 @@ The heap requirements of the Coordinator scale with the number of servers, segme You can set the Coordinator heap to the same size as your Broker heap, or slightly smaller: both services have to process cluster-wide state and answer API requests about this state. +#### Dynamic Configuration + +`percentOfSegmentsToConsiderPerMove` +* The default value is 100. This means that the Coordinator will consider all segments when it is looking for a segment to move. The Coordinator makes a weighted choice, with segments on Servers with the least capacity being the most likely segments to be moved. + * This weighted selection strategy means that the segments on the servers who have the most available capacity are the least likely to be chosen. + * As the number of segments in the cluster increases, the probability of choosing the Nth segment to move decreases; where N is the last segment considered for moving. + * An admin can use this config to skip consideration of that Nth segment. +* Instead of skipping a precise amount of segments, we skip a percentage of segments in the cluster. + * For example, with the value set to 25, only the first 25% of segments will be considered as a segment that can be moved. This 25% of segments will come from the servers that have the least available capacity. + * In this example, each time the Coordinator looks for a segment to move, it will consider 75% less segments than it did when the configuration was 100. On clusters with 100s of thousands of segments, this can add up to meaningful coordination time savings. 
+* General recommendations for this configuration: + * If you are not worried about the amount of time it takes your Coordinator to complete a full coordination cycle, you likely do not need to modify this config. + * If you are frustrated with how long the Coordinator takes to run a full coordination cycle, and you have set the Coordinator dynamic config `maxSegmentsToMove` to a value above 0 (the default is 5), setting this config to a non-default value can help shorten coordination time. + * The recommended starting point value is 66. It represents a meaningful decraease in the percentage of segments considered while also not being too aggressive (You will consider 1/3 fewer segments per move operation with this value). +* The impact that modifying this config will have on your coordination time will be a function of how low you set the config value, the value for `maxSegmentsToMove` and the total number of segments in your cluster. + * If your cluster has a relatively small number of segments, or you choose to move few segments per coordination cycle, there may not be much savings to be had here. + ### Overlord The main performance-related setting on the Overlord is the heap size. 
From 848ac007d557347b039bd1611e1c30cb215661ea Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Fri, 30 Oct 2020 16:59:01 -0500 Subject: [PATCH 15/28] Add percentOfSegmentsToConsiderPerMove to web console coord config dialog --- .../coordinator-dynamic-config-dialog.tsx | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/web-console/src/dialogs/coordinator-dynamic-config-dialog/coordinator-dynamic-config-dialog.tsx b/web-console/src/dialogs/coordinator-dynamic-config-dialog/coordinator-dynamic-config-dialog.tsx index 8478ec6a39a4..6ae2ab3b7b91 100644 --- a/web-console/src/dialogs/coordinator-dynamic-config-dialog/coordinator-dynamic-config-dialog.tsx +++ b/web-console/src/dialogs/coordinator-dynamic-config-dialog/coordinator-dynamic-config-dialog.tsx @@ -264,6 +264,24 @@ export const CoordinatorDynamicConfigDialog = React.memo(function CoordinatorDyn ), }, + { + name: 'percentOfSegmentsToConsiderPerMove', + type: 'number', + defaultValue: 100, + info: ( + <> + The percentage of the total number of segments in the cluster that are considered + every time a segment needs to be selected for a move. Druid orders servers by + available capacity ascending (the least available capacity first) and then iterates + over the servers. For each server, Druid iterates over the segments on the server, + considering them for moving. The default config of 100% means that every segment on + every server is a candidate to be moved. This should make sense for most small to + medium-sized clusters. However, an admin may find it preferable to drop this value + lower if they don't think that it is worthwhile to consider every single segment in + the cluster each time it is looking for a segment to move. 
+ + ), + }, ]} model={dynamicConfig} onChange={m => setDynamicConfig(m)} From 982b0167153fecb8b9e0701af9a7cb5f9c8c4af5 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Tue, 3 Nov 2020 10:06:14 -0600 Subject: [PATCH 16/28] update jest snapshot after console change --- .../coordinator-dynamic-config-dialog.spec.tsx.snap | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap index dc3cdacd610e..238c2fad9ee0 100644 --- a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap +++ b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap @@ -165,6 +165,14 @@ exports[`coordinator dynamic config matches snapshot 1`] = ` "name": "decommissioningMaxPercentOfMaxSegmentsToMove", "type": "number", }, + Object { + "defaultValue": 100, + "info": + The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move. 
+ , + "name": "percentOfSegmentsToConsiderPerMove", + "type": "number", + }, ] } model={Object {}} From cce72eb3fc67e1980ef45f2598068cee7f7e5b38 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Tue, 3 Nov 2020 10:07:56 -0600 Subject: [PATCH 17/28] fix spell checker errors --- docs/operations/basic-cluster-tuning.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/operations/basic-cluster-tuning.md b/docs/operations/basic-cluster-tuning.md index 897d4351f451..02a9b6cd271d 100644 --- a/docs/operations/basic-cluster-tuning.md +++ b/docs/operations/basic-cluster-tuning.md @@ -289,11 +289,11 @@ You can set the Coordinator heap to the same size as your Broker heap, or slight * An admin can use this config to skip consideration of that Nth segment. * Instead of skipping a precise amount of segments, we skip a percentage of segments in the cluster. * For example, with the value set to 25, only the first 25% of segments will be considered as a segment that can be moved. This 25% of segments will come from the servers that have the least available capacity. - * In this example, each time the Coordinator looks for a segment to move, it will consider 75% less segments than it did when the configuration was 100. On clusters with 100s of thousands of segments, this can add up to meaningful coordination time savings. + * In this example, each time the Coordinator looks for a segment to move, it will consider 75% less segments than it did when the configuration was 100. On clusters with hundreds of thousands of segments, this can add up to meaningful coordination time savings. * General recommendations for this configuration: * If you are not worried about the amount of time it takes your Coordinator to complete a full coordination cycle, you likely do not need to modify this config. 
* If you are frustrated with how long the Coordinator takes to run a full coordination cycle, and you have set the Coordinator dynamic config `maxSegmentsToMove` to a value above 0 (the default is 5), setting this config to a non-default value can help shorten coordination time. - * The recommended starting point value is 66. It represents a meaningful decraease in the percentage of segments considered while also not being too aggressive (You will consider 1/3 fewer segments per move operation with this value). + * The recommended starting point value is 66. It represents a meaningful decrease in the percentage of segments considered while also not being too aggressive (You will consider 1/3 fewer segments per move operation with this value). * The impact that modifying this config will have on your coordination time will be a function of how low you set the config value, the value for `maxSegmentsToMove` and the total number of segments in your cluster. * If your cluster has a relatively small number of segments, or you choose to move few segments per coordination cycle, there may not be much savings to be had here. 
From 1c8e9da06a6455bd461e16556c95e0b5cd6e3ee1 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Tue, 3 Nov 2020 12:15:03 -0600 Subject: [PATCH 18/28] Improve debug logging in getRandomSegmentBalancerHolder to cover all bad inputs for % of segments to consider --- .../druid/server/coordinator/ReservoirSegmentSampler.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java index 6a4e0cca986a..a4cdfe5acd16 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java @@ -53,8 +53,9 @@ static BalancerSegmentHolder getRandomBalancerSegmentHolder( int calculatedSegmentLimit = Integer.MAX_VALUE; int numSoFar = 0; - // Reset a bad value of percentOfSegmentsToConsider to 100. We don't allow consideration of 0% or less of segments. - if (percentOfSegmentsToConsider <= 0) { + // Reset a bad value of percentOfSegmentsToConsider to 100. We don't allow consideration less than or equal to + // 0% of segments or greater than 100% of segments. + if (percentOfSegmentsToConsider <= 0 || percentOfSegmentsToConsider > 100) { log.debug("Resetting percentOfSegmentsToConsider to 100 because only values from 1 to 100 are allowed." 
+ " You Provided [%d]", percentOfSegmentsToConsider); percentOfSegmentsToConsider = 100; From bf5907491dc33809c150f1c26140f3dae951dfd5 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Fri, 18 Dec 2020 17:19:09 -0600 Subject: [PATCH 19/28] add new config back to web console module after merge with master --- ...dinator-dynamic-config-dialog.spec.tsx.snap | 8 ++++++++ .../coordinator-dynamic-config.tsx | 18 ++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap index 50f17d44ee65..ff3008c61146 100644 --- a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap +++ b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap @@ -167,6 +167,14 @@ exports[`coordinator dynamic config matches snapshot 1`] = ` "name": "decommissioningMaxPercentOfMaxSegmentsToMove", "type": "number", }, + Object { + "defaultValue": 100, + "info": + The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move. 
+ , + "name": "percentOfSegmentsToConsiderPerMove", + "type": "number", + }, Object { "defaultValue": false, "info": diff --git a/web-console/src/druid-models/coordinator-dynamic-config.tsx b/web-console/src/druid-models/coordinator-dynamic-config.tsx index b5621ce31f8a..11b613c06807 100644 --- a/web-console/src/druid-models/coordinator-dynamic-config.tsx +++ b/web-console/src/druid-models/coordinator-dynamic-config.tsx @@ -193,6 +193,24 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field[ ), }, + { + name: 'percentOfSegmentsToConsiderPerMove', + type: 'number', + defaultValue: 100, + info: ( + <> + The percentage of the total number of segments in the cluster that are considered + every time a segment needs to be selected for a move. Druid orders servers by + available capacity ascending (the least available capacity first) and then iterates + over the servers. For each server, Druid iterates over the segments on the server, + considering them for moving. The default config of 100% means that every segment on + every server is a candidate to be moved. This should make sense for most small to + medium-sized clusters. However, an admin may find it preferable to drop this value + lower if they don't think that it is worthwhile to consider every single segment in + the cluster each time it is looking for a segment to move. 
+ + ), + }, { name: 'pauseCoordination', type: 'boolean', From dc3f80af30d215ef231b19e183494c9aff562348 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Fri, 18 Dec 2020 17:19:21 -0600 Subject: [PATCH 20/28] fix ReservoirSegmentSamplerTest --- .../ReservoirSegmentSamplerTest.java | 25 ++++++------------- 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java index 91bfb0790865..f59f24274038 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java @@ -195,35 +195,24 @@ public void getRandomBalancerSegmentHolderTest() @Test public void getRandomBalancerSegmentHolderTestSegmentsToConsiderLimit() { - EasyMock.expect(druidServer1.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce(); - EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce(); - EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce(); - EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce(); + int iterations = 5000; + + EasyMock.expect(druidServer1.getType()).andReturn(ServerType.HISTORICAL).times(iterations); ImmutableDruidServerTests.expectSegments(druidServer1, segments1); - EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); EasyMock.replay(druidServer1); - EasyMock.expect(druidServer2.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce(); - EasyMock.expect(druidServer2.getName()).andReturn("2").atLeastOnce(); - EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes(); - EasyMock.expect(druidServer2.getCurrSize()).andReturn(30L).atLeastOnce(); - EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce(); + 
EasyMock.expect(druidServer2.getType()).andReturn(ServerType.HISTORICAL).times(iterations); ImmutableDruidServerTests.expectSegments(druidServer2, segments2); - EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); EasyMock.replay(druidServer2); - EasyMock.expect(druidServer3.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce(); - EasyMock.expect(druidServer3.getName()).andReturn("3").atLeastOnce(); - EasyMock.expect(druidServer3.getTier()).andReturn("normal").anyTimes(); - EasyMock.expect(druidServer3.getCurrSize()).andReturn(30L).atLeastOnce(); - EasyMock.expect(druidServer3.getMaxSize()).andReturn(100L).atLeastOnce(); + EasyMock.expect(druidServer3.getType()).andReturn(ServerType.HISTORICAL).times(iterations); ImmutableDruidServerTests.expectSegments(druidServer3, segments3); - EasyMock.expect(druidServer3.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); EasyMock.replay(druidServer3); ImmutableDruidServerTests.expectSegments(druidServer4, segments4); EasyMock.replay(druidServer4); + // Have to use anyTimes() because the number of times a segment on a given server is chosen is nondeterministic.
EasyMock.expect(holder1.getServer()).andReturn(druidServer1).anyTimes(); EasyMock.replay(holder1); EasyMock.expect(holder2.getServer()).andReturn(druidServer2).anyTimes(); @@ -241,7 +230,7 @@ public void getRandomBalancerSegmentHolderTestSegmentsToConsiderLimit() holderList.add(holder4); Map segmentCountMap = new HashMap<>(); - for (int i = 0; i < 5000; i++) { + for (int i = 0; i < iterations; i++) { segmentCountMap.put( ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet(), 75).getSegment(), 1 ); From 9ac54ffddbd6ada5f4c4dda4677cde3f1a383673 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Fri, 18 Dec 2020 17:30:43 -0600 Subject: [PATCH 21/28] fix line breaks in coordinator console dialog --- .../druid-models/coordinator-dynamic-config.tsx | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/web-console/src/druid-models/coordinator-dynamic-config.tsx b/web-console/src/druid-models/coordinator-dynamic-config.tsx index 11b613c06807..f2fa7d0c8ec9 100644 --- a/web-console/src/druid-models/coordinator-dynamic-config.tsx +++ b/web-console/src/druid-models/coordinator-dynamic-config.tsx @@ -199,15 +199,14 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field[ defaultValue: 100, info: ( <> - The percentage of the total number of segments in the cluster that are considered - every time a segment needs to be selected for a move. Druid orders servers by - available capacity ascending (the least available capacity first) and then iterates - over the servers. For each server, Druid iterates over the segments on the server, - considering them for moving. The default config of 100% means that every segment on - every server is a candidate to be moved. This should make sense for most small to - medium-sized clusters. 
However, an admin may find it preferable to drop this value - lower if they don't think that it is worthwhile to consider every single segment in - the cluster each time it is looking for a segment to move. + The percentage of the total number of segments in the cluster that are considered every time + a segment needs to be selected for a move. Druid orders servers by available capacity + ascending (the least available capacity first) and then iterates over the servers. For each + server, Druid iterates over the segments on the server, considering them for moving. The + default config of 100% means that every segment on every server is a candidate to be moved. + This should make sense for most small to medium-sized clusters. However, an admin may find + it preferable to drop this value lower if they don't think that it is worthwhile to consider + every single segment in the cluster each time it is looking for a segment to move. ), }, From 9197bb0b1738be30f79016dab9837002a8056152 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Fri, 18 Dec 2020 18:42:24 -0600 Subject: [PATCH 22/28] Add a test that helps ensure not regressions for percentOfSegmentsToConsiderPerMove --- .../coordinator/BalanceSegmentsTest.java | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java index c57e140d5112..810c640f8ce1 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java @@ -536,6 +536,76 @@ public void testRun2() Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0); } + /** + * Testing that the dynamic coordinator config value, percentOfSegmentsToConsiderPerMove, is honored when calling + * out to pickSegmentToMove. 
This config limits the number of segments that are considered when looking for a segment + * to move. + */ + @Test + public void testThatDynamicConfigIsHonoredWhenPickingSegmentToMove() + { + mockDruidServer(druidServer1, "1", "normal", 50L, 100L, Arrays.asList(segment1, segment2)); + mockDruidServer(druidServer2, "2", "normal", 30L, 100L, Arrays.asList(segment3, segment4)); + mockDruidServer(druidServer3, "3", "normal", 0L, 100L, Collections.emptyList()); + + EasyMock.replay(druidServer4); + + mockCoordinator(coordinator); + + BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); + + // The first call for decommissioning servers + EasyMock.expect( + strategy.pickSegmentToMove( + ImmutableList.of(), + broadcastDatasources, + 40 + ) + ) + .andReturn(null); + + // The second call for the single non decommissioning server move + EasyMock.expect( + strategy.pickSegmentToMove( + ImmutableList.of( + new ServerHolder(druidServer3, peon3, false), + new ServerHolder(druidServer2, peon2, false), + new ServerHolder(druidServer1, peon1, false) + ), + broadcastDatasources, + 40 + ) + ) + .andReturn(new BalancerSegmentHolder(druidServer2, segment3)); + + EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn(new ServerHolder(druidServer3, peon3)) + .anyTimes(); + EasyMock.replay(strategy); + + DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder( + ImmutableList.of(druidServer1, druidServer2, druidServer3), + ImmutableList.of(peon1, peon2, peon3), + ImmutableList.of(false, false, false) + ) + .withDynamicConfigs( + CoordinatorDynamicConfig.builder() + .withMaxSegmentsToMove(1) + .withPercentOfSegmentsToConsiderPerMove(40) + .build() + ) + .withBalancerStrategy(strategy) + .withBroadcastDatasources(broadcastDatasources) + .build(); + + params = new BalanceSegmentsTester(coordinator).run(params); + Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal")); + 
Assert.assertThat( + peon3.getSegmentsToLoad(), + Matchers.is(Matchers.equalTo(ImmutableSet.of(segment3))) + ); + } + private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder( List druidServers, List peons From 6f24d44757ea227aae61f6fd0ea7c7fb1d54ae4f Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Fri, 18 Dec 2020 22:01:24 -0600 Subject: [PATCH 23/28] Make improvements based off of feedback in review --- .../server/coordinator/BalancerStrategy.java | 2 +- .../coordinator/CoordinatorDynamicConfig.java | 16 ++-- .../coordinator/CostBalancerStrategy.java | 2 +- .../coordinator/RandomBalancerStrategy.java | 2 +- .../coordinator/ReservoirSegmentSampler.java | 12 +-- .../coordinator/BalanceSegmentsTest.java | 2 +- .../http/CoordinatorDynamicConfigTest.java | 84 +++++++++++++------ 7 files changed, 75 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java index e988e2383721..db451693674a 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java @@ -75,7 +75,7 @@ public interface BalancerStrategy BalancerSegmentHolder pickSegmentToMove( List serverHolders, Set broadcastDatasources, - int percentOfSegmentsToConsider + double percentOfSegmentsToConsider ); /** diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index d21babbc87a3..6f3c68c2cbd7 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -53,7 +53,7 @@ public class CoordinatorDynamicConfig private final long mergeBytesLimit; private final int 
mergeSegmentsLimit; private final int maxSegmentsToMove; - private final int percentOfSegmentsToConsiderPerMove; + private final double percentOfSegmentsToConsiderPerMove; private final int replicantLifetime; private final int replicationThrottleLimit; private final int balancerComputeThreads; @@ -96,7 +96,7 @@ public CoordinatorDynamicConfig( @JsonProperty("mergeBytesLimit") long mergeBytesLimit, @JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit, @JsonProperty("maxSegmentsToMove") int maxSegmentsToMove, - @JsonProperty("percentOfSegmentsToConsiderPerMove") int percentOfSegmentsToConsiderPerMove, + @JsonProperty("percentOfSegmentsToConsiderPerMove") double percentOfSegmentsToConsiderPerMove, @JsonProperty("replicantLifetime") int replicantLifetime, @JsonProperty("replicationThrottleLimit") int replicationThrottleLimit, @JsonProperty("balancerComputeThreads") int balancerComputeThreads, @@ -125,12 +125,10 @@ public CoordinatorDynamicConfig( this.mergeBytesLimit = mergeBytesLimit; this.mergeSegmentsLimit = mergeSegmentsLimit; this.maxSegmentsToMove = maxSegmentsToMove; - // This helps with ease of migration to the new config, but could confuse users. 
Docs explicitly state this value must be > 0 - if (percentOfSegmentsToConsiderPerMove <= 0 || percentOfSegmentsToConsiderPerMove > 100) { - this.percentOfSegmentsToConsiderPerMove = 100; - } else { - this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove; - } + + Preconditions.checkArgument(percentOfSegmentsToConsiderPerMove > 0 && percentOfSegmentsToConsiderPerMove <= 100, "percentOfSegmentsToConsiderPerMove should be between 1 and 100!"); + this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove; + this.replicantLifetime = replicantLifetime; this.replicationThrottleLimit = replicationThrottleLimit; this.balancerComputeThreads = Math.max(balancerComputeThreads, 1); @@ -220,7 +218,7 @@ public int getMaxSegmentsToMove() } @JsonProperty - public int getPercentOfSegmentsToConsiderPerMove() + public double getPercentOfSegmentsToConsiderPerMove() { return percentOfSegmentsToConsiderPerMove; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java index 0142b266d902..ac56544b0730 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java @@ -215,7 +215,7 @@ static double computeJointSegmentsCost(final DataSegment segment, final Iterable public BalancerSegmentHolder pickSegmentToMove( final List serverHolders, Set broadcastDatasources, - int percentOfSegmentsToConsider + double percentOfSegmentsToConsider ) { return ReservoirSegmentSampler.getRandomBalancerSegmentHolder( diff --git a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java index 839bff4b7e23..8f3b96d67ab2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java 
+++ b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java @@ -57,7 +57,7 @@ public ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List public BalancerSegmentHolder pickSegmentToMove( List serverHolders, Set broadcastDatasources, - int percentOfSegmentsToConsider + double percentOfSegmentsToConsider ) { return ReservoirSegmentSampler.getRandomBalancerSegmentHolder( diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java index a4cdfe5acd16..e0d74aebeb6e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java @@ -45,7 +45,7 @@ final class ReservoirSegmentSampler static BalancerSegmentHolder getRandomBalancerSegmentHolder( final List serverHolders, Set broadcastDatasources, - int percentOfSegmentsToConsider + double percentOfSegmentsToConsider ) { ServerHolder fromServerHolder = null; @@ -56,8 +56,8 @@ static BalancerSegmentHolder getRandomBalancerSegmentHolder( // Reset a bad value of percentOfSegmentsToConsider to 100. We don't allow consideration less than or equal to // 0% of segments or greater than 100% of segments. if (percentOfSegmentsToConsider <= 0 || percentOfSegmentsToConsider > 100) { - log.debug("Resetting percentOfSegmentsToConsider to 100 because only values from 1 to 100 are allowed." - + " You Provided [%d]", percentOfSegmentsToConsider); + log.warn("Resetting percentOfSegmentsToConsider to 100 because only values from 1 to 100 are allowed." 
+ + " You Provided [%f]", percentOfSegmentsToConsider); percentOfSegmentsToConsider = 100; } @@ -70,7 +70,7 @@ static BalancerSegmentHolder getRandomBalancerSegmentHolder( // If totalSegments are zero, we will assume it is a mistake and move on to iteration without updating // calculatedSegmentLimit if (totalSegments != 0) { - calculatedSegmentLimit = (int) Math.ceil((double) totalSegments * ((double) percentOfSegmentsToConsider / (double) 100)); + calculatedSegmentLimit = (int) Math.ceil((double) totalSegments * (percentOfSegmentsToConsider / 100.0)); } } @@ -97,8 +97,8 @@ static BalancerSegmentHolder getRandomBalancerSegmentHolder( // We have iterated over the alloted number of segments and will return the currently proposed segment or null // We will only break out early if we are iterating less than 100% of the total cluster segments if (percentOfSegmentsToConsider < 100 && numSoFar >= calculatedSegmentLimit) { - log.debug("Breaking out of iteration over potential segments to move because we hit the limit [%d percent] of" - + " segments to consider to move. Segments Iterated: [%s]", percentOfSegmentsToConsider, numSoFar); + log.debug("Breaking out of iteration over potential segments to move because we hit the limit [%f percent] of" + + " segments to consider to move. 
Segments Iterated: [%d]", percentOfSegmentsToConsider, numSoFar); break; } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java index 810c640f8ce1..26175f613216 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java @@ -719,7 +719,7 @@ public ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, Li public BalancerSegmentHolder pickSegmentToMove( List serverHolders, Set broadcastDatasources, - int percentOfSegmentsToConsider + double percentOfSegmentsToConsider ) { return pickOrder.get(pickCounter.getAndIncrement() % pickOrder.size()); diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index 572f25b46363..3955e1bb699d 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; +import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.IAE; import org.apache.druid.segment.TestHelper; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; @@ -168,37 +169,68 @@ public void testSerdeWithStringinKillDataSourceWhitelist() throws Exception @Test public void testSerdeCorrectsInvalidBadMaxPercentOfSegmentsToConsiderPerMove() throws Exception { - String jsonStr = "{\n" - + " \"percentOfSegmentsToConsiderPerMove\": 0\n" - + "}\n"; + try { + String jsonStr = "{\n" + + " \"percentOfSegmentsToConsiderPerMove\": 0\n" + + "}\n"; - 
CoordinatorDynamicConfig actual = mapper.readValue( - mapper.writeValueAsString( - mapper.readValue( - jsonStr, - CoordinatorDynamicConfig.class - ) - ), - CoordinatorDynamicConfig.class - ); + mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); - Assert.assertEquals(100, actual.getPercentOfSegmentsToConsiderPerMove()); + Assert.fail("deserialization should fail."); + } + catch (JsonMappingException e) { + Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); + } - jsonStr = "{\n" - + " \"percentOfSegmentsToConsiderPerMove\": -100\n" - + "}\n"; + try { + String jsonStr = "{\n" + + " \"percentOfSegmentsToConsiderPerMove\": -100\n" + + "}\n"; - actual = mapper.readValue( - mapper.writeValueAsString( - mapper.readValue( - jsonStr, - CoordinatorDynamicConfig.class - ) - ), - CoordinatorDynamicConfig.class - ); + mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); - Assert.assertEquals(100, actual.getPercentOfSegmentsToConsiderPerMove()); + Assert.fail("deserialization should fail."); + } + catch (JsonMappingException e) { + Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); + } + + try { + String jsonStr = "{\n" + + " \"percentOfSegmentsToConsiderPerMove\": 105\n" + + "}\n"; + + mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); + + Assert.fail("deserialization should fail."); + } + catch (JsonMappingException e) { + Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); + } } @Test From 62962e0bcfe0afb57440f9ed1317db566066a3fa Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Fri, 18 Dec 2020 22:06:56 -0600 Subject: [PATCH 24/28] additional cleanup coming from review --- 
.../coordinator/CoordinatorDynamicConfig.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index 6f3c68c2cbd7..38e2539f913c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -126,7 +126,10 @@ public CoordinatorDynamicConfig( this.mergeSegmentsLimit = mergeSegmentsLimit; this.maxSegmentsToMove = maxSegmentsToMove; - Preconditions.checkArgument(percentOfSegmentsToConsiderPerMove > 0 && percentOfSegmentsToConsiderPerMove <= 100, "percentOfSegmentsToConsiderPerMove should be between 1 and 100!"); + Preconditions.checkArgument( + percentOfSegmentsToConsiderPerMove > 0 && percentOfSegmentsToConsiderPerMove <= 100, + "percentOfSegmentsToConsiderPerMove should be between 1 and 100!" 
+ ); this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove; this.replicantLifetime = replicantLifetime; @@ -439,7 +442,7 @@ public static class Builder private Long mergeBytesLimit; private Integer mergeSegmentsLimit; private Integer maxSegmentsToMove; - private Integer percentOfSegmentsToConsiderPerMove; + private Double percentOfSegmentsToConsiderPerMove; private Integer replicantLifetime; private Integer replicationThrottleLimit; private Boolean emitBalancingStats; @@ -463,7 +466,7 @@ public Builder( @JsonProperty("mergeBytesLimit") @Nullable Long mergeBytesLimit, @JsonProperty("mergeSegmentsLimit") @Nullable Integer mergeSegmentsLimit, @JsonProperty("maxSegmentsToMove") @Nullable Integer maxSegmentsToMove, - @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Integer percentOfSegmentsToConsiderPerMove, + @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove, @JsonProperty("replicantLifetime") @Nullable Integer replicantLifetime, @JsonProperty("replicationThrottleLimit") @Nullable Integer replicationThrottleLimit, @JsonProperty("balancerComputeThreads") @Nullable Integer balancerComputeThreads, @@ -483,6 +486,10 @@ public Builder( this.mergeBytesLimit = mergeBytesLimit; this.mergeSegmentsLimit = mergeSegmentsLimit; this.maxSegmentsToMove = maxSegmentsToMove; + Preconditions.checkArgument( + percentOfSegmentsToConsiderPerMove > 0 && percentOfSegmentsToConsiderPerMove <= 100, + "percentOfSegmentsToConsiderPerMove should be between 1 and 100!" 
+ ); this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove; this.replicantLifetime = replicantLifetime; this.replicationThrottleLimit = replicationThrottleLimit; @@ -521,7 +528,7 @@ public Builder withMaxSegmentsToMove(int maxSegmentsToMove) return this; } - public Builder withPercentOfSegmentsToConsiderPerMove(int percentOfSegmentsToConsiderPerMove) + public Builder withPercentOfSegmentsToConsiderPerMove(double percentOfSegmentsToConsiderPerMove) { this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove; return this; From 93bed105b71da265866bbf5bbc6dd41d5fa5218f Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Mon, 21 Dec 2020 10:14:36 -0600 Subject: [PATCH 25/28] Add a warning log if limit on segments to consider for move can't be calculated --- .../druid/server/coordinator/ReservoirSegmentSampler.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java index e0d74aebeb6e..dd43760ec874 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java @@ -71,6 +71,9 @@ static BalancerSegmentHolder getRandomBalancerSegmentHolder( // calculatedSegmentLimit if (totalSegments != 0) { calculatedSegmentLimit = (int) Math.ceil((double) totalSegments * (percentOfSegmentsToConsider / 100.0)); + } else { + log.warn("Unable to calculate limit on segments to consider because ServerHolder collection indicates" + + " zero segments existing in the cluster."); } } From d76c92487e4e70c75633ab0ead3670e6152af3cb Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Mon, 21 Dec 2020 10:24:53 -0600 Subject: [PATCH 26/28] remove unused import --- .../apache/druid/server/http/CoordinatorDynamicConfigTest.java | 1 - 1 file changed, 1
deletion(-) diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index 3955e1bb699d..18effc062de6 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; -import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.IAE; import org.apache.druid.segment.TestHelper; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; From e30ddf3e9247968a4dc468a1a18f7e3b2a453000 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Mon, 21 Dec 2020 10:31:39 -0600 Subject: [PATCH 27/28] fix tests for CoordinatorDynamicConfig --- .../apache/druid/server/http/CoordinatorDynamicConfigTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index 18effc062de6..15c49a2de1af 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -385,7 +385,7 @@ private void assertConfig( Assert.assertEquals(expectedMergeBytesLimit, config.getMergeBytesLimit()); Assert.assertEquals(expectedMergeSegmentsLimit, config.getMergeSegmentsLimit()); Assert.assertEquals(expectedMaxSegmentsToMove, config.getMaxSegmentsToMove()); - Assert.assertEquals(expectedPercentOfSegmentsToConsiderPerMove, config.getPercentOfSegmentsToConsiderPerMove()); + Assert.assertEquals(expectedPercentOfSegmentsToConsiderPerMove, 
config.getPercentOfSegmentsToConsiderPerMove(), 0); Assert.assertEquals(expectedReplicantLifetime, config.getReplicantLifetime()); Assert.assertEquals(expectedReplicationThrottleLimit, config.getReplicationThrottleLimit()); Assert.assertEquals(expectedBalancerComputeThreads, config.getBalancerComputeThreads()); From 43766ed659a84e547fb0f25dc4a82ea5692b6788 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Mon, 21 Dec 2020 10:31:53 -0600 Subject: [PATCH 28/28] remove precondition test that is redundant in CoordinatorDynamicConfig Builder class --- .../druid/server/coordinator/CoordinatorDynamicConfig.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index 38e2539f913c..4415f6a8ce9c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -486,10 +486,6 @@ public Builder( this.mergeBytesLimit = mergeBytesLimit; this.mergeSegmentsLimit = mergeSegmentsLimit; this.maxSegmentsToMove = maxSegmentsToMove; - Preconditions.checkArgument( - percentOfSegmentsToConsiderPerMove > 0 && percentOfSegmentsToConsiderPerMove <= 100, - "percentOfSegmentsToConsiderPerMove should be between 1 and 100!" - ); this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove; this.replicantLifetime = replicantLifetime; this.replicationThrottleLimit = replicationThrottleLimit;