From cd4b629aeefdfba201b599aaf5a7f52afecad78e Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Thu, 5 May 2022 11:55:38 -0500 Subject: [PATCH 01/10] Add coordinator dynamic config to limit the number of segments loaded per RunRules execution --- docs/configuration/index.md | 6 +- .../coordinator/CoordinatorDynamicConfig.java | 60 ++++++++++++-- .../server/coordinator/duty/RunRules.java | 7 ++ .../server/coordinator/rules/LoadRule.java | 5 +- .../server/coordinator/RunRulesTest.java | 49 ++++++++++++ .../http/CoordinatorDynamicConfigTest.java | 79 ++++++++++++++++++- ...inator-dynamic-config-dialog.spec.tsx.snap | 8 ++ .../coordinator-dynamic-config.tsx | 17 ++++ 8 files changed, 218 insertions(+), 13 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 1e829bce74b7..c8fa20332eb2 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -897,7 +897,8 @@ A sample Coordinator dynamic config JSON object is shown below: "decommissioningMaxPercentOfMaxSegmentsToMove": 70, "pauseCoordination": false, "replicateAfterLoadTimeout": false, - "maxNonPrimaryReplicantsToLoad": 2147483647 + "maxNonPrimaryReplicantsToLoad": 2147483647, + "maxSegmentsToLoad": 2147483647 } ``` @@ -922,7 +923,8 @@ Issuing a GET request at the same URL will return the spec that is currently in |`decommissioningMaxPercentOfMaxSegmentsToMove`| The maximum number of segments that may be moved away from 'decommissioning' servers to non-decommissioning (that is, active) servers during one Coordinator run. This value is relative to the total maximum segment movements allowed during one run which is determined by `maxSegmentsToMove`. If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will neither be moved from _or to_ 'decommissioning' servers, effectively putting them in a sort of "maintenance" mode that will not participate in balancing or assignment by load rules. 
Decommissioning can also become stalled if there are no available active servers to place the segments. By leveraging the maximum percent of decommissioning segment movements, an operator can prevent active servers from overload by prioritizing balancing, or decrease decommissioning time instead. The value should be between 0 and 100.|70| |`pauseCoordination`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again. |false| |`replicateAfterLoadTimeout`| Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow historicals in the cluster. 
However, the slow historical may still load the segment later and the coordinator may issue drop requests if the segment is over-replicated.|false| -|`maxNonPrimaryReplicantsToLoad`|This is the maximum number of non-primary segment replicants to load per Coordination run. This number can be set to put a hard upper limit on the number of replicants loaded. It is a tool that can help prevent long delays in new data being available for query after events that require many non-primary replicants to be loaded by the cluster; such as a Historical node disconnecting from the cluster. The default value essentially means there is no limit on the number of replicants loaded per coordination cycle. If you want to use a non-default value for this config, you may want to start with it being `~20%` of the number of segments found on your Historical server with the most segments. You can use the Druid metric, `coordinator/time` with the filter `duty=org.apache.druid.server.coordinator.duty.RunRules` to see how different values of this config impact your Coordinator execution time.|`Integer.MAX_VALUE`| +|`maxNonPrimaryReplicantsToLoad`|This is the maximum number of non-primary segment replicants to load per Coordination run. This number can be set to put a hard upper limit on the number of non-primary replicants loaded. It is a tool that can help prevent long delays in new data being available for query after events that require many non-primary replicants to be loaded by the cluster; such as a Historical node disconnecting from the cluster. The default value essentially means there is no limit on the number of replicants loaded per coordination cycle. If you want to use a non-default value for this config, you may want to start with it being `~20%` of the number of segments found on your Historical server with the most segments. 
You can use the Druid metric, `coordinator/time` with the filter `duty=org.apache.druid.server.coordinator.duty.RunRules` to see how different values of this config impact your Coordinator execution time.|`Integer.MAX_VALUE`| +|`maxSegmentsToLoad`|This it the maximum number of segments - both primary and non-primary replicants - that can be loaded per Coordination run. The default is equivalent to there being no limit. This differs from `maxNonPrimaryReplicantsToLoad` because it includes the count of primary replicants that are loaded in the limit. An operator may want to use this configuration to prevent the coordinator from spining and loading many segments that are already loaded, but appeared to be unavailable due to a temporary network issue causing some number of Historical servers to have their segments go missing.|`Integer.MAX_VALUE`| To view the audit history of Coordinator dynamic config issue a GET request to the URL - diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index 90574780eabe..b17db904e282 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -102,6 +102,12 @@ public class CoordinatorDynamicConfig */ private final int maxNonPrimaryReplicantsToLoad; + /** + * This is the upper limit on segments to load per coordination cycle. Once this limit is hit, the coordinator will + * move on to its next task. 
+ */ + private final int maxSegmentsToLoad; + private static final Logger log = new Logger(CoordinatorDynamicConfig.class); @JsonCreator @@ -113,7 +119,8 @@ public CoordinatorDynamicConfig( @JsonProperty("mergeBytesLimit") long mergeBytesLimit, @JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit, @JsonProperty("maxSegmentsToMove") int maxSegmentsToMove, - @Deprecated @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove, + @Deprecated @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable + Double percentOfSegmentsToConsiderPerMove, @JsonProperty("useBatchedSegmentSampler") boolean useBatchedSegmentSampler, @JsonProperty("replicantLifetime") int replicantLifetime, @JsonProperty("replicationThrottleLimit") int replicationThrottleLimit, @@ -134,7 +141,8 @@ public CoordinatorDynamicConfig( @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove, @JsonProperty("pauseCoordination") boolean pauseCoordination, @JsonProperty("replicateAfterLoadTimeout") boolean replicateAfterLoadTimeout, - @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad + @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad, + @JsonProperty("maxSegmentsToLoad") @Nullable Integer maxSegmentsToLoad ) { this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments = @@ -195,6 +203,22 @@ public CoordinatorDynamicConfig( "maxNonPrimaryReplicantsToLoad must be greater than or equal to 0." ); this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad; + + if (maxSegmentsToLoad == null) { + log.debug( + "maxSegmentsToLoad was null! This is likely because your metastore does not " + + "reflect this configuration being added to Druid in a recent release. Druid is defaulting the value " + + "to the Druid default of %d. 
It is recommended that you re-submit your dynamic config with your " + + "desired value for maxSegmentsToLoad", + Builder.DEFAULT_MAX_SEGMENTS_TO_LOAD ); + maxSegmentsToLoad = Builder.DEFAULT_MAX_SEGMENTS_TO_LOAD; + } + Preconditions.checkArgument( + maxSegmentsToLoad >= 0, + "maxSegmentsToLoad must be greater than or equal to 0." ); + this.maxSegmentsToLoad = maxSegmentsToLoad; } private static Set parseJsonStringOrArray(Object jsonStringOrArray) @@ -369,6 +393,13 @@ public int getMaxNonPrimaryReplicantsToLoad() return maxNonPrimaryReplicantsToLoad; } + @Min(0) + @JsonProperty + public int getMaxSegmentsToLoad() + { + return maxSegmentsToLoad; + } + @Override public String toString() { @@ -392,6 +423,7 @@ public String toString() ", pauseCoordination=" + pauseCoordination + ", replicateAfterLoadTimeout=" + replicateAfterLoadTimeout + ", maxNonPrimaryReplicantsToLoad=" + maxNonPrimaryReplicantsToLoad + + ", maxSegmentsToLoad=" + maxSegmentsToLoad + '}'; } @@ -459,6 +491,9 @@ public boolean equals(Object o) if (maxNonPrimaryReplicantsToLoad != that.maxNonPrimaryReplicantsToLoad) { return false; } + if (maxSegmentsToLoad != that.maxSegmentsToLoad) { + return false; + } return decommissioningMaxPercentOfMaxSegmentsToMove == that.decommissioningMaxPercentOfMaxSegmentsToMove; } @@ -482,7 +517,8 @@ public int hashCode() decommissioningNodes, decommissioningMaxPercentOfMaxSegmentsToMove, pauseCoordination, - maxNonPrimaryReplicantsToLoad + maxNonPrimaryReplicantsToLoad, + maxSegmentsToLoad ); } @@ -509,6 +545,7 @@ public static class Builder private static final boolean DEFAULT_PAUSE_COORDINATION = false; private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false; private static final int DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD = Integer.MAX_VALUE; + private static final int DEFAULT_MAX_SEGMENTS_TO_LOAD = Integer.MAX_VALUE; private Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments; private Long 
mergeBytesLimit; @@ -528,6 +565,7 @@ public static class Builder private Boolean pauseCoordination; private Boolean replicateAfterLoadTimeout; private Integer maxNonPrimaryReplicantsToLoad; + private Integer maxSegmentsToLoad; public Builder() { @@ -554,7 +592,8 @@ public Builder( @Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove, @JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination, @JsonProperty("replicateAfterLoadTimeout") @Nullable Boolean replicateAfterLoadTimeout, - @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad + @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad, + @JsonProperty("maxSegmentsToLoad") @Nullable Integer maxSegmentsToLoad ) { this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments = @@ -576,6 +615,7 @@ public Builder( this.pauseCoordination = pauseCoordination; this.replicateAfterLoadTimeout = replicateAfterLoadTimeout; this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad; + this.maxSegmentsToLoad = maxSegmentsToLoad; } public Builder withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(long leadingTimeMillis) @@ -681,6 +721,12 @@ public Builder withMaxNonPrimaryReplicantsToLoad(int maxNonPrimaryReplicantsToLo return this; } + public Builder withMaxSegmentsToLoad(int maxSegmentsToLoad) + { + this.maxSegmentsToLoad = maxSegmentsToLoad; + return this; + } + public CoordinatorDynamicConfig build() { return new CoordinatorDynamicConfig( @@ -709,7 +755,8 @@ public CoordinatorDynamicConfig build() pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : pauseCoordination, replicateAfterLoadTimeout == null ? DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout, maxNonPrimaryReplicantsToLoad == null ? DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD - : maxNonPrimaryReplicantsToLoad + : maxNonPrimaryReplicantsToLoad, + maxSegmentsToLoad == null ? 
DEFAULT_MAX_SEGMENTS_TO_LOAD : maxSegmentsToLoad ); } @@ -747,7 +794,8 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults) replicateAfterLoadTimeout == null ? defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout, maxNonPrimaryReplicantsToLoad == null ? defaults.getMaxNonPrimaryReplicantsToLoad() - : maxNonPrimaryReplicantsToLoad + : maxNonPrimaryReplicantsToLoad, + maxSegmentsToLoad == null ? defaults.getMaxSegmentsToLoad() : maxSegmentsToLoad ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java index cf285891f615..f04d4559b4a5 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java @@ -30,6 +30,7 @@ import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.ReplicationThrottler; import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule; +import org.apache.druid.server.coordinator.rules.LoadRule; import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; @@ -122,6 +123,12 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) } for (DataSegment segment : params.getUsedSegments()) { + if (stats.getGlobalStat(LoadRule.AGGREGATE_ASSIGNED_COUNT) >= params.getCoordinatorDynamicConfig().getMaxSegmentsToLoad()) { + log.info("Maximum number of segments [%d] has been loaded for the current RunRules execution.", + params.getCoordinatorDynamicConfig().getMaxSegmentsToLoad()); + break; + } + if (overshadowed.contains(segment.getId())) { // Skipping overshadowed segments continue; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java index e34cd98772a2..9726e53fe41b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java @@ -55,7 +55,8 @@ public abstract class LoadRule implements Rule { private static final EmittingLogger log = new EmittingLogger(LoadRule.class); - static final String ASSIGNED_COUNT = "assignedCount"; + public static final String AGGREGATE_ASSIGNED_COUNT = "aggregateAssignedCount"; + public static final String ASSIGNED_COUNT = "assignedCount"; static final String DROPPED_COUNT = "droppedCount"; public final String NON_PRIMARY_ASSIGNED_COUNT = "totalNonPrimaryReplicantsLoaded"; public static final String REQUIRED_CAPACITY = "requiredCapacity"; @@ -184,6 +185,7 @@ private void assign( // numAssigned - 1 because we don't want to count the primary assignment stats.addToGlobalStat(NON_PRIMARY_ASSIGNED_COUNT, numAssigned - 1); + stats.addToGlobalStat(AGGREGATE_ASSIGNED_COUNT, numAssigned); stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned); @@ -310,6 +312,7 @@ private void assignReplicas( createLoadQueueSizeLimitingPredicate(params), segment ); + stats.addToGlobalStat(AGGREGATE_ASSIGNED_COUNT, numAssigned); stats.addToGlobalStat(NON_PRIMARY_ASSIGNED_COUNT, numAssigned); stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java index dc4a6f0abe43..17842025c59a 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java @@ -1458,6 +1458,55 @@ public void testOneNodesOneTierOneReplicantCostBalancerStrategyNotEnoughSpace() EasyMock.verify(mockPeon); } + @Test + public void testRunRulesMaxSegmentsToLoadLimit() + { + 
mockCoordinator(); + mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + mockEmptyPeon(); + + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( + Collections.singletonList( + new IntervalLoadRule( + Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), + ImmutableMap.of("normal", 2) + ) + )).atLeastOnce(); + EasyMock.replay(databaseRuleManager); + + DruidCluster druidCluster = DruidClusterBuilder + .newBuilder() + .addTier( + "normal", + new ServerHolder( + new DruidServer("serverNorm", "hostNorm", null, 1000, ServerType.HISTORICAL, "normal", 0) + .toImmutableDruidServer(), + mockPeon + ), + new ServerHolder( + new DruidServer("serverNorm2", "hostNorm2", null, 1000, ServerType.HISTORICAL, "normal", 0) + .toImmutableDruidServer(), + mockPeon + ) + ) + .build(); + + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); + + DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy) + .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToLoad(20).build()) + .build(); + + DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); + CoordinatorStats stats = afterParams.getCoordinatorStats(); + Assert.assertEquals(20L, stats.getGlobalStat(LoadRule.AGGREGATE_ASSIGNED_COUNT)); + + exec.shutdown(); + EasyMock.verify(mockPeon); + } + private void mockCoordinator() { EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes(); diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index 73f7cd29e4fd..dac5f2051fdb 100644 --- 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -89,6 +89,7 @@ public void testSerde() throws Exception 9, false, false, + Integer.MAX_VALUE, Integer.MAX_VALUE ); @@ -111,6 +112,7 @@ public void testSerde() throws Exception 9, false, false, + Integer.MAX_VALUE, Integer.MAX_VALUE ); @@ -133,6 +135,7 @@ public void testSerde() throws Exception 5, false, false, + Integer.MAX_VALUE, Integer.MAX_VALUE ); @@ -155,6 +158,7 @@ public void testSerde() throws Exception 5, true, false, + Integer.MAX_VALUE, Integer.MAX_VALUE ); @@ -177,6 +181,7 @@ public void testSerde() throws Exception 5, true, false, + Integer.MAX_VALUE, Integer.MAX_VALUE ); @@ -199,6 +204,7 @@ public void testSerde() throws Exception 5, true, true, + Integer.MAX_VALUE, Integer.MAX_VALUE ); @@ -221,7 +227,31 @@ public void testSerde() throws Exception 5, true, true, - 10 + 10, + Integer.MAX_VALUE + ); + + actual = CoordinatorDynamicConfig.builder().withMaxSegmentsToLoad(20).build(actual); + assertConfig( + actual, + 1, + 1, + 1, + 1, + 10, + 1, + 1, + 2, + true, + whitelist, + false, + 1, + ImmutableSet.of("host1"), + 5, + true, + true, + 10, + 20 ); } @@ -245,7 +275,9 @@ public void testConstructorWithNullsShouldKillUnusedSegmentsInAllDataSources() 5, true, true, - 10); + 10, + 20 + ); Assert.assertTrue(config.isKillUnusedSegmentsInAllDataSources()); Assert.assertTrue(config.getSpecificDataSourcesToKillUnusedSegmentsIn().isEmpty()); } @@ -270,7 +302,9 @@ public void testConstructorWithSpecificDataSourcesToKillShouldNotKillUnusedSegme 5, true, true, - 10); + 10, + 20 + ); Assert.assertFalse(config.isKillUnusedSegmentsInAllDataSources()); Assert.assertEquals(ImmutableSet.of("test1"), config.getSpecificDataSourcesToKillUnusedSegmentsIn()); } @@ -320,6 +354,7 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio 0, false, false, + Integer.MAX_VALUE, 
Integer.MAX_VALUE ); @@ -342,6 +377,7 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio 0, false, false, + Integer.MAX_VALUE, Integer.MAX_VALUE ); @@ -364,6 +400,7 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio 5, false, false, + Integer.MAX_VALUE, Integer.MAX_VALUE ); } @@ -412,10 +449,36 @@ public void testSerdeWithStringinKillDataSourceWhitelist() throws Exception 0, false, false, + Integer.MAX_VALUE, Integer.MAX_VALUE ); } + @Test + public void testInvalidMaxSegmentsToLoad() throws Exception + { + try { + String jsonStr = "{\n" + + " \"maxSegmentsToLoad\": -1\n" + + "}\n"; + + mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); + + Assert.fail("deserialization should fail."); + } + catch (JsonMappingException e) { + Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); + } + } + @Test public void testSerdeHandleInvalidPercentOfSegmentsToConsiderPerMove() throws Exception { @@ -530,6 +593,7 @@ public void testHandleMissingPercentOfSegmentsToConsiderPerMove() throws Excepti 9, false, false, + Integer.MAX_VALUE, Integer.MAX_VALUE ); } @@ -579,6 +643,7 @@ public void testSerdeWithKillAllDataSources() throws Exception 0, false, false, + Integer.MAX_VALUE, Integer.MAX_VALUE ); @@ -643,6 +708,7 @@ public void testDeserializeWithoutMaxSegmentsInNodeLoadingQueue() throws Excepti 0, false, false, + Integer.MAX_VALUE, Integer.MAX_VALUE ); } @@ -670,6 +736,7 @@ public void testBuilderDefaults() 70, false, false, + Integer.MAX_VALUE, Integer.MAX_VALUE ); } @@ -700,6 +767,7 @@ public void testBuilderWithDefaultSpecificDataSourcesToKillUnusedSegmentsInSpeci 70, false, false, + Integer.MAX_VALUE, Integer.MAX_VALUE ); } @@ -732,6 +800,7 @@ public void testUpdate() null, null, null, + null, null ).build(current) ); @@ -789,7 +858,8 @@ private void assertConfig( int 
decommissioningMaxPercentOfMaxSegmentsToMove, boolean pauseCoordination, boolean replicateAfterLoadTimeout, - int maxNonPrimaryReplicantsToLoad + int maxNonPrimaryReplicantsToLoad, + int maxSegmentsToLoad ) { Assert.assertEquals( @@ -818,5 +888,6 @@ private void assertConfig( Assert.assertEquals(pauseCoordination, config.getPauseCoordination()); Assert.assertEquals(replicateAfterLoadTimeout, config.getReplicateAfterLoadTimeout()); Assert.assertEquals(maxNonPrimaryReplicantsToLoad, config.getMaxNonPrimaryReplicantsToLoad()); + Assert.assertEquals(maxSegmentsToLoad, config.getMaxSegmentsToLoad()); } } diff --git a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap index bc7684f8fe16..85e38a4ea757 100644 --- a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap +++ b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap @@ -200,6 +200,14 @@ exports[`CoordinatorDynamicConfigDialog matches snapshot 1`] = ` "name": "replicateAfterLoadTimeout", "type": "boolean", }, + Object { + "defaultValue": 2147483647, + "info": + This is the maximum number of segments - both primary and non-primary replicants - that can be loaded per Coordination run. The default is equivalent to there being no limit. This differs from maxNonPrimaryReplicantsToLoad because it includes the count of primary replicants that are loaded in the limit. An operator may want to use this configuration to prevent the coordinator from spining and loading many segments that are already loaded, but appeared to be unavailable due to a temporary network issue causing some number of Historical servers to have their segments go missing (or some other event with a similar effect). 
+ , + "name": "maxSegmentsToLoad", + "type": "number", + }, Object { "defaultValue": 2147483647, "info": diff --git a/web-console/src/druid-models/coordinator-dynamic-config.tsx b/web-console/src/druid-models/coordinator-dynamic-config.tsx index f646315a4323..0fc3c4c71e3f 100644 --- a/web-console/src/druid-models/coordinator-dynamic-config.tsx +++ b/web-console/src/druid-models/coordinator-dynamic-config.tsx @@ -37,6 +37,7 @@ export interface CoordinatorDynamicConfig { decommissioningNodes?: string[]; decommissioningMaxPercentOfMaxSegmentsToMove?: number; pauseCoordination?: boolean; + maxSegmentsToLoad?: number; maxNonPrimaryReplicantsToLoad?: number; } @@ -253,6 +254,22 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field[ ), }, + { + name: 'maxSegmentsToLoad', + type: 'number', + defaultValue: 2147483647, + info: ( + <> + This is the maximum number of segments - both primary and non-primary replicants - that can + be loaded per Coordination run. The default is equivalent to there being no limit. This + differs from maxNonPrimaryReplicantsToLoad because it includes the count of primary + replicants that are loaded in the limit. An operator may want to use this configuration to + prevent the coordinator from spining and loading many segments that are already loaded, but + appeared to be unavailable due to a temporary network issue causing some number of Historical + servers to have their segments go missing (or some other event with a similar effect). 
+ + ), + }, { name: 'maxNonPrimaryReplicantsToLoad', type: 'number', From fee9e6658837ad831871c763f04673564f598011 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Thu, 5 May 2022 11:59:21 -0500 Subject: [PATCH 02/10] fix spelling mistakes --- docs/configuration/index.md | 2 +- web-console/src/druid-models/coordinator-dynamic-config.tsx | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index c8fa20332eb2..cfeb78d36ab0 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -924,7 +924,7 @@ Issuing a GET request at the same URL will return the spec that is currently in |`pauseCoordination`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again. |false| |`replicateAfterLoadTimeout`| Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. 
If this is set to true, the coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow historicals in the cluster. However, the slow historical may still load the segment later and the coordinator may issue drop requests if the segment is over-replicated.|false| |`maxNonPrimaryReplicantsToLoad`|This is the maximum number of non-primary segment replicants to load per Coordination run. This number can be set to put a hard upper limit on the number of non-primary replicants loaded. It is a tool that can help prevent long delays in new data being available for query after events that require many non-primary replicants to be loaded by the cluster; such as a Historical node disconnecting from the cluster. The default value essentially means there is no limit on the number of replicants loaded per coordination cycle. If you want to use a non-default value for this config, you may want to start with it being `~20%` of the number of segments found on your Historical server with the most segments. You can use the Druid metric, `coordinator/time` with the filter `duty=org.apache.druid.server.coordinator.duty.RunRules` to see how different values of this config impact your Coordinator execution time.|`Integer.MAX_VALUE`| -|`maxSegmentsToLoad`|This it the maximum number of segments - both primary and non-primary replicants - that can be loaded per Coordination run. The default is equivalent to there being no limit. This differs from `maxNonPrimaryReplicantsToLoad` because it includes the count of primary replicants that are loaded in the limit. 
An operator may want to use this configuration to prevent the coordinator from spining and loading many segments that are already loaded, but appeared to be unavailable due to a temporary network issue causing some number of Historical servers to have their segments go missing.|`Integer.MAX_VALUE`| +|`maxSegmentsToLoad`|This is the maximum number of segments - both primary and non-primary replicants - that can be loaded per Coordination run. The default is equivalent to there being no limit. This differs from `maxNonPrimaryReplicantsToLoad` because it includes the count of primary replicants that are loaded in the limit. An operator may want to use this configuration to prevent the coordinator from spinning and loading many segments that are already loaded, but appeared to be unavailable due to a temporary network issue causing some number of Historical servers to have their segments go missing.|`Integer.MAX_VALUE`| To view the audit history of Coordinator dynamic config issue a GET request to the URL - diff --git a/web-console/src/druid-models/coordinator-dynamic-config.tsx b/web-console/src/druid-models/coordinator-dynamic-config.tsx index 0fc3c4c71e3f..43fa3f6cd716 100644 --- a/web-console/src/druid-models/coordinator-dynamic-config.tsx +++ b/web-console/src/druid-models/coordinator-dynamic-config.tsx @@ -264,7 +264,7 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field[ be loaded per Coordination run. The default is equivalent to there being no limit. This differs from maxNonPrimaryReplicantsToLoad because it includes the count of primary replicants that are loaded in the limit. 
An operator may want to use this configuration to - prevent the coordinator from spining and loading many segments that are already loaded, but + prevent the coordinator from spinning and loading many segments that are already loaded, but appeared to be unavailable due to a temporary network issue causing some number of Historical servers to have their segments go missing (or some other event that caused a similar event). From a3d8c26f3eb3e1e71e50684e1b42d0c991be8262 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Thu, 5 May 2022 12:21:23 -0500 Subject: [PATCH 03/10] Update docs --- docs/configuration/index.md | 2 +- .../coordinator-dynamic-config-dialog.spec.tsx.snap | 4 ++-- web-console/src/druid-models/coordinator-dynamic-config.tsx | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index cfeb78d36ab0..442eb73ab975 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -923,7 +923,7 @@ Issuing a GET request at the same URL will return the spec that is currently in |`decommissioningMaxPercentOfMaxSegmentsToMove`| The maximum number of segments that may be moved away from 'decommissioning' servers to non-decommissioning (that is, active) servers during one Coordinator run. This value is relative to the total maximum segment movements allowed during one run which is determined by `maxSegmentsToMove`. If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will neither be moved from _or to_ 'decommissioning' servers, effectively putting them in a sort of "maintenance" mode that will not participate in balancing or assignment by load rules. Decommissioning can also become stalled if there are no available active servers to place the segments. By leveraging the maximum percent of decommissioning segment movements, an operator can prevent active servers from overload by prioritizing balancing, or decrease decommissioning time instead. 
The value should be between 0 and 100.|70| |`pauseCoordination`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again. |false| |`replicateAfterLoadTimeout`| Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow historicals in the cluster. However, the slow historical may still load the segment later and the coordinator may issue drop requests if the segment is over-replicated.|false| -|`maxNonPrimaryReplicantsToLoad`|This is the maximum number of non-primary segment replicants to load per Coordination run. This number can be set to put a hard upper limit on the number of non-primary replicants loaded. 
It is a tool that can help prevent long delays in new data being available for query after events that require many non-primary replicants to be loaded by the cluster; such as a Historical node disconnecting from the cluster. The default value essentially means there is no limit on the number of replicants loaded per coordination cycle. If you want to use a non-default value for this config, you may want to start with it being `~20%` of the number of segments found on your Historical server with the most segments. You can use the Druid metric, `coordinator/time` with the filter `duty=org.apache.druid.server.coordinator.duty.RunRules` to see how different values of this config impact your Coordinator execution time.|`Integer.MAX_VALUE`| +|`maxNonPrimaryReplicantsToLoad`|This is the maximum number of non-primary segment replicants to load per Coordination run. This number can be set to put a hard upper limit on the number of non-primary replicants loaded. It is a tool that can help prevent long delays in new data being available for query after events that require many non-primary replicants to be loaded by the cluster; such as a Historical node disconnecting from the cluster. The default value essentially means there is no limit on the number of replicants loaded per coordination cycle. If you want to use a non-default value for this config, you may want to start with it being `~20%` of the number of segments found on your Historical server with the most segments. You can use the Druid metric, `coordinator/time` with the filter `duty=org.apache.druid.server.coordinator.duty.RunRules` to see how different values of this config impact your Coordinator execution time. This configuration has no effect if the value is greater than the value of `maxSegmentsToLoad`.|`Integer.MAX_VALUE`| |`maxSegmentsToLoad`|This is the maximum number of segments - both primary and non-primary replicants - that can be loaded per Coordination run. 
The default is equivalent to there being no limit. This differs from `maxNonPrimaryReplicantsToLoad` because it includes the count of primary replicants that are loaded in the limit. An operator may want to use this configuration to prevent the coordinator from spinning and loading many segments that are already loaded, but appeared to be unavailable due to a temporary network issue causing some number of Historical servers to have their segments go missing.|`Integer.MAX_VALUE`| diff --git a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap index 85e38a4ea757..19f96357eb81 100644 --- a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap +++ b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap @@ -203,7 +203,7 @@ exports[`CoordinatorDynamicConfigDialog matches snapshot 1`] = ` Object { "defaultValue": 2147483647, "info": - This it the maximum number of segments - both primary and non-primary replicants - that can be loaded per Coordination run. The default is equivalent to there being no limit. This differs from maxNonPrimaryReplicantsToLoad because it includes the count of primary replicants that are loaded in the limit. An operator may want to use this configuration to prevent the coordinator from spining and loading many segments that are already loaded, but appeared to be unavailable due to a temporary network issue causing some number of Historical servers to have their segments go missing (or some other event that caused a similar event). + This it the maximum number of segments - both primary and non-primary replicants - that can be loaded per Coordination run. The default is equivalent to there being no limit. 
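As the description above states, the default of 2147483647 (`Integer.MAX_VALUE`) is equivalent to there being no limit, and a config stored before this field existed deserializes with a `null` value that gets backfilled to that default. A minimal, hypothetical Java sketch of that defaulting logic (illustrative names only, not the real `CoordinatorDynamicConfig` class):

```java
// Sketch of the null-backfill behavior: when the metastore's stored dynamic
// config predates the new field, the deserialized value is null and the
// default (Integer.MAX_VALUE, i.e. effectively "no limit") is substituted.
class MaxSegmentsToLoadDefault {
    static final int DEFAULT_MAX_SEGMENTS_TO_LOAD = Integer.MAX_VALUE;

    static int resolve(Integer storedValue) {
        // null means the stored config predates this field; fall back to default
        return storedValue == null ? DEFAULT_MAX_SEGMENTS_TO_LOAD : storedValue;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null)); // 2147483647
        System.out.println(resolve(20));   // 20
    }
}
```

Re-submitting the dynamic config after an upgrade persists an explicit value, so the backfill path only matters for configs written by older versions.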
This differs from maxNonPrimaryReplicantsToLoad because it includes the count of primary replicants that are loaded in the limit. An operator may want to use this configuration to prevent the coordinator from spinning and loading many segments that are already loaded, but appeared to be unavailable due to a temporary network issue causing some number of Historical servers to have their segments go missing (or some other event that caused a similar event). , "name": "maxNonSegmentsToLoad", "type": "number", @@ -211,7 +211,7 @@ exports[`CoordinatorDynamicConfigDialog matches snapshot 1`] = ` Object { "defaultValue": 2147483647, "info": - The maximum number of non-primary replicants to load in a single Coordinator cycle. Once this limit is hit, only primary replicants will be loaded for the remainder of the cycle. Tuning this value lower can help reduce the delay in loading primary segments when the cluster has a very large number of non-primary replicants to load (such as when a single historical drops out of the cluster leaving many under-replicated segments). + The maximum number of non-primary replicants to load in a single Coordinator cycle. Once this limit is hit, only primary replicants will be loaded for the remainder of the cycle. Tuning this value lower can help reduce the delay in loading primary segments when the cluster has a very large number of non-primary replicants to load (such as when a single historical drops out of the cluster leaving many under-replicated segments). 
This configuration has no effect if the value is greater than the value of maxSegmentsToLoad , "name": "maxNonPrimaryReplicantsToLoad", "type": "number", diff --git a/web-console/src/druid-models/coordinator-dynamic-config.tsx b/web-console/src/druid-models/coordinator-dynamic-config.tsx index 43fa3f6cd716..6101d6149ab9 100644 --- a/web-console/src/druid-models/coordinator-dynamic-config.tsx +++ b/web-console/src/druid-models/coordinator-dynamic-config.tsx @@ -280,7 +280,8 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field[ this limit is hit, only primary replicants will be loaded for the remainder of the cycle. Tuning this value lower can help reduce the delay in loading primary segments when the cluster has a very large number of non-primary replicants to load (such as when a single - historical drops out of the cluster leaving many under-replicated segments). + historical drops out of the cluster leaving many under-replicated segments). This + configuration has no effect if the value is greater than the value of maxSegmentsToLoad ), }, From f90f231f143d9f64ec9e3c05ecb15aad2de305c0 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Fri, 6 May 2022 09:56:32 -0500 Subject: [PATCH 04/10] Run prettier --write on web console files --- web-console/src/druid-models/coordinator-dynamic-config.tsx | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/web-console/src/druid-models/coordinator-dynamic-config.tsx b/web-console/src/druid-models/coordinator-dynamic-config.tsx index 6101d6149ab9..db2620bdef94 100644 --- a/web-console/src/druid-models/coordinator-dynamic-config.tsx +++ b/web-console/src/druid-models/coordinator-dynamic-config.tsx @@ -265,8 +265,9 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field[ differs from maxNonPrimaryReplicantsToLoad because it includes the count of primary replicants that are loaded in the limit. 
An operator may want to use this configuration to prevent the coordinator from spinning and loading many segments that are already loaded, but - appeared to be unavailable due to a temporary network issue causing some number of Historical - servers to have their segments go missing (or some other event that caused a similar event). + appeared to be unavailable due to a temporary network issue causing some number of + Historical servers to have their segments go missing (or some other event that caused a + similar event). ), }, From 8c35880b8a85804193c7be123be77530aead6e4a Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Mon, 9 May 2022 10:44:07 -0500 Subject: [PATCH 05/10] Fix naming error in console for new config --- web-console/src/druid-models/coordinator-dynamic-config.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web-console/src/druid-models/coordinator-dynamic-config.tsx b/web-console/src/druid-models/coordinator-dynamic-config.tsx index db2620bdef94..13b340d4277e 100644 --- a/web-console/src/druid-models/coordinator-dynamic-config.tsx +++ b/web-console/src/druid-models/coordinator-dynamic-config.tsx @@ -255,7 +255,7 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field[ ), }, { - name: 'maxNonSegmentsToLoad', + name: 'maxSegmentsToLoad', type: 'number', defaultValue: 2147483647, info: ( From df08bbfb91b138d2bdc35659387e61085e1d132b Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Tue, 31 May 2022 10:53:35 -0500 Subject: [PATCH 06/10] update web-console snapshot --- .../coordinator-dynamic-config-dialog.spec.tsx.snap | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap index 19f96357eb81..913f31072c9a 100644 --- 
a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap +++ b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap @@ -205,7 +205,7 @@ exports[`CoordinatorDynamicConfigDialog matches snapshot 1`] = ` "info": This it the maximum number of segments - both primary and non-primary replicants - that can be loaded per Coordination run. The default is equivalent to there being no limit. This differs from maxNonPrimaryReplicantsToLoad because it includes the count of primary replicants that are loaded in the limit. An operator may want to use this configuration to prevent the coordinator from spinning and loading many segments that are already loaded, but appeared to be unavailable due to a temporary network issue causing some number of Historical servers to have their segments go missing (or some other event that caused a similar event). , - "name": "maxNonSegmentsToLoad", + "name": "maxSegmentsToLoad", "type": "number", }, Object { From ee62efcf02f1fb63ceb737036ffff7265a31d06e Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Tue, 7 Jun 2022 10:22:04 -0500 Subject: [PATCH 07/10] update naming of new dynamic config for clarity --- docs/configuration/index.md | 6 +-- .../coordinator/CoordinatorDynamicConfig.java | 43 ++++++++++--------- .../server/coordinator/duty/RunRules.java | 4 +- .../server/coordinator/RunRulesTest.java | 2 +- .../http/CoordinatorDynamicConfigTest.java | 4 +- ...inator-dynamic-config-dialog.spec.tsx.snap | 4 +- .../coordinator-dynamic-config.tsx | 7 +-- 7 files changed, 36 insertions(+), 34 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 1ea86ed381df..079224207f2c 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -899,7 +899,7 @@ A sample Coordinator dynamic config JSON object is shown below: "pauseCoordination": false, 
"replicateAfterLoadTimeout": false, "maxNonPrimaryReplicantsToLoad": 2147483647, - "maxSegmentsToLoad": 2147483647 + "maxSegmentsToLoadPerCoordinationCycle": 2147483647 } ``` @@ -924,8 +924,8 @@ Issuing a GET request at the same URL will return the spec that is currently in |`decommissioningMaxPercentOfMaxSegmentsToMove`| The maximum number of segments that may be moved away from 'decommissioning' servers to non-decommissioning (that is, active) servers during one Coordinator run. This value is relative to the total maximum segment movements allowed during one run which is determined by `maxSegmentsToMove`. If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will neither be moved from _or to_ 'decommissioning' servers, effectively putting them in a sort of "maintenance" mode that will not participate in balancing or assignment by load rules. Decommissioning can also become stalled if there are no available active servers to place the segments. By leveraging the maximum percent of decommissioning segment movements, an operator can prevent active servers from overload by prioritizing balancing, or decrease decommissioning time instead. The value should be between 0 and 100.|70| |`pauseCoordination`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. 
An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again. |false| |`replicateAfterLoadTimeout`| Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow historicals in the cluster. However, the slow historical may still load the segment later and the coordinator may issue drop requests if the segment is over-replicated.|false| -|`maxNonPrimaryReplicantsToLoad`|This is the maximum number of non-primary segment replicants to load per Coordination run. This number can be set to put a hard upper limit on the number of non-primary replicants loaded. It is a tool that can help prevent long delays in new data being available for query after events that require many non-primary replicants to be loaded by the cluster; such as a Historical node disconnecting from the cluster. The default value essentially means there is no limit on the number of replicants loaded per coordination cycle. If you want to use a non-default value for this config, you may want to start with it being `~20%` of the number of segments found on your Historical server with the most segments. You can use the Druid metric, `coordinator/time` with the filter `duty=org.apache.druid.server.coordinator.duty.RunRules` to see how different values of this config impact your Coordinator execution time. 
This configuration has no effect if the value is greater than the value of `maxSegmentsToLoad`|`Integer.MAX_VALUE`| -|`maxSegmentsToLoad`|This it the maximum number of segments - both primary and non-primary replicants - that can be loaded per Coordination run. The default is equivalent to there being no limit. This differs from `maxNonPrimaryReplicantsToLoad` because it includes the count of primary replicants that are loaded in the limit. An operator may want to use this configuration to prevent the coordinator from spinning and loading many segments that are already loaded, but appeared to be unavailable due to a temporary network issue causing some number of Historical servers to have their segments go missing.|`Integer.MAX_VALUE`| +|`maxNonPrimaryReplicantsToLoad`|This is the maximum number of non-primary segment replicants to load per Coordination run. This number can be set to put a hard upper limit on the number of non-primary replicants loaded. It is a tool that can help prevent long delays in new data being available for query after events that require many non-primary replicants to be loaded by the cluster; such as a Historical node disconnecting from the cluster. The default value essentially means there is no limit on the number of replicants loaded per coordination cycle. If you want to use a non-default value for this config, you may want to start with it being `~20%` of the number of segments found on your Historical server with the most segments. You can use the Druid metric, `coordinator/time` with the filter `duty=org.apache.druid.server.coordinator.duty.RunRules` to see how different values of this config impact your Coordinator execution time. This configuration has no effect if the value is greater than the value of `maxSegmentsToLoadPerCoordinationCycle`.|`Integer.MAX_VALUE`| +|`maxSegmentsToLoadPerCoordinationCycle`|This is the maximum number of segments - both primary and non-primary replicants - that can be loaded per Coordination run. 
The default is equivalent to there being no limit. This differs from `maxNonPrimaryReplicantsToLoad` because it includes the count of primary replicants that are loaded in the limit. An operator may want to use this configuration to prevent the coordinator from spinning and loading many segments that are already loaded, but appeared to be unavailable due to a temporary network issue causing some number of Historical servers to have their segments go missing.|`Integer.MAX_VALUE`| To view the audit history of Coordinator dynamic config issue a GET request to the URL - diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index b17db904e282..92d0dd7797dc 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -106,7 +106,7 @@ public class CoordinatorDynamicConfig * This is the upper limit on segments to load per coordination cycle. Once this limit is hit, the coordinator will * move on to its next task. 
*/ - private final int maxSegmentsToLoad; + private final int maxSegmentsToLoadPerCoordinationCycle; private static final Logger log = new Logger(CoordinatorDynamicConfig.class); @@ -142,7 +142,7 @@ public CoordinatorDynamicConfig( @JsonProperty("pauseCoordination") boolean pauseCoordination, @JsonProperty("replicateAfterLoadTimeout") boolean replicateAfterLoadTimeout, @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad, - @JsonProperty("maxSegmentsToLoad") @Nullable Integer maxSegmentsToLoad + @JsonProperty("maxSegmentsToLoadPerCoordinationCycle") @Nullable Integer maxSegmentsToLoadPerCoordinationCycle ) { this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments = @@ -204,21 +204,21 @@ public CoordinatorDynamicConfig( ); this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad; - if (maxSegmentsToLoad == null) { + if (maxSegmentsToLoadPerCoordinationCycle == null) { log.debug( - "maxSegmentsToLoad was null! This is likely because your metastore does not " + "maxSegmentsToLoadPerCoordinationCycle was null! This is likely because your metastore does not " + "reflect this configuration being added to Druid in a recent release. Druid is defaulting the value " + "to the Druid default of %d. It is recommended that you re-submit your dynamic config with your " + "desired value for maxSegmentsToLoadPerCoordinationCycle", - Builder.DEFAULT_MAX_SEGMENTS_TO_LOAD + Builder.DEFAULT_MAX_SEGMENTS_TO_LOAD_PER_COORDINATION_CYCLE ); - maxSegmentsToLoad = Builder.DEFAULT_MAX_SEGMENTS_TO_LOAD; + maxSegmentsToLoadPerCoordinationCycle = Builder.DEFAULT_MAX_SEGMENTS_TO_LOAD_PER_COORDINATION_CYCLE; } Preconditions.checkArgument( - maxSegmentsToLoad >= 0, + maxSegmentsToLoadPerCoordinationCycle >= 0, "maxSegmentsToLoadPerCoordinationCycle must be greater than or equal to 0." 
); - this.maxSegmentsToLoad = maxSegmentsToLoad; + this.maxSegmentsToLoadPerCoordinationCycle = maxSegmentsToLoadPerCoordinationCycle; } private static Set parseJsonStringOrArray(Object jsonStringOrArray) @@ -395,9 +395,9 @@ public int getMaxNonPrimaryReplicantsToLoad() @Min(0) @JsonProperty - public int getMaxSegmentsToLoad() + public int getMaxSegmentsToLoadPerCoordinationCycle() { - return maxSegmentsToLoad; + return maxSegmentsToLoadPerCoordinationCycle; } @Override @@ -423,7 +423,7 @@ public String toString() ", pauseCoordination=" + pauseCoordination + ", replicateAfterLoadTimeout=" + replicateAfterLoadTimeout + ", maxNonPrimaryReplicantsToLoad=" + maxNonPrimaryReplicantsToLoad + - ", maxPrimaryReplicantsToLoad=" + maxSegmentsToLoad + + ", maxPrimaryReplicantsToLoad=" + maxSegmentsToLoadPerCoordinationCycle + '}'; } @@ -491,7 +491,7 @@ public boolean equals(Object o) if (maxNonPrimaryReplicantsToLoad != that.maxNonPrimaryReplicantsToLoad) { return false; } - if (maxSegmentsToLoad != that.maxSegmentsToLoad) { + if (maxSegmentsToLoadPerCoordinationCycle != that.maxSegmentsToLoadPerCoordinationCycle) { return false; } return decommissioningMaxPercentOfMaxSegmentsToMove == that.decommissioningMaxPercentOfMaxSegmentsToMove; @@ -518,7 +518,7 @@ public int hashCode() decommissioningMaxPercentOfMaxSegmentsToMove, pauseCoordination, maxNonPrimaryReplicantsToLoad, - maxSegmentsToLoad + maxSegmentsToLoadPerCoordinationCycle ); } @@ -545,7 +545,7 @@ public static class Builder private static final boolean DEFAULT_PAUSE_COORDINATION = false; private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false; private static final int DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD = Integer.MAX_VALUE; - private static final int DEFAULT_MAX_SEGMENTS_TO_LOAD = Integer.MAX_VALUE; + private static final int DEFAULT_MAX_SEGMENTS_TO_LOAD_PER_COORDINATION_CYCLE = Integer.MAX_VALUE; private Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments; private Long 
mergeBytesLimit; @@ -565,7 +565,7 @@ public static class Builder private Boolean pauseCoordination; private Boolean replicateAfterLoadTimeout; private Integer maxNonPrimaryReplicantsToLoad; - private Integer maxSegmentsToLoad; + private Integer maxSegmentsToLoadPerCoordinationCycle; public Builder() { @@ -593,7 +593,7 @@ public Builder( @JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination, @JsonProperty("replicateAfterLoadTimeout") @Nullable Boolean replicateAfterLoadTimeout, @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad, - @JsonProperty("maxSegmentsToLoad") @Nullable Integer maxSegmentsToLoad + @JsonProperty("maxSegmentsToLoadPerCoordinationCycle") @Nullable Integer maxSegmentsToLoadPerCoordinationCycle ) { this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments = @@ -615,7 +615,7 @@ public Builder( this.pauseCoordination = pauseCoordination; this.replicateAfterLoadTimeout = replicateAfterLoadTimeout; this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad; - this.maxSegmentsToLoad = maxSegmentsToLoad; + this.maxSegmentsToLoadPerCoordinationCycle = maxSegmentsToLoadPerCoordinationCycle; } public Builder withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(long leadingTimeMillis) @@ -721,9 +721,9 @@ public Builder withMaxNonPrimaryReplicantsToLoad(int maxNonPrimaryReplicantsToLo return this; } - public Builder withMaxSegmentsToLoad(int maxSegmentsToLoad) + public Builder withMaxSegmentsToLoadPerCoordinationCycle(int maxSegmentsToLoadPerCoordinationCycle) { - this.maxSegmentsToLoad = maxSegmentsToLoad; + this.maxSegmentsToLoadPerCoordinationCycle = maxSegmentsToLoadPerCoordinationCycle; return this; } @@ -756,7 +756,8 @@ public CoordinatorDynamicConfig build() replicateAfterLoadTimeout == null ? DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout, maxNonPrimaryReplicantsToLoad == null ? 
DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD : maxNonPrimaryReplicantsToLoad, - maxSegmentsToLoad == null ? DEFAULT_MAX_SEGMENTS_TO_LOAD : maxSegmentsToLoad + maxSegmentsToLoadPerCoordinationCycle == null ? DEFAULT_MAX_SEGMENTS_TO_LOAD_PER_COORDINATION_CYCLE + : maxSegmentsToLoadPerCoordinationCycle ); } @@ -795,7 +796,7 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults) maxNonPrimaryReplicantsToLoad == null ? defaults.getMaxNonPrimaryReplicantsToLoad() : maxNonPrimaryReplicantsToLoad, - maxSegmentsToLoad == null ? defaults.getMaxSegmentsToLoad() : maxSegmentsToLoad + maxSegmentsToLoadPerCoordinationCycle == null ? defaults.getMaxSegmentsToLoadPerCoordinationCycle() : maxSegmentsToLoadPerCoordinationCycle ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java index f04d4559b4a5..747b719f0b8b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java @@ -123,9 +123,9 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) } for (DataSegment segment : params.getUsedSegments()) { - if (stats.getGlobalStat(LoadRule.AGGREGATE_ASSIGNED_COUNT) >= params.getCoordinatorDynamicConfig().getMaxSegmentsToLoad()) { + if (stats.getGlobalStat(LoadRule.AGGREGATE_ASSIGNED_COUNT) >= params.getCoordinatorDynamicConfig().getMaxSegmentsToLoadPerCoordinationCycle()) { log.info("Maximum number of segments [%d] have been loaded for the current RunRules execution.", - params.getCoordinatorDynamicConfig().getMaxSegmentsToLoad()); + params.getCoordinatorDynamicConfig().getMaxSegmentsToLoadPerCoordinationCycle()); break; } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java index 17842025c59a..bc462590590f 
100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java @@ -1496,7 +1496,7 @@ public void testRunRulesMaxSegmentsToLoadLimit() BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy) - .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToLoad(20).build()) + .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToLoadPerCoordinationCycle(20).build()) .build(); DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index dac5f2051fdb..cdf466a79c36 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -231,7 +231,7 @@ public void testSerde() throws Exception Integer.MAX_VALUE ); - actual = CoordinatorDynamicConfig.builder().withMaxSegmentsToLoad(20).build(actual); + actual = CoordinatorDynamicConfig.builder().withMaxSegmentsToLoadPerCoordinationCycle(20).build(actual); assertConfig( actual, 1, @@ -888,6 +888,6 @@ private void assertConfig( Assert.assertEquals(pauseCoordination, config.getPauseCoordination()); Assert.assertEquals(replicateAfterLoadTimeout, config.getReplicateAfterLoadTimeout()); Assert.assertEquals(maxNonPrimaryReplicantsToLoad, config.getMaxNonPrimaryReplicantsToLoad()); - Assert.assertEquals(maxSegmentsToLoad, config.getMaxSegmentsToLoad()); + Assert.assertEquals(maxSegmentsToLoad, config.getMaxSegmentsToLoadPerCoordinationCycle()); } } diff --git 
a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap index 913f31072c9a..b315c263ce1a 100644 --- a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap +++ b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap @@ -205,13 +205,13 @@ exports[`CoordinatorDynamicConfigDialog matches snapshot 1`] = ` "info": This it the maximum number of segments - both primary and non-primary replicants - that can be loaded per Coordination run. The default is equivalent to there being no limit. This differs from maxNonPrimaryReplicantsToLoad because it includes the count of primary replicants that are loaded in the limit. An operator may want to use this configuration to prevent the coordinator from spinning and loading many segments that are already loaded, but appeared to be unavailable due to a temporary network issue causing some number of Historical servers to have their segments go missing (or some other event that caused a similar event). , - "name": "maxSegmentsToLoad", + "name": "maxSegmentsToLoadPerCoordinationCycle", "type": "number", }, Object { "defaultValue": 2147483647, "info": - The maximum number of non-primary replicants to load in a single Coordinator cycle. Once this limit is hit, only primary replicants will be loaded for the remainder of the cycle. Tuning this value lower can help reduce the delay in loading primary segments when the cluster has a very large number of non-primary replicants to load (such as when a single historical drops out of the cluster leaving many under-replicated segments). 
This configuration has no effect if the value is greater than the value of maxSegmentsToLoad + The maximum number of non-primary replicants to load in a single Coordinator cycle. Once this limit is hit, only primary replicants will be loaded for the remainder of the cycle. Tuning this value lower can help reduce the delay in loading primary segments when the cluster has a very large number of non-primary replicants to load (such as when a single historical drops out of the cluster leaving many under-replicated segments). This configuration has no effect if the value is greater than the value of maxSegmentsToLoadPerCoordinationCycle , "name": "maxNonPrimaryReplicantsToLoad", "type": "number", diff --git a/web-console/src/druid-models/coordinator-dynamic-config.tsx b/web-console/src/druid-models/coordinator-dynamic-config.tsx index 13b340d4277e..8178f94456a8 100644 --- a/web-console/src/druid-models/coordinator-dynamic-config.tsx +++ b/web-console/src/druid-models/coordinator-dynamic-config.tsx @@ -37,7 +37,7 @@ export interface CoordinatorDynamicConfig { decommissioningNodes?: string[]; decommissioningMaxPercentOfMaxSegmentsToMove?: number; pauseCoordination?: boolean; - maxSegmentsToLoad?: number; + maxSegmentsToLoadPerCoordinationCycle?: number; maxNonPrimaryReplicantsToLoad?: number; } @@ -255,7 +255,7 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field[ ), }, { - name: 'maxSegmentsToLoad', + name: 'maxSegmentsToLoadPerCoordinationCycle', type: 'number', defaultValue: 2147483647, info: ( @@ -282,7 +282,8 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field[ Tuning this value lower can help reduce the delay in loading primary segments when the cluster has a very large number of non-primary replicants to load (such as when a single historical drops out of the cluster leaving many under-replicated segments). 
This - configuration has no effect if the value is greater than the value of maxSegmentsToLoad + configuration has no effect if the value is greater than the value of + maxSegmentsToLoadPerCoordinationCycle ), }, From 5499eff17adcaec2802bb34a0f1e476ad457aa7f Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Tue, 7 Jun 2022 10:28:37 -0500 Subject: [PATCH 08/10] fixup CoordinatorDynamicConfigTest --- .../druid/server/http/CoordinatorDynamicConfigTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index cdf466a79c36..daf02c8d4fbd 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -459,7 +459,7 @@ public void testInvalidMaxPrimaryReplicantsToLoad() throws Exception { try { String jsonStr = "{\n" - + " \"maxSegmentsToLoad\": -1\n" + + " \"maxSegmentsToLoadPerCoordinationCycle\": -1\n" + "}\n"; mapper.readValue( @@ -859,7 +859,7 @@ private void assertConfig( boolean pauseCoordination, boolean replicateAfterLoadTimeout, int maxNonPrimaryReplicantsToLoad, - int maxSegmentsToLoad + int maxSegmentsToLoadPerCoordinationCycle ) { Assert.assertEquals( @@ -888,6 +888,6 @@ private void assertConfig( Assert.assertEquals(pauseCoordination, config.getPauseCoordination()); Assert.assertEquals(replicateAfterLoadTimeout, config.getReplicateAfterLoadTimeout()); Assert.assertEquals(maxNonPrimaryReplicantsToLoad, config.getMaxNonPrimaryReplicantsToLoad()); - Assert.assertEquals(maxSegmentsToLoad, config.getMaxSegmentsToLoadPerCoordinationCycle()); + Assert.assertEquals(maxSegmentsToLoadPerCoordinationCycle, config.getMaxSegmentsToLoadPerCoordinationCycle()); } } From b138fa2f8ac9a093c6acfc0f167094b1ddf2a771 Mon Sep 17 00:00:00 2001 From: 
"Lucas.Capistrant" Date: Tue, 7 Jun 2022 10:29:38 -0500 Subject: [PATCH 09/10] fixup CoordinatorDynamicConfig --- .../druid/server/coordinator/CoordinatorDynamicConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index 92d0dd7797dc..d80d1d223c1c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -423,7 +423,7 @@ public String toString() ", pauseCoordination=" + pauseCoordination + ", replicateAfterLoadTimeout=" + replicateAfterLoadTimeout + ", maxNonPrimaryReplicantsToLoad=" + maxNonPrimaryReplicantsToLoad + - ", maxPrimaryReplicantsToLoad=" + maxSegmentsToLoadPerCoordinationCycle + + ", maxSegmentsToLoadPerCoordinationCycle=" + maxSegmentsToLoadPerCoordinationCycle + '}'; } From 38b8a6abea12db6735d91d7f7dca3e1ee8b6f7bc Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Wed, 8 Jun 2022 11:03:25 -0500 Subject: [PATCH 10/10] re-work CoordinatorDynamicConfigTest to focus on testing the most interesting parts of CoordinatorDynamicConfig --- .../http/CoordinatorDynamicConfigTest.java | 940 +++++------------- 1 file changed, 247 insertions(+), 693 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index daf02c8d4fbd..ff40d5794bad 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -27,439 +27,92 @@ import org.junit.Assert; import org.junit.Test; -import java.util.Set; - -/** - * - */ public class CoordinatorDynamicConfigTest { - private static
final int EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 100; - private final ObjectMapper mapper = TestHelper.makeJsonMapper(); @Test - public void testSerde() throws Exception + public void testEqualsAndHashCodeSanity() { - String jsonStr = "{\n" - + " \"millisToWaitBeforeDeleting\": 1,\n" - + " \"mergeBytesLimit\": 1,\n" - + " \"mergeSegmentsLimit\" : 1,\n" - + " \"maxSegmentsToMove\": 1,\n" - + " \"percentOfSegmentsToConsiderPerMove\": 1,\n" - + " \"replicantLifetime\": 1,\n" - + " \"replicationThrottleLimit\": 1,\n" - + " \"balancerComputeThreads\": 2, \n" - + " \"emitBalancingStats\": true,\n" - + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n" - + " \"maxSegmentsInNodeLoadingQueue\": 1,\n" - + " \"decommissioningNodes\": [\"host1\", \"host2\"],\n" - + " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 9,\n" - + " \"pauseCoordination\": false,\n" - + " \"replicateAfterLoadTimeout\": false,\n" - + " \"maxNonPrimaryReplicantsToLoad\": 2147483647\n" - + "}\n"; - - CoordinatorDynamicConfig actual = mapper.readValue( - mapper.writeValueAsString( - mapper.readValue( - jsonStr, - CoordinatorDynamicConfig.class - ) - ), - CoordinatorDynamicConfig.class - ); - ImmutableSet decommissioning = ImmutableSet.of("host1", "host2"); - ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig( - actual, - 1, - 1, - 1, - 1, - 1, - 1, - 1, - 2, - true, - whitelist, - false, - 1, - decommissioning, - 9, - false, - false, - Integer.MAX_VALUE, - Integer.MAX_VALUE - ); - - actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); - assertConfig( - actual, - 1, - 1, - 1, - 1, - 1, - 1, - 1, - 2, - true, - whitelist, - false, - 1, - ImmutableSet.of("host1"), - 9, - false, - false, - Integer.MAX_VALUE, - Integer.MAX_VALUE - ); - - actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); - assertConfig( - actual, - 1, - 1, - 1, - 1, - 1, - 1, - 1, - 
2, - true, - whitelist, - false, - 1, - ImmutableSet.of("host1"), - 5, - false, - false, - Integer.MAX_VALUE, - Integer.MAX_VALUE - ); - - actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual); - assertConfig( - actual, - 1, - 1, - 1, - 1, - 1, - 1, - 1, - 2, - true, - whitelist, - false, - 1, - ImmutableSet.of("host1"), - 5, - true, - false, - Integer.MAX_VALUE, - Integer.MAX_VALUE - ); - - actual = CoordinatorDynamicConfig.builder().withPercentOfSegmentsToConsiderPerMove(10).build(actual); - assertConfig( - actual, - 1, - 1, - 1, - 1, - 10, - 1, - 1, - 2, - true, - whitelist, - false, - 1, - ImmutableSet.of("host1"), - 5, - true, - false, - Integer.MAX_VALUE, - Integer.MAX_VALUE - ); - - actual = CoordinatorDynamicConfig.builder().withReplicateAfterLoadTimeout(true).build(actual); - assertConfig( - actual, - 1, - 1, - 1, - 1, - 10, - 1, - 1, - 2, - true, - whitelist, - false, - 1, - ImmutableSet.of("host1"), - 5, - true, - true, - Integer.MAX_VALUE, - Integer.MAX_VALUE - ); - - actual = CoordinatorDynamicConfig.builder().withMaxNonPrimaryReplicantsToLoad(10).build(actual); - assertConfig( - actual, - 1, - 1, - 1, - 1, - 10, - 1, - 1, - 2, - true, - whitelist, - false, - 1, - ImmutableSet.of("host1"), - 5, - true, - true, - 10, - Integer.MAX_VALUE - ); - - actual = CoordinatorDynamicConfig.builder().withMaxSegmentsToLoadPerCoordinationCycle(20).build(actual); - assertConfig( - actual, - 1, - 1, - 1, - 1, - 10, - 1, - 1, - 2, - true, - whitelist, - false, - 1, - ImmutableSet.of("host1"), - 5, - true, - true, - 10, - 20 - ); + CoordinatorDynamicConfig config1 = CoordinatorDynamicConfig.builder().build(); + CoordinatorDynamicConfig config2 = CoordinatorDynamicConfig.builder().build(); + Assert.assertEquals(config1, config2); + Assert.assertEquals(config1.hashCode(), config2.hashCode()); } @Test - public void testConstructorWithNullsShouldKillUnusedSegmentsInAllDataSources() + public void 
testSerdeFailForInvalidPercentOfSegmentsToConsiderPerMove() throws Exception { - CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(1, - 1, - 1, - 1, - null, - false, - 1, - 2, - 10, - true, - null, - null, - null, - ImmutableSet.of("host1"), - 5, - true, - true, - 10, - 20 - ); - Assert.assertTrue(config.isKillUnusedSegmentsInAllDataSources()); - Assert.assertTrue(config.getSpecificDataSourcesToKillUnusedSegmentsIn().isEmpty()); - } + try { + String jsonStr = "{\n" + + " \"percentOfSegmentsToConsiderPerMove\": 0\n" + + "}\n"; - @Test - public void testConstructorWithSpecificDataSourcesToKillShouldNotKillUnusedSegmentsInAllDatasources() - { - CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(1, - 1, - 1, - 1, - null, - false, - 1, - 2, - 10, - true, - ImmutableSet.of("test1"), - null, - null, - ImmutableSet.of("host1"), - 5, - true, - true, - 10, - 20 - ); - Assert.assertFalse(config.isKillUnusedSegmentsInAllDataSources()); - Assert.assertEquals(ImmutableSet.of("test1"), config.getSpecificDataSourcesToKillUnusedSegmentsIn()); - } + mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); - @Test - public void testDecommissioningParametersBackwardCompatibility() throws Exception - { - String jsonStr = "{\n" - + " \"millisToWaitBeforeDeleting\": 1,\n" - + " \"mergeBytesLimit\": 1,\n" - + " \"mergeSegmentsLimit\" : 1,\n" - + " \"maxSegmentsToMove\": 1,\n" - + " \"replicantLifetime\": 1,\n" - + " \"replicationThrottleLimit\": 1,\n" - + " \"balancerComputeThreads\": 2, \n" - + " \"emitBalancingStats\": true,\n" - + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n" - + " \"maxSegmentsInNodeLoadingQueue\": 1\n" - + "}\n"; + Assert.fail("deserialization should fail."); + } + catch (JsonMappingException e) { + Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); + } - CoordinatorDynamicConfig actual = mapper.readValue( - 
mapper.writeValueAsString( - mapper.readValue( - jsonStr, - CoordinatorDynamicConfig.class - ) - ), - CoordinatorDynamicConfig.class - ); - ImmutableSet decommissioning = ImmutableSet.of(); - ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig( - actual, - 1, - 1, - 1, - 1, - 100, - 1, - 1, - 2, - true, - whitelist, - false, - 1, - decommissioning, - 0, - false, - false, - Integer.MAX_VALUE, - Integer.MAX_VALUE - ); + try { + String jsonStr = "{\n" + + " \"percentOfSegmentsToConsiderPerMove\": -100\n" + + "}\n"; - actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); - assertConfig( - actual, - 1, - 1, - 1, - 1, - 100, - 1, - 1, - 2, - true, - whitelist, - false, - 1, - ImmutableSet.of("host1"), - 0, - false, - false, - Integer.MAX_VALUE, - Integer.MAX_VALUE - ); + mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); - actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); - assertConfig( - actual, - 1, - 1, - 1, - 1, - 100, - 1, - 1, - 2, - true, - whitelist, - false, - 1, - ImmutableSet.of("host1"), - 5, - false, - false, - Integer.MAX_VALUE, - Integer.MAX_VALUE - ); - } + Assert.fail("deserialization should fail."); + } + catch (JsonMappingException e) { + Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); + } - @Test - public void testSerdeWithStringinKillDataSourceWhitelist() throws Exception - { - String jsonStr = "{\n" - + " \"millisToWaitBeforeDeleting\": 1,\n" - + " \"mergeBytesLimit\": 1,\n" - + " \"mergeSegmentsLimit\" : 1,\n" - + " \"maxSegmentsToMove\": 1,\n" - + " \"percentOfSegmentsToConsiderPerMove\": 1,\n" - + " \"replicantLifetime\": 1,\n" - + " \"replicationThrottleLimit\": 1,\n" - + " \"balancerComputeThreads\": 2, \n" - + " \"emitBalancingStats\": true,\n" - + " \"killDataSourceWhitelist\": 
\"test1, test2\", \n" - + " \"maxSegmentsInNodeLoadingQueue\": 1\n" - + "}\n"; + try { + String jsonStr = "{\n" + + " \"percentOfSegmentsToConsiderPerMove\": 105\n" + + "}\n"; - CoordinatorDynamicConfig actual = mapper.readValue( - mapper.writeValueAsString( - mapper.readValue( - jsonStr, - CoordinatorDynamicConfig.class - ) - ), - CoordinatorDynamicConfig.class - ); - assertConfig( - actual, - 1, - 1, - 1, - 1, - 1, - 1, - 1, - 2, - true, - ImmutableSet.of("test1", "test2"), - false, - 1, - ImmutableSet.of(), - 0, - false, - false, - Integer.MAX_VALUE, - Integer.MAX_VALUE - ); + mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); + + Assert.fail("deserialization should fail."); + } + catch (JsonMappingException e) { + Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); + } } @Test - public void testInvalidMaxPrimaryReplicantsToLoad() throws Exception + public void testSerdeFailForInvalidDecommissioningMaxPercentOfSegmentsToConsiderToMove() throws Exception { try { String jsonStr = "{\n" - + " \"maxSegmentsToLoadPerCoordinationCycle\": -1\n" + + " \"decommissioningMaxPercentOfMaxSegmentsToMove\": -100\n" + "}\n"; mapper.readValue( @@ -477,14 +130,10 @@ public void testInvalidMaxPrimaryReplicantsToLoad() throws Exception catch (JsonMappingException e) { Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); } - } - @Test - public void testSerdeHandleInvalidPercentOfSegmentsToConsiderPerMove() throws Exception - { try { String jsonStr = "{\n" - + " \"percentOfSegmentsToConsiderPerMove\": 0\n" + + " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 105\n" + "}\n"; mapper.readValue( @@ -502,10 +151,14 @@ public void testSerdeHandleInvalidPercentOfSegmentsToConsiderPerMove() throws Ex catch (JsonMappingException e) { Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); } + } + @Test + public void 
testSerdeFailForInvalidMaxNonPrimaryReplicantsToLoad() throws Exception + { try { String jsonStr = "{\n" - + " \"percentOfSegmentsToConsiderPerMove\": -100\n" + + " \"maxNonPrimaryReplicantsToLoad\": -1\n" + "}\n"; mapper.readValue( @@ -523,10 +176,14 @@ public void testSerdeHandleInvalidPercentOfSegmentsToConsiderPerMove() throws Ex catch (JsonMappingException e) { Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); } + } + @Test + public void testSerdeFailForInvalidMaxSegmentsToLoadPerCoordinationCycle() throws Exception + { try { String jsonStr = "{\n" - + " \"percentOfSegmentsToConsiderPerMove\": 105\n" + + " \"maxSegmentsToLoadPerCoordinationCycle\": -1\n" + "}\n"; mapper.readValue( @@ -547,23 +204,11 @@ public void testSerdeHandleInvalidPercentOfSegmentsToConsiderPerMove() throws Ex } @Test - public void testHandleMissingPercentOfSegmentsToConsiderPerMove() throws Exception + public void testDynamicSerdeOfSpecificDataSourcesToKillUnusedSegmentsIn() throws Exception { - String jsonStr = "{\n" - + " \"millisToWaitBeforeDeleting\": 1,\n" - + " \"mergeBytesLimit\": 1,\n" - + " \"mergeSegmentsLimit\" : 1,\n" - + " \"maxSegmentsToMove\": 1,\n" - + " \"replicantLifetime\": 1,\n" - + " \"replicationThrottleLimit\": 1,\n" - + " \"balancerComputeThreads\": 2, \n" - + " \"emitBalancingStats\": true,\n" - + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n" - + " \"maxSegmentsInNodeLoadingQueue\": 1,\n" - + " \"decommissioningNodes\": [\"host1\", \"host2\"],\n" - + " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 9,\n" - + " \"pauseCoordination\": false\n" - + "}\n"; + String jsonStr = "{\n" + + " \"killDataSourceWhitelist\": \"test1, test2\"\n" + + "}\n"; CoordinatorDynamicConfig actual = mapper.readValue( mapper.writeValueAsString( mapper.readValue( @@ -573,48 +218,41 @@ public void testHandleMissingPercentOfSegmentsToConsiderPerMove() throws Excepti ), CoordinatorDynamicConfig.class ); - ImmutableSet decommissioning = ImmutableSet.of("host1", "host2");
- ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig( - actual, - 1, - 1, - 1, - 1, - 100, - 1, - 1, - 2, - true, - whitelist, - false, - 1, - decommissioning, - 9, - false, - false, - Integer.MAX_VALUE, - Integer.MAX_VALUE + Assert.assertEquals(ImmutableSet.of("test1", "test2"), actual.getSpecificDataSourcesToKillUnusedSegmentsIn()); + + jsonStr = "{\n" + + " \"killDataSourceWhitelist\": [\"test1\", \"test2\"]\n" + + "}\n"; + actual = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); + Assert.assertEquals(ImmutableSet.of("test1", "test2"), actual.getSpecificDataSourcesToKillUnusedSegmentsIn()); + + jsonStr = "{}"; + actual = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class ); + Assert.assertEquals(ImmutableSet.of(), actual.getSpecificDataSourcesToKillUnusedSegmentsIn()); } @Test - public void testSerdeWithKillAllDataSources() throws Exception + public void testDynamicSerdeOfDataSourcesToNotKillStalePendingSegmentsIn() throws Exception { - String jsonStr = "{\n" - + " \"millisToWaitBeforeDeleting\": 1,\n" - + " \"mergeBytesLimit\": 1,\n" - + " \"mergeSegmentsLimit\" : 1,\n" - + " \"maxSegmentsToMove\": 1,\n" - + " \"percentOfSegmentsToConsiderPerMove\": 1,\n" - + " \"replicantLifetime\": 1,\n" - + " \"replicationThrottleLimit\": 1,\n" - + " \"balancerComputeThreads\": 2, \n" - + " \"emitBalancingStats\": true,\n" - + " \"killAllDataSources\": true,\n" - + " \"maxSegmentsInNodeLoadingQueue\": 1\n" - + "}\n"; - + String jsonStr = "{\n" + + " \"killPendingSegmentsSkipList\": \"test1, test2\"\n" + + "}\n"; CoordinatorDynamicConfig actual = mapper.readValue( mapper.writeValueAsString( mapper.readValue( @@ -624,62 +262,41 @@ public void testSerdeWithKillAllDataSources() throws Exception ), CoordinatorDynamicConfig.class ); + 
Assert.assertEquals(ImmutableSet.of("test1", "test2"), actual.getDataSourcesToNotKillStalePendingSegmentsIn()); - assertConfig( - actual, - 1, - 1, - 1, - 1, - 1, - 1, - 1, - 2, - true, - ImmutableSet.of(), - true, - 1, - ImmutableSet.of(), - 0, - false, - false, - Integer.MAX_VALUE, - Integer.MAX_VALUE + jsonStr = "{\n" + + " \"killPendingSegmentsSkipList\": [\"test1\", \"test2\"]\n" + + "}\n"; + actual = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class ); + Assert.assertEquals(ImmutableSet.of("test1", "test2"), actual.getDataSourcesToNotKillStalePendingSegmentsIn()); - // killAllDataSources is a config in versions 0.22.x and older and is no longer used. - // This used to be an invalid config, but as of 0.23.0 the killAllDataSources flag no longer exsist, - // so this is a valid config - jsonStr = "{\n" - + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n" - + " \"killAllDataSources\": true,\n" - + " \"percentOfSegmentsToConsiderPerMove\": 1\n" - + "}\n"; + jsonStr = "{}"; actual = mapper.readValue( - jsonStr, + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), CoordinatorDynamicConfig.class ); - - Assert.assertFalse(actual.isKillUnusedSegmentsInAllDataSources()); - Assert.assertEquals(2, actual.getSpecificDataSourcesToKillUnusedSegmentsIn().size()); + Assert.assertEquals(ImmutableSet.of(), actual.getDataSourcesToNotKillStalePendingSegmentsIn()); } @Test - public void testDeserializeWithoutMaxSegmentsInNodeLoadingQueue() throws Exception + public void testDynamicSerdeOfDecommissioningNodes() throws Exception { - String jsonStr = "{\n" - + " \"millisToWaitBeforeDeleting\": 1,\n" - + " \"mergeBytesLimit\": 1,\n" - + " \"mergeSegmentsLimit\" : 1,\n" - + " \"maxSegmentsToMove\": 1,\n" - + " \"percentOfSegmentsToConsiderPerMove\": 1,\n" - + " \"replicantLifetime\": 1,\n" - + " \"replicationThrottleLimit\": 1,\n" - + "
\"balancerComputeThreads\": 2, \n" - + " \"emitBalancingStats\": true,\n" - + " \"killAllDataSources\": true\n" - + "}\n"; - + String jsonStr = "{\n" + + " \"decommissioningNodes\": \"test1, test2\"\n" + + "}\n"; CoordinatorDynamicConfig actual = mapper.readValue( mapper.writeValueAsString( mapper.readValue( @@ -689,205 +306,142 @@ public void testDeserializeWithoutMaxSegmentsInNodeLoadingQueti ), CoordinatorDynamicConfig.class ); + Assert.assertEquals(ImmutableSet.of("test1", "test2"), actual.getDecommissioningNodes()); - assertConfig( - actual, - 1, - 1, - 1, - 1, - 1, - 1, - 1, - 2, - true, - ImmutableSet.of(), - true, - EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE, - ImmutableSet.of(), - 0, - false, - false, - Integer.MAX_VALUE, - Integer.MAX_VALUE + jsonStr = "{\n" + + " \"decommissioningNodes\": [\"test1\", \"test2\"]\n" + + "}\n"; + actual = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class ); - } + Assert.assertEquals(ImmutableSet.of("test1", "test2"), actual.getDecommissioningNodes()); - @Test - public void testBuilderDefaults() - { - CoordinatorDynamicConfig defaultConfig = CoordinatorDynamicConfig.builder().build(); - ImmutableSet emptyList = ImmutableSet.of(); - assertConfig( - defaultConfig, - 900000, - 524288000, - 100, - 5, - 100, - 15, - 10, - 1, - false, - emptyList, - true, - EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE, - emptyList, - 70, - false, - false, - Integer.MAX_VALUE, - Integer.MAX_VALUE + jsonStr = "{}"; + actual = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class ); + Assert.assertEquals(ImmutableSet.of(), actual.getDecommissioningNodes()); } @Test - public void testBuilderWithDefaultSpecificDataSourcesToKillUnusedSegmentsInSpecified() + public void testSerdeHandleNullableFields()
throws Exception { - CoordinatorDynamicConfig defaultConfig = - CoordinatorDynamicConfig.builder() - .withSpecificDataSourcesToKillUnusedSegmentsIn(ImmutableSet.of("DATASOURCE")) - .build(); - CoordinatorDynamicConfig config = CoordinatorDynamicConfig.builder().build(defaultConfig); - assertConfig( - config, - 900000, - 524288000, - 100, - 5, - 100, - 15, - 10, - 1, - false, - ImmutableSet.of("DATASOURCE"), - false, - EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE, - ImmutableSet.of(), - 70, - false, - false, - Integer.MAX_VALUE, - Integer.MAX_VALUE + String jsonStr = "{}"; + CoordinatorDynamicConfig actual = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class ); + Assert.assertEquals(100.0, actual.getPercentOfSegmentsToConsiderPerMove(), 0.0); + Assert.assertEquals(100.0, actual.getMaxSegmentsInNodeLoadingQueue(), 0.0); + Assert.assertEquals(Integer.MAX_VALUE, actual.getMaxNonPrimaryReplicantsToLoad()); + Assert.assertEquals(Integer.MAX_VALUE, actual.getMaxSegmentsToLoadPerCoordinationCycle()); } @Test - public void testUpdate() + public void testIsKillUnusedSegmentsInAllDataSources() throws Exception { - CoordinatorDynamicConfig current = CoordinatorDynamicConfig - .builder() - .withSpecificDataSourcesToKillUnusedSegmentsIn(ImmutableSet.of("x")) - .build(); - - Assert.assertEquals( - current, - new CoordinatorDynamicConfig.Builder( - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null - ).build(current) + String jsonStr = "{\n" + + " \"killDataSourceWhitelist\": []\n" + + "}\n"; + CoordinatorDynamicConfig actual = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class ); - } - - @Test - public void testSerdeHandleInvalidMaxNonPrimaryReplicantsToLoad() throws Exception 
- { - try { - String jsonStr = "{\n" - + " \"maxNonPrimaryReplicantsToLoad\": -1\n" - + "}\n"; - - mapper.readValue( - mapper.writeValueAsString( - mapper.readValue( - jsonStr, - CoordinatorDynamicConfig.class - ) - ), - CoordinatorDynamicConfig.class - ); + Assert.assertTrue(actual.isKillUnusedSegmentsInAllDataSources()); - Assert.fail("deserialization should fail."); - } - catch (JsonMappingException e) { - Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); - } + jsonStr = "{\n" + + " \"killDataSourceWhitelist\": [\"test1\", \"test2\"]\n" + + "}\n"; + actual = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); + Assert.assertFalse(actual.isKillUnusedSegmentsInAllDataSources()); } + /** + * This test is applicable for legacy configs that are removed between versions. + * + * If these unkown fields are included in the JSON payload for deserialization, they are ignored. 
+ * @throws Exception + */ @Test - public void testEqualsAndHashCodeSanity() + public void testUnknownFieldsHaveNoImpactOnDeserialization() throws Exception { - CoordinatorDynamicConfig config1 = CoordinatorDynamicConfig.builder().build(); - CoordinatorDynamicConfig config2 = CoordinatorDynamicConfig.builder().build(); - Assert.assertEquals(config1, config2); - Assert.assertEquals(config1.hashCode(), config2.hashCode()); + String jsonStr = "{\n" + + " \"dummyField\": []\n" + + "}\n"; + CoordinatorDynamicConfig dummyActual = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); + jsonStr = "{}"; + CoordinatorDynamicConfig actual = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class + ); + Assert.assertEquals(actual, dummyActual); } - private void assertConfig( - CoordinatorDynamicConfig config, - long expectedLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments, - long expectedMergeBytesLimit, - int expectedMergeSegmentsLimit, - int expectedMaxSegmentsToMove, - int expectedPercentOfSegmentsToConsiderPerMove, - int expectedReplicantLifetime, - int expectedReplicationThrottleLimit, - int expectedBalancerComputeThreads, - boolean expectedEmitingBalancingStats, - Set expectedSpecificDataSourcesToKillUnusedSegmentsIn, - boolean expectedKillUnusedSegmentsInAllDataSources, - int expectedMaxSegmentsInNodeLoadingQueue, - Set decommissioningNodes, - int decommissioningMaxPercentOfMaxSegmentsToMove, - boolean pauseCoordination, - boolean replicateAfterLoadTimeout, - int maxNonPrimaryReplicantsToLoad, - int maxSegmentsToLoadPerCoordinationCycle - ) + /** + * Confirms that the builder will honor values in CoordinatorDynamicConfig when it is used as the Defaults supplier. 
+ * + * @throws Exception + */ + @Test + public void testBuilderBuildWithCoordinatorDynamicConfigDefaults() throws Exception { - Assert.assertEquals( - expectedLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments, - config.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments() - ); - Assert.assertEquals(expectedMergeBytesLimit, config.getMergeBytesLimit()); - Assert.assertEquals(expectedMergeSegmentsLimit, config.getMergeSegmentsLimit()); - Assert.assertEquals(expectedMaxSegmentsToMove, config.getMaxSegmentsToMove()); - Assert.assertEquals(expectedPercentOfSegmentsToConsiderPerMove, config.getPercentOfSegmentsToConsiderPerMove(), 0); - Assert.assertEquals(expectedReplicantLifetime, config.getReplicantLifetime()); - Assert.assertEquals(expectedReplicationThrottleLimit, config.getReplicationThrottleLimit()); - Assert.assertEquals(expectedBalancerComputeThreads, config.getBalancerComputeThreads()); - Assert.assertEquals(expectedEmitingBalancingStats, config.emitBalancingStats()); - Assert.assertEquals( - expectedSpecificDataSourcesToKillUnusedSegmentsIn, - config.getSpecificDataSourcesToKillUnusedSegmentsIn() - ); - Assert.assertEquals(expectedKillUnusedSegmentsInAllDataSources, config.isKillUnusedSegmentsInAllDataSources()); - Assert.assertEquals(expectedMaxSegmentsInNodeLoadingQueue, config.getMaxSegmentsInNodeLoadingQueue()); - Assert.assertEquals(decommissioningNodes, config.getDecommissioningNodes()); - Assert.assertEquals( - decommissioningMaxPercentOfMaxSegmentsToMove, - config.getDecommissioningMaxPercentOfMaxSegmentsToMove() + String jsonStr = "{\n" + + " \"decommissioningNodes\": \"test1, test2\"\n" + + "}\n"; + CoordinatorDynamicConfig defaults = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ) + ), + CoordinatorDynamicConfig.class ); - Assert.assertEquals(pauseCoordination, config.getPauseCoordination()); - Assert.assertEquals(replicateAfterLoadTimeout, 
config.getReplicateAfterLoadTimeout()); - Assert.assertEquals(maxNonPrimaryReplicantsToLoad, config.getMaxNonPrimaryReplicantsToLoad()); - Assert.assertEquals(maxSegmentsToLoadPerCoordinationCycle, config.getMaxSegmentsToLoadPerCoordinationCycle()); + CoordinatorDynamicConfig actual = CoordinatorDynamicConfig.builder().build(defaults); + Assert.assertEquals(ImmutableSet.of("test1", "test2"), actual.getDecommissioningNodes()); + } }
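Reviewer note: the interplay between the two caps this patch series touches can be modeled outside of Druid. The snippet below is a hypothetical, self-contained sketch (not the actual `LoadRule`/`RunRules` code; the class and method names are invented for illustration) of how a per-cycle total cap in the spirit of `maxSegmentsToLoadPerCoordinationCycle`, which counts primary and non-primary replicants alike, combines with the non-primary-only cap `maxNonPrimaryReplicantsToLoad`:

```java
// Hypothetical model of the two coordinator dynamic-config load caps.
// Not the actual Druid LoadRule implementation.
public class LoadCapSketch {
    private final int maxSegmentsPerCycle;  // like maxSegmentsToLoadPerCoordinationCycle
    private final int maxNonPrimary;        // like maxNonPrimaryReplicantsToLoad
    private int loaded = 0;
    private int nonPrimaryLoaded = 0;

    public LoadCapSketch(int maxSegmentsPerCycle, int maxNonPrimary) {
        this.maxSegmentsPerCycle = maxSegmentsPerCycle;
        this.maxNonPrimary = maxNonPrimary;
    }

    /** Returns true if the load is permitted, recording it against both caps. */
    public boolean tryLoad(boolean primary) {
        if (loaded >= maxSegmentsPerCycle) {
            return false;  // total cap counts primary and non-primary replicants alike
        }
        if (!primary && nonPrimaryLoaded >= maxNonPrimary) {
            return false;  // non-primary cap keeps headroom for primary replicants
        }
        loaded++;
        if (!primary) {
            nonPrimaryLoaded++;
        }
        return true;
    }

    public static void main(String[] args) {
        LoadCapSketch cycle = new LoadCapSketch(3, 1);
        System.out.println(cycle.tryLoad(true));   // primary -> true
        System.out.println(cycle.tryLoad(false));  // non-primary -> true
        System.out.println(cycle.tryLoad(false));  // non-primary cap of 1 hit -> false
        System.out.println(cycle.tryLoad(true));   // primary still allowed -> true
        System.out.println(cycle.tryLoad(true));   // total cap of 3 hit -> false
    }
}
```

With both caps left at their documented default of 2147483647 (`Integer.MAX_VALUE`), `tryLoad` always returns true, matching the "no limit" default behavior described in the docs hunk; the non-primary cap also has no effect when it exceeds the total cap, as the added web-console info text notes.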