From 9f7e4eefdb73a0841e0a58bd7c757a9118e15086 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Fri, 16 Apr 2021 12:40:38 -0500 Subject: [PATCH 1/8] lay the groundwork for throttling replicant loads per RunRules execution --- .../server/coordinator/ReplicationThrottler.java | 14 +++++++++++++- .../druid/server/coordinator/duty/RunRules.java | 7 +++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ReplicationThrottler.java b/server/src/main/java/org/apache/druid/server/coordinator/ReplicationThrottler.java index 2c41cd1ddd9f..cbc94a640389 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ReplicationThrottler.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ReplicationThrottler.java @@ -41,9 +41,11 @@ public class ReplicationThrottler private volatile int maxReplicants; private volatile int maxLifetime; + private volatile boolean loadPrimaryReplicantsOnly; public ReplicationThrottler(int maxReplicants, int maxLifetime) { + this.loadPrimaryReplicantsOnly = false; updateParams(maxReplicants, maxLifetime); } @@ -58,6 +60,16 @@ public void updateReplicationState(String tier) update(tier, currentlyReplicating, replicatingLookup, "create"); } + public boolean isLoadPrimaryReplicantsOnly() + { + return loadPrimaryReplicantsOnly; + } + + public void setLoadPrimaryReplicantsOnly(boolean loadPrimaryReplicantsOnly) + { + this.loadPrimaryReplicantsOnly = loadPrimaryReplicantsOnly; + } + private void update(String tier, ReplicatorSegmentHolder holder, Map lookup, String type) { int size = holder.getNumProcessing(tier); @@ -87,7 +99,7 @@ private void update(String tier, ReplicatorSegmentHolder holder, Map Integer.MAX_VALUE) { + log.info( + "Maximum number of non-primary replicants [%d] have been loaded for the current RunRules execution. Only loading primary replicants from here on.", + Integer.MAX_VALUE + ); + paramsWithReplicationManager.getReplicationManager().setLoadPrimaryReplicantsOnly(true); + } foundMatchingRule = true; break; } From eb391c1eff4a626213b498750f6035ec758f94f7 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Fri, 16 Apr 2021 17:36:32 -0500 Subject: [PATCH 2/8] Add dynamic coordinator config to control new replicant threshold. --- .../coordinator/CoordinatorDynamicConfig.java | 48 +++++++++++++++++-- .../coordinator/ReplicationThrottler.java | 7 +-- .../server/coordinator/duty/RunRules.java | 17 +++++-- .../server/coordinator/rules/LoadRule.java | 6 +++ .../coordinator/BalanceSegmentsProfiler.java | 2 +- .../server/coordinator/RunRulesTest.java | 4 +- .../http/CoordinatorDynamicConfigTest.java | 42 +++++++++------- ...inator-dynamic-config-dialog.spec.tsx.snap | 8 ++++ .../coordinator-dynamic-config.tsx | 15 ++++++ 9 files changed, 116 insertions(+), 33 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index 6e22b4ff91e5..24b53a354a2e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -96,6 +96,13 @@ public class CoordinatorDynamicConfig */ private final boolean replicateAfterLoadTimeout; + /** + * This is the maximum number of non-primary segment replicants to load per Coordination run. This number can + * be set to put a hard upper limit on the number of replicants loaded. It is a tool that can help prevent + * long delays in new data loads after events such as a Historical server leaving the cluster. + */ + private final int maxNonPrimaryReplicantsToLoad; + private static final Logger log = new Logger(CoordinatorDynamicConfig.class); @JsonCreator @@ -129,7 +136,8 @@ public CoordinatorDynamicConfig( @JsonProperty("decommissioningNodes") Object decommissioningNodes, @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove, @JsonProperty("pauseCoordination") boolean pauseCoordination, - @JsonProperty("replicateAfterLoadTimeout") boolean replicateAfterLoadTimeout + @JsonProperty("replicateAfterLoadTimeout") boolean replicateAfterLoadTimeout, + @JsonProperty("maxNonPrimaryReplicantsToLoad") int maxNonPrimaryReplicantsToLoad ) { this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments = @@ -176,6 +184,12 @@ public CoordinatorDynamicConfig( } this.pauseCoordination = pauseCoordination; this.replicateAfterLoadTimeout = replicateAfterLoadTimeout; + + Preconditions.checkArgument( + maxNonPrimaryReplicantsToLoad >= 0, + "maxNonPrimaryReplicantsToLoad must be greater than or equal to 0." + ); + this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad; } private static Set parseJsonStringOrArray(Object jsonStringOrArray) @@ -336,6 +350,13 @@ public boolean getReplicateAfterLoadTimeout() return replicateAfterLoadTimeout; } + @Min(0) + @JsonProperty + public int getMaxNonPrimaryReplicantsToLoad() + { + return maxNonPrimaryReplicantsToLoad; + } + @Override public String toString() { @@ -358,6 +379,7 @@ public String toString() ", decommissioningMaxPercentOfMaxSegmentsToMove=" + decommissioningMaxPercentOfMaxSegmentsToMove + ", pauseCoordination=" + pauseCoordination + ", replicateAfterLoadTimeout=" + replicateAfterLoadTimeout + + ", maxNonPrimaryReplicantsToLoad=" + maxNonPrimaryReplicantsToLoad + '}'; } @@ -422,6 +444,9 @@ public boolean equals(Object o) if (replicateAfterLoadTimeout != that.replicateAfterLoadTimeout) { return false; } + if (maxNonPrimaryReplicantsToLoad != that.maxNonPrimaryReplicantsToLoad) { + return false; + } return decommissioningMaxPercentOfMaxSegmentsToMove == that.decommissioningMaxPercentOfMaxSegmentsToMove; } @@ -444,7 +469,8 @@ public int hashCode() dataSourcesToNotKillStalePendingSegmentsIn, decommissioningNodes, decommissioningMaxPercentOfMaxSegmentsToMove, - pauseCoordination + pauseCoordination, + maxNonPrimaryReplicantsToLoad ); } @@ -470,6 +496,7 @@ public static class Builder private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70; private static final boolean DEFAULT_PAUSE_COORDINATION = false; private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false; + private static final int DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD = Integer.MAX_VALUE; private Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments; private Long mergeBytesLimit; @@ -488,6 +515,7 @@ public static class Builder private Integer decommissioningMaxPercentOfMaxSegmentsToMove; private Boolean pauseCoordination; private Boolean replicateAfterLoadTimeout; + private Integer maxNonPrimaryReplicantsToLoad; public Builder() { @@ -513,7 +541,8 @@ public Builder( @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") @Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove, @JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination, - @JsonProperty("replicateAfterLoadTimeout") @Nullable Boolean replicateAfterLoadTimeout + @JsonProperty("replicateAfterLoadTimeout") @Nullable Boolean replicateAfterLoadTimeout, + @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad ) { this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments = @@ -534,6 +563,7 @@ public Builder( this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove; this.pauseCoordination = pauseCoordination; this.replicateAfterLoadTimeout = replicateAfterLoadTimeout; + this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad; } public Builder withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(long leadingTimeMillis) @@ -632,6 +662,12 @@ public Builder withReplicateAfterLoadTimeout(boolean replicateAfterLoadTimeout) return this; } + public Builder withMaxNonPrimaryReplicantsToLoad(int maxNonPrimaryReplicantsToLoad) + { + this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad; + return this; + } + public CoordinatorDynamicConfig build() { return new CoordinatorDynamicConfig( @@ -660,7 +696,8 @@ public CoordinatorDynamicConfig build() ? DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT : decommissioningMaxPercentOfMaxSegmentsToMove, pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : pauseCoordination, - replicateAfterLoadTimeout == null ? DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout + replicateAfterLoadTimeout == null ? DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout, + maxNonPrimaryReplicantsToLoad == null ? DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD : maxNonPrimaryReplicantsToLoad ); } @@ -695,7 +732,8 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults) ? defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove() : decommissioningMaxPercentOfMaxSegmentsToMove, pauseCoordination == null ? defaults.getPauseCoordination() : pauseCoordination, - replicateAfterLoadTimeout == null ? defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout + replicateAfterLoadTimeout == null ? defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout, + maxNonPrimaryReplicantsToLoad == null ? defaults.getMaxNonPrimaryReplicantsToLoad() : maxNonPrimaryReplicantsToLoad ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ReplicationThrottler.java b/server/src/main/java/org/apache/druid/server/coordinator/ReplicationThrottler.java index cbc94a640389..a6aaee0bb5ec 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ReplicationThrottler.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ReplicationThrottler.java @@ -43,16 +43,17 @@ public class ReplicationThrottler private volatile int maxLifetime; private volatile boolean loadPrimaryReplicantsOnly; - public ReplicationThrottler(int maxReplicants, int maxLifetime) + public ReplicationThrottler(int maxReplicants, int maxLifetime, boolean loadPrimaryReplicantsOnly) { this.loadPrimaryReplicantsOnly = false; - updateParams(maxReplicants, maxLifetime); + updateParams(maxReplicants, maxLifetime, loadPrimaryReplicantsOnly); } - public void updateParams(int maxReplicants, int maxLifetime) + public void updateParams(int maxReplicants, int maxLifetime, boolean loadPrimaryReplicantsOnly) { this.maxReplicants = maxReplicants; this.maxLifetime = maxLifetime; + this.loadPrimaryReplicantsOnly = loadPrimaryReplicantsOnly; } public void updateReplicationState(String tier) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java index 9f608e9fab12..1425e2ce4941 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java @@ -55,7 +55,8 @@ public RunRules(DruidCoordinator coordinator) this( new ReplicationThrottler( coordinator.getDynamicConfigs().getReplicationThrottleLimit(), - coordinator.getDynamicConfigs().getReplicantLifetime() + coordinator.getDynamicConfigs().getReplicantLifetime(), + false ), coordinator ); @@ -72,7 +73,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { replicatorThrottler.updateParams( coordinator.getDynamicConfigs().getReplicationThrottleLimit(), - coordinator.getDynamicConfigs().getReplicantLifetime() + coordinator.getDynamicConfigs().getReplicantLifetime(), + false ); CoordinatorStats stats = new CoordinatorStats(); @@ -128,14 +130,19 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) boolean foundMatchingRule = false; for (Rule rule : rules) { if (rule.appliesTo(segment, now)) { - stats.accumulate(rule.run(coordinator, paramsWithReplicationManager, segment)); - if (stats.getGlobalStat("totalNonPrimaryReplicantsLoaded") > Integer.MAX_VALUE) { + if ( + stats.getGlobalStat( + "totalNonPrimaryReplicantsLoaded") >= paramsWithReplicationManager.getCoordinatorDynamicConfig() + .getMaxNonPrimaryReplicantsToLoad() + && !paramsWithReplicationManager.getReplicationManager().isLoadPrimaryReplicantsOnly() + ) { log.info( "Maximum number of non-primary replicants [%d] have been loaded for the current RunRules execution. Only loading primary replicants from here on.", - Integer.MAX_VALUE + paramsWithReplicationManager.getCoordinatorDynamicConfig().getMaxNonPrimaryReplicantsToLoad() ); paramsWithReplicationManager.getReplicationManager().setLoadPrimaryReplicantsOnly(true); } + stats.accumulate(rule.run(coordinator, paramsWithReplicationManager, segment)); foundMatchingRule = true; break; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java index ddf4a8536a07..e34cd98772a2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java @@ -57,6 +57,7 @@ public abstract class LoadRule implements Rule private static final EmittingLogger log = new EmittingLogger(LoadRule.class); static final String ASSIGNED_COUNT = "assignedCount"; static final String DROPPED_COUNT = "droppedCount"; + public final String NON_PRIMARY_ASSIGNED_COUNT = "totalNonPrimaryReplicantsLoaded"; public static final String REQUIRED_CAPACITY = "requiredCapacity"; private final Object2IntMap targetReplicants = new Object2IntOpenHashMap<>(); @@ -180,6 +181,10 @@ private void assign( createLoadQueueSizeLimitingPredicate(params).and(holder -> !holder.equals(primaryHolderToLoad)), segment ); + + // numAssigned - 1 because we don't want to count the primary assignment + stats.addToGlobalStat(NON_PRIMARY_ASSIGNED_COUNT, numAssigned - 1); + stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned); // do assign replicas for the other tiers. @@ -305,6 +310,7 @@ private void assignReplicas( createLoadQueueSizeLimitingPredicate(params), segment ); + stats.addToGlobalStat(NON_PRIMARY_ASSIGNED_COUNT, numAssigned); stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java index 5d53a53fd106..87dca60a29de 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java @@ -148,7 +148,7 @@ public void bigProfiler() ) .withEmitter(emitter) .withDatabaseRuleManager(manager) - .withReplicationManager(new ReplicationThrottler(2, 500)) + .withReplicationManager(new ReplicationThrottler(2, 500, false)) .build(); BalanceSegmentsTester tester = new BalanceSegmentsTester(coordinator); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java index 344fec40fe6a..1751bc9b79ed 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java @@ -100,7 +100,7 @@ public void setUp() start = start.plusHours(1); } - ruleRunner = new RunRules(new ReplicationThrottler(24, 1), coordinator); + ruleRunner = new RunRules(new ReplicationThrottler(24, 1, false), coordinator); } @After @@ -906,7 +906,7 @@ public void testReplicantThrottleAcrossTiers() DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy).build(); - RunRules runner = new RunRules(new ReplicationThrottler(7, 1), coordinator); + RunRules runner = new RunRules(new ReplicationThrottler(7, 1, false), coordinator); DruidCoordinatorRuntimeParams afterParams = runner.run(params); CoordinatorStats stats = afterParams.getCoordinatorStats(); diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index d95abcd482a5..4f0c06013c5c 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -54,7 +54,8 @@ public void testSerde() throws Exception + " \"decommissioningNodes\": [\"host1\", \"host2\"],\n" + " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 9,\n" + " \"pauseCoordination\": false,\n" - + " \"replicateAfterLoadTimeout\": false\n" + + " \"replicateAfterLoadTimeout\": false,\n" + + " \"maxNonPrimaryReplicantsToLoad\": 2147483647\n" + "}\n"; CoordinatorDynamicConfig actual = mapper.readValue( @@ -68,22 +69,25 @@ public void testSerde() throws Exception ); ImmutableSet decommissioning = ImmutableSet.of("host1", "host2"); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, false, Integer.MAX_VALUE); actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false, false, Integer.MAX_VALUE); actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, false, Integer.MAX_VALUE); actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, false, Integer.MAX_VALUE); actual = CoordinatorDynamicConfig.builder().withPercentOfSegmentsToConsiderPerMove(10).build(actual); - assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, false); + assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, false, Integer.MAX_VALUE); actual = CoordinatorDynamicConfig.builder().withReplicateAfterLoadTimeout(true).build(actual); - assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, true); + assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, true, Integer.MAX_VALUE); + + actual = CoordinatorDynamicConfig.builder().withMaxNonPrimaryReplicantsToLoad(10).build(actual); + assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, true, 10); } @@ -114,13 +118,13 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio ); ImmutableSet decommissioning = ImmutableSet.of(); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false, false); + assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false, false, 0); actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); - assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false, false); + assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false, false, 0); actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); - assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, false); + assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, false, 0); } @Test @@ -166,7 +170,8 @@ public void testSerdeWithStringinKillDataSourceWhitelist() throws Exception ImmutableSet.of(), 0, false, - false + false, + 0 ); } @@ -266,7 +271,7 @@ public void testHandleMissingPercentOfSegmentsToConsiderPerMove() throws Excepti ); ImmutableSet decommissioning = ImmutableSet.of("host1", "host2"); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, false); + assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, false, 0); } @Test @@ -296,7 +301,7 @@ public void testSerdeWithKillAllDataSources() throws Exception CoordinatorDynamicConfig.class ); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false, false, 0); //ensure whitelist is empty when killAllDataSources is true try { @@ -343,7 +348,7 @@ public void testDeserializeWithoutMaxSegmentsInNodeLoadingQueue() throws Excepti CoordinatorDynamicConfig.class ); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false, false, 0); } @Test @@ -368,7 +373,8 @@ public void testBuilderDefaults() emptyList, 70, false, - false + false, + Integer.MAX_VALUE ); } @@ -383,7 +389,7 @@ public void testUpdate() Assert.assertEquals( current, new CoordinatorDynamicConfig - .Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null) + .Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null) .build(current) ); } @@ -414,7 +420,8 @@ private void assertConfig( Set decommissioningNodes, int decommissioningMaxPercentOfMaxSegmentsToMove, boolean pauseCoordination, - boolean replicateAfterLoadTimeout + boolean replicateAfterLoadTimeout, + int maxNonPrimaryReplicantsToLoad ) { Assert.assertEquals( @@ -442,5 +449,6 @@ private void assertConfig( ); Assert.assertEquals(pauseCoordination, config.getPauseCoordination()); Assert.assertEquals(replicateAfterLoadTimeout, config.getReplicateAfterLoadTimeout()); + Assert.assertEquals(maxNonPrimaryReplicantsToLoad, config.getMaxNonPrimaryReplicantsToLoad()); } } diff --git a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap index 9bc6e3fb045c..b435fcad8732 100644 --- a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap +++ b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap @@ -191,6 +191,14 @@ exports[`coordinator dynamic config matches snapshot 1`] = ` "name": "replicateAfterLoadTimeout", "type": "boolean", }, + Object { + "defaultValue": 2147483647, + "info": + The maximum number of non-primary replicants to load in a single Coordinator cycle. Once this limit is hit, only primary replicants will be loaded for the remainder of the cycle. Tuning this value lower can help reduce the delay in loading primary segments when the cluster has a very large number of non-primary replicants to load (such as when a single historical drops out of the cluster leaving many under-replicated segments). + , + "name": "maxNonPrimaryReplicantsToLoad", + "type": "number", + }, ] } model={Object {}} diff --git a/web-console/src/druid-models/coordinator-dynamic-config.tsx b/web-console/src/druid-models/coordinator-dynamic-config.tsx index e23211d575db..da6c013d7e7b 100644 --- a/web-console/src/druid-models/coordinator-dynamic-config.tsx +++ b/web-console/src/druid-models/coordinator-dynamic-config.tsx @@ -37,6 +37,7 @@ export interface CoordinatorDynamicConfig { decommissioningNodes?: string[]; decommissioningMaxPercentOfMaxSegmentsToMove?: number; pauseCoordination?: boolean; + maxNonPrimaryReplicantsToLoad?: number; } export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field[] = [ @@ -234,4 +235,18 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field[ ), }, + { + name: 'maxNonPrimaryReplicantsToLoad', + type: 'number', + defaultValue: 2147483647, + info: ( + <> + The maximum number of non-primary replicants to load in a single Coordinator cycle. Once + this limit is hit, only primary replicants will be loaded for the remainder of the cycle. + Tuning this value lower can help reduce the delay in loading primary segments when the + cluster has a very large number of non-primary replicants to load (such as when a single + historical drops out of the cluster leaving many under-replicated segments). + + ), + }, ]; From 012ad7aa7406a35c126242c92cca4466c8dfc478 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Sun, 18 Apr 2021 16:30:30 -0500 Subject: [PATCH 3/8] remove redundant line --- .../apache/druid/server/coordinator/ReplicationThrottler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ReplicationThrottler.java b/server/src/main/java/org/apache/druid/server/coordinator/ReplicationThrottler.java index a6aaee0bb5ec..4e25086fd745 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ReplicationThrottler.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ReplicationThrottler.java @@ -45,7 +45,6 @@ public class ReplicationThrottler public ReplicationThrottler(int maxReplicants, int maxLifetime, boolean loadPrimaryReplicantsOnly) { - this.loadPrimaryReplicantsOnly = false; updateParams(maxReplicants, maxLifetime, loadPrimaryReplicantsOnly); } From c6a0608c73b6f472cc31193c767abead9f8e14e5 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Mon, 19 Apr 2021 17:52:59 -0500 Subject: [PATCH 4/8] add some unit tests --- .../server/coordinator/RunRulesTest.java | 128 ++++++++++++++++++ .../http/CoordinatorDynamicConfigTest.java | 25 ++++ 2 files changed, 153 insertions(+) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java index 1751bc9b79ed..5af5c82afb59 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java @@ -110,6 +110,134 @@ public void tearDown() EasyMock.verify(databaseRuleManager); } + /** + * Nodes: + * normal - 2 replicants + * maxNonPrimaryReplicantsToLoad - 10 + * Expect only 34 segments to be loaded despite there being 48 primary + non-primary replicants to load! + */ + @Test + public void testOneTierTwoReplicantsWithStrictReplicantLimit() + { + mockCoordinator(); + mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + mockEmptyPeon(); + + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( + Collections.singletonList( + new IntervalLoadRule( + Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), + ImmutableMap.of("normal", 2) + ) + )).atLeastOnce(); + EasyMock.replay(databaseRuleManager); + + DruidCluster druidCluster = DruidClusterBuilder + .newBuilder() + .addTier( + "normal", + new ServerHolder( + new DruidServer("serverNorm", "hostNorm", null, 1000, ServerType.HISTORICAL, "normal", 0) + .toImmutableDruidServer(), + mockPeon + ), + new ServerHolder( + new DruidServer("serverNorm2", "hostNorm2", null, 1000, ServerType.HISTORICAL, "normal", 0) + .toImmutableDruidServer(), + mockPeon + ) + ) + .build(); + + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); + + DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy) + .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).withMaxNonPrimaryReplicantsToLoad(10).build()) + .build(); + + DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); + CoordinatorStats stats = afterParams.getCoordinatorStats(); + + Assert.assertEquals(34L, stats.getTieredStat("assignedCount", "normal")); + Assert.assertEquals(10L, stats.getGlobalStat("totalNonPrimaryReplicantsLoaded")); + + exec.shutdown(); + EasyMock.verify(mockPeon); + } + + /** + * Nodes: + * normal - 2 replicants + * hot - 2 replicants + * maxNonPrimaryReplicantsToLoad - 48 + * Expect only 72 segments to be loaded despite there being 96 primary + non-primary replicants to load! + */ + @Test + public void testTwoTiersTwoReplicantsWithStrictReplicantLimit() + { + mockCoordinator(); + mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + mockEmptyPeon(); + + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( + Collections.singletonList( + new IntervalLoadRule( + Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), + ImmutableMap.of("hot", 2, "normal", 2) + ) + )).atLeastOnce(); + EasyMock.replay(databaseRuleManager); + + DruidCluster druidCluster = DruidClusterBuilder + .newBuilder() + .addTier( + "hot", + new ServerHolder( + new DruidServer("serverHot", "hostHot", null, 1000, ServerType.HISTORICAL, "hot", 0) + .toImmutableDruidServer(), + mockPeon + ), + new ServerHolder( + new DruidServer("serverHot2", "hostHot2", null, 1000, ServerType.HISTORICAL, "hot", 0) + .toImmutableDruidServer(), + mockPeon + ) + ) + .addTier( + "normal", + new ServerHolder( + new DruidServer("serverNorm", "hostNorm", null, 1000, ServerType.HISTORICAL, "normal", 0) + .toImmutableDruidServer(), + mockPeon + ), + new ServerHolder( + new DruidServer("serverNorm2", "hostNorm2", null, 1000, ServerType.HISTORICAL, "normal", 0) + .toImmutableDruidServer(), + mockPeon + ) + ) + .build(); + + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); + + DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy) + .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).withMaxNonPrimaryReplicantsToLoad(48).build()) + .build(); + + DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); + CoordinatorStats stats = afterParams.getCoordinatorStats(); + + Assert.assertEquals(72L,stats.getTieredStat("assignedCount", "hot") + stats.getTieredStat("assignedCount", "normal")); + Assert.assertEquals(48L, stats.getGlobalStat("totalNonPrimaryReplicantsLoaded")); + + exec.shutdown(); + EasyMock.verify(mockPeon); + } + /** * Nodes: * hot - 1 replicant diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index 4f0c06013c5c..9f2f568fa393 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -394,6 +394,31 @@ public void testUpdate() ); } + @Test + public void testSerdeHandleInvalidMaxNonPrimaryReplicantsToLoad() throws Exception + { + try { + String jsonStr = "{\n" + + " \"maxNonPrimaryReplicantsToLoad\": -1\n" + + "}\n"; + + mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); + + Assert.fail("deserialization should fail."); + } + catch (JsonMappingException e) { + Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); + } + } + @Test public void testEqualsAndHashCodeSanity() { From 13a7c77dfba6cfe166734ee373a9feb3712a3aaa Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Mon, 19 Apr 2021 20:55:58 -0500 Subject: [PATCH 5/8] fix checkstyle error --- .../java/org/apache/druid/server/coordinator/RunRulesTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java index 5af5c82afb59..d43b603e9982 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java @@ -231,7 +231,7 @@ public void testTwoTiersTwoReplicantsWithStrictReplicantLimit() DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); CoordinatorStats stats = afterParams.getCoordinatorStats(); - Assert.assertEquals(72L,stats.getTieredStat("assignedCount", "hot") + stats.getTieredStat("assignedCount", "normal")); + Assert.assertEquals(72L, stats.getTieredStat("assignedCount", "hot") + stats.getTieredStat("assignedCount", "normal")); Assert.assertEquals(48L, stats.getGlobalStat("totalNonPrimaryReplicantsLoaded")); exec.shutdown(); From e14a07ebebb09e8c3d2839ca8c583d676a7c2410 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Tue, 20 Apr 2021 14:42:59 -0500 Subject: [PATCH 6/8] add documentation for new dynamic config --- docs/configuration/index.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index cd26a9e7afef..f062cdab6f6e 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -797,7 +797,8 @@ A sample Coordinator dynamic config JSON object is shown below: "decommissioningNodes": ["localhost:8182", "localhost:8282"], "decommissioningMaxPercentOfMaxSegmentsToMove": 70, "pauseCoordination": false, - "replicateAfterLoadTimeout": false + "replicateAfterLoadTimeout": false, + "maxNonPrimaryReplicantsToLoad": 2147483647 } ``` @@ -822,6 +823,7 @@ Issuing a GET request at the same URL will return the spec that is currently in |`decommissioningMaxPercentOfMaxSegmentsToMove`| The maximum number of segments that may be moved away from 'decommissioning' servers to non-decommissioning (that is, active) servers during one Coordinator run. This value is relative to the total maximum segment movements allowed during one run which is determined by `maxSegmentsToMove`. If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will neither be moved from _or to_ 'decommissioning' servers, effectively putting them in a sort of "maintenance" mode that will not participate in balancing or assignment by load rules. Decommissioning can also become stalled if there are no available active servers to place the segments. By leveraging the maximum percent of decommissioning segment movements, an operator can prevent active servers from overload by prioritizing balancing, or decrease decommissioning time instead. The value should be between 0 and 100.|70| |`pauseCoordination`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again. |false| |`replicateAfterLoadTimeout`| Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow historicals in the cluster. However, the slow historical may still load the segment later and the coordinator may issue drop requests if the segment is over-replicated.|false| +|`maxNonPrimaryReplicantsToLoad`|This is the maximum number of non-primary segment replicants to load per Coordination run. This number can be set to put a hard upper limit on the number of replicants loaded. It is a tool that can help prevent long delays in new data being available for query after events that require many non-primary replicants to be loaded by the cluster; such as a Historical node disconnecting from the cluster. The default value essentially means there is no limit on the number of replicants loaded per coordination cycle.|`Integer.MAX_VALUE`| To view the audit history of Coordinator dynamic config issue a GET request to the URL - From 48e45246044432ee2fd99f8c836135f29e4473c4 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Wed, 21 Apr 2021 16:41:02 -0500 Subject: [PATCH 7/8] improve docs and logs --- docs/configuration/index.md | 2 +- .../java/org/apache/druid/server/coordinator/duty/RunRules.java | 2 +- website/.spelling | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index f062cdab6f6e..b120d513e09a 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -823,7 +823,7 @@ Issuing a GET request at the same URL will return the spec that is currently in |`decommissioningMaxPercentOfMaxSegmentsToMove`| The maximum number of segments that may be moved away from 'decommissioning' servers to non-decommissioning (that is, active) servers during one Coordinator run. This value is relative to the total maximum segment movements allowed during one run which is determined by `maxSegmentsToMove`. If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will neither be moved from _or to_ 'decommissioning' servers, effectively putting them in a sort of "maintenance" mode that will not participate in balancing or assignment by load rules. Decommissioning can also become stalled if there are no available active servers to place the segments. By leveraging the maximum percent of decommissioning segment movements, an operator can prevent active servers from overload by prioritizing balancing, or decrease decommissioning time instead. The value should be between 0 and 100.|70| |`pauseCoordination`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again. |false| |`replicateAfterLoadTimeout`| Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow historicals in the cluster. However, the slow historical may still load the segment later and the coordinator may issue drop requests if the segment is over-replicated.|false| -|`maxNonPrimaryReplicantsToLoad`|This is the maximum number of non-primary segment replicants to load per Coordination run. This number can be set to put a hard upper limit on the number of replicants loaded. It is a tool that can help prevent long delays in new data being available for query after events that require many non-primary replicants to be loaded by the cluster; such as a Historical node disconnecting from the cluster. The default value essentially means there is no limit on the number of replicants loaded per coordination cycle.|`Integer.MAX_VALUE`| +|`maxNonPrimaryReplicantsToLoad`|This is the maximum number of non-primary segment replicants to load per Coordination run. This number can be set to put a hard upper limit on the number of replicants loaded. It is a tool that can help prevent long delays in new data being available for query after events that require many non-primary replicants to be loaded by the cluster; such as a Historical node disconnecting from the cluster. The default value essentially means there is no limit on the number of replicants loaded per coordination cycle. If you want to use a non-default value for this config, you may want to start with it being `~20%` of the number of segments found on your Historical server with the most segments. You can use the Druid metric, `coordinator/time` with the filter `duty=org.apache.druid.server.coordinator.duty.RunRules` to see how different values of this config impact your Coordinator execution time.|`Integer.MAX_VALUE`| To view the audit history of Coordinator dynamic config issue a GET request to the URL - diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java index 1425e2ce4941..cf285891f615 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java @@ -137,7 +137,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) && !paramsWithReplicationManager.getReplicationManager().isLoadPrimaryReplicantsOnly() ) { log.info( - "Maximum number of non-primary replicants [%d] have been loaded for the current RunRules execution. Only loading primary replicants from here on.", + "Maximum number of non-primary replicants [%d] have been loaded for the current RunRules execution. Only loading primary replicants from here on for this coordinator run cycle.", paramsWithReplicationManager.getCoordinatorDynamicConfig().getMaxNonPrimaryReplicantsToLoad() ); paramsWithReplicationManager.getReplicationManager().setLoadPrimaryReplicantsOnly(true); diff --git a/website/.spelling b/website/.spelling index bd2ecfa167c0..240ba7a219c2 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1783,6 +1783,7 @@ preloaded queryType remoteTaskRunnerConfig rendezvousHash +replicants resultsets roundRobin runtime.properties From 2ccd9ffea9dce28cafc5d4ac61719645f2ca6d7b Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Mon, 26 Apr 2021 11:06:50 -0500 Subject: [PATCH 8/8] Alter how null is handled for new config. If null, manually set as default --- .../coordinator/CoordinatorDynamicConfig.java | 12 +++++++++++- .../server/http/CoordinatorDynamicConfigTest.java | 14 +++++++------- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index 24b53a354a2e..d125aa4edab5 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -137,7 +137,7 @@ public CoordinatorDynamicConfig( @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove, @JsonProperty("pauseCoordination") boolean pauseCoordination, @JsonProperty("replicateAfterLoadTimeout") boolean replicateAfterLoadTimeout, - @JsonProperty("maxNonPrimaryReplicantsToLoad") int maxNonPrimaryReplicantsToLoad + @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad ) { this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments = @@ -185,6 +185,16 @@ public CoordinatorDynamicConfig( this.pauseCoordination = pauseCoordination; this.replicateAfterLoadTimeout = replicateAfterLoadTimeout; + if (maxNonPrimaryReplicantsToLoad == null) { + log.debug( + "maxNonPrimaryReplicantsToLoad was null! This is likely because your metastore does not " + + "reflect this configuration being added to Druid in a recent release. Druid is defaulting the value " + + "to the Druid default of %d. It is recommended that you re-submit your dynamic config with your " + + "desired value for maxNonPrimaryReplicantsToLoad", + Builder.DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD + ); + maxNonPrimaryReplicantsToLoad = Builder.DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD; + } Preconditions.checkArgument( maxNonPrimaryReplicantsToLoad >= 0, "maxNonPrimaryReplicantsToLoad must be greater than or equal to 0." diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index 9f2f568fa393..e02208274d83 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -118,13 +118,13 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio ); ImmutableSet decommissioning = ImmutableSet.of(); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false, false, 0); + assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false, false, Integer.MAX_VALUE); actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); - assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false, false, 0); + assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false, false, Integer.MAX_VALUE); actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); - assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, false, 0); + assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, false, Integer.MAX_VALUE); } @Test @@ -171,7 +171,7 @@ public void testSerdeWithStringinKillDataSourceWhitelist() throws Exception 0, false, false, - 0 + Integer.MAX_VALUE ); } @@ -271,7 +271,7 @@ public void testHandleMissingPercentOfSegmentsToConsiderPerMove() throws Excepti ); ImmutableSet decommissioning = ImmutableSet.of("host1", "host2"); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, false, 0); + assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, false, Integer.MAX_VALUE); } @Test @@ -301,7 +301,7 @@ public void testSerdeWithKillAllDataSources() throws Exception CoordinatorDynamicConfig.class ); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false, false, 0); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false, false, Integer.MAX_VALUE); //ensure whitelist is empty when killAllDataSources is true try { @@ -348,7 +348,7 @@ public void testDeserializeWithoutMaxSegmentsInNodeLoadingQueue() throws Excepti CoordinatorDynamicConfig.class ); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false, false, 0); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false, false, Integer.MAX_VALUE); } @Test