diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index a14b3beb56c7..7472e1b4e502 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -936,7 +936,8 @@ A sample Coordinator dynamic config JSON object is shown below:
"decommissioningMaxPercentOfMaxSegmentsToMove": 70,
"pauseCoordination": false,
"replicateAfterLoadTimeout": false,
- "maxNonPrimaryReplicantsToLoad": 2147483647
+ "maxNonPrimaryReplicantsToLoad": 2147483647,
+ "maxSegmentsToLoadPerCoordinationCycle": 2147483647
}
```
@@ -962,7 +963,8 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`decommissioningMaxPercentOfMaxSegmentsToMove`| Upper limit of segments the Coordinator can move from decommissioning servers to active non-decommissioning servers during a single run. This value is relative to the total maximum number of segments that can be moved at any given time based upon the value of `maxSegmentsToMove`.
If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, the Coordinator does not move segments to decommissioning servers, effectively putting them in a type of "maintenance" mode. In this case, decommissioning servers do not participate in balancing or assignment by load rules. The Coordinator still considers segments on decommissioning servers as candidates to replicate on active servers.
Decommissioning can stall if there are no available active servers to move the segments to. You can use the maximum percent of decommissioning segment movements to prioritize balancing or to decrease commissioning time to prevent active servers from being overloaded. The value must be between 0 and 100.|70|
|`pauseCoordination`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again. |false|
|`replicateAfterLoadTimeout`| Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow historicals in the cluster. However, the slow historical may still load the segment later and the coordinator may issue drop requests if the segment is over-replicated.|false|
-|`maxNonPrimaryReplicantsToLoad`|This is the maximum number of non-primary segment replicants to load per Coordination run. This number can be set to put a hard upper limit on the number of replicants loaded. It is a tool that can help prevent long delays in new data being available for query after events that require many non-primary replicants to be loaded by the cluster; such as a Historical node disconnecting from the cluster. The default value essentially means there is no limit on the number of replicants loaded per coordination cycle. If you want to use a non-default value for this config, you may want to start with it being `~20%` of the number of segments found on your Historical server with the most segments. You can use the Druid metric, `coordinator/time` with the filter `duty=org.apache.druid.server.coordinator.duty.RunRules` to see how different values of this config impact your Coordinator execution time.|`Integer.MAX_VALUE`|
+|`maxNonPrimaryReplicantsToLoad`|The maximum number of non-primary segment replicants to load per Coordination run. This number can be set to put a hard upper limit on the number of non-primary replicants loaded. It is a tool that can help prevent long delays in new data becoming available for query after events that require many non-primary replicants to be loaded by the cluster, such as a Historical node disconnecting from the cluster. The default value effectively means there is no limit on the number of replicants loaded per coordination cycle. If you want to use a non-default value for this config, a reasonable starting point is `~20%` of the number of segments found on the Historical server with the most segments. You can use the Druid metric `coordinator/time` with the filter `duty=org.apache.druid.server.coordinator.duty.RunRules` to see how different values of this config impact your Coordinator execution time. This configuration has no effect if its value is greater than the value of `maxSegmentsToLoadPerCoordinationCycle`.|`Integer.MAX_VALUE`|
+|`maxSegmentsToLoadPerCoordinationCycle`|The maximum number of segments - both primary and non-primary replicants - that can be loaded per Coordination run. The default is equivalent to there being no limit. This differs from `maxNonPrimaryReplicantsToLoad` because primary replicants also count against the limit. An operator may want to use this configuration to prevent the Coordinator from spinning and loading many segments that are in fact already loaded but appear unavailable, for example when a temporary network issue causes some Historical servers' segments to go missing.|`Integer.MAX_VALUE`|
To view the audit history of Coordinator dynamic config issue a GET request to the URL -
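For operators, the new property is submitted like any other Coordinator dynamic config field. Below is a minimal sketch of doing so from Java 11+, assuming a Coordinator at `localhost:8081` (the host, port, and the chosen limit of 10,000 are illustrative). Depending on the Druid version, fields omitted from the POST body may be reset to defaults rather than preserved, so in practice it is safest to GET the current config, modify it, and re-submit the full object:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SetCoordinatorDynamicConfig
{
  public static void main(String[] args) throws Exception
  {
    // Cap the total number of segment loads per coordination cycle at 10,000.
    String body = "{\"maxSegmentsToLoadPerCoordinationCycle\": 10000}";

    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8081/druid/coordinator/v1/config"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();

    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode());
  }
}
```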
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index b9f0d490a3d3..b791b901a5e5 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -104,6 +104,12 @@ public class CoordinatorDynamicConfig
*/
private final int maxNonPrimaryReplicantsToLoad;
+ /**
+ * This is the upper limit on segments to load per coordination cycle. Once this limit is hit, the coordinator will
+ * move on to its next task.
+ */
+ private final int maxSegmentsToLoadPerCoordinationCycle;
+
private static final Logger log = new Logger(CoordinatorDynamicConfig.class);
@JsonCreator
@@ -137,7 +143,8 @@ public CoordinatorDynamicConfig(
@JsonProperty("pauseCoordination") boolean pauseCoordination,
@JsonProperty("replicateAfterLoadTimeout") boolean replicateAfterLoadTimeout,
@JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad,
- @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment
+ @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment,
+ @JsonProperty("maxSegmentsToLoadPerCoordinationCycle") @Nullable Integer maxSegmentsToLoadPerCoordinationCycle
)
{
this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
@@ -209,6 +216,22 @@ public CoordinatorDynamicConfig(
} else {
this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment;
}
+
+ if (maxSegmentsToLoadPerCoordinationCycle == null) {
+ log.debug(
+ "maxSegmentsToLoadPerCoordinationCycle was null! This is likely because your metastore does not "
+ + "reflect this configuration being added to Druid in a recent release. Druid is defaulting the value "
+ + "to the Druid default of %d. It is recommended that you re-submit your dynamic config with your "
+ + "desired value for maxNonPrimaryReplicantsToLoad",
+ Builder.DEFAULT_MAX_SEGMENTS_TO_LOAD_PER_COORDINATION_CYCLE
+ );
+ maxSegmentsToLoadPerCoordinationCycle = Builder.DEFAULT_MAX_SEGMENTS_TO_LOAD_PER_COORDINATION_CYCLE;
+ }
+ Preconditions.checkArgument(
+ maxSegmentsToLoadPerCoordinationCycle >= 0,
+ "maxPrimaryReplicantsToLoad must be greater than or equal to 0."
+ );
+ this.maxSegmentsToLoadPerCoordinationCycle = maxSegmentsToLoadPerCoordinationCycle;
}
private static Set parseJsonStringOrArray(Object jsonStringOrArray)
@@ -390,6 +413,13 @@ public int getMaxNonPrimaryReplicantsToLoad()
return maxNonPrimaryReplicantsToLoad;
}
+ @Min(0)
+ @JsonProperty
+ public int getMaxSegmentsToLoadPerCoordinationCycle()
+ {
+ return maxSegmentsToLoadPerCoordinationCycle;
+ }
+
@Override
public String toString()
{
@@ -413,6 +443,7 @@ public String toString()
", pauseCoordination=" + pauseCoordination +
", replicateAfterLoadTimeout=" + replicateAfterLoadTimeout +
", maxNonPrimaryReplicantsToLoad=" + maxNonPrimaryReplicantsToLoad +
+ ", maxSegmentsToLoadPerCoordinationCycle=" + maxSegmentsToLoadPerCoordinationCycle +
'}';
}
@@ -480,6 +511,9 @@ public boolean equals(Object o)
if (maxNonPrimaryReplicantsToLoad != that.maxNonPrimaryReplicantsToLoad) {
return false;
}
+ if (maxSegmentsToLoadPerCoordinationCycle != that.maxSegmentsToLoadPerCoordinationCycle) {
+ return false;
+ }
return decommissioningMaxPercentOfMaxSegmentsToMove == that.decommissioningMaxPercentOfMaxSegmentsToMove;
}
@@ -503,7 +537,8 @@ public int hashCode()
decommissioningNodes,
decommissioningMaxPercentOfMaxSegmentsToMove,
pauseCoordination,
- maxNonPrimaryReplicantsToLoad
+ maxNonPrimaryReplicantsToLoad,
+ maxSegmentsToLoadPerCoordinationCycle
);
}
@@ -531,6 +566,7 @@ public static class Builder
private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false;
private static final int DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD = Integer.MAX_VALUE;
private static final boolean DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT = false;
+ private static final int DEFAULT_MAX_SEGMENTS_TO_LOAD_PER_COORDINATION_CYCLE = Integer.MAX_VALUE;
private Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments;
private Long mergeBytesLimit;
@@ -551,6 +587,7 @@ public static class Builder
private Boolean replicateAfterLoadTimeout;
private Integer maxNonPrimaryReplicantsToLoad;
private Boolean useRoundRobinSegmentAssignment;
+ private Integer maxSegmentsToLoadPerCoordinationCycle;
public Builder()
{
@@ -578,7 +615,8 @@ public Builder(
@JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination,
@JsonProperty("replicateAfterLoadTimeout") @Nullable Boolean replicateAfterLoadTimeout,
@JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad,
- @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment
+ @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment,
+ @JsonProperty("maxSegmentsToLoadPerCoordinationCycle") @Nullable Integer maxSegmentsToLoadPerCoordinationCycle
)
{
this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
@@ -601,6 +639,7 @@ public Builder(
this.replicateAfterLoadTimeout = replicateAfterLoadTimeout;
this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad;
this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment;
+ this.maxSegmentsToLoadPerCoordinationCycle = maxSegmentsToLoadPerCoordinationCycle;
}
public Builder withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(long leadingTimeMillis)
@@ -713,6 +752,12 @@ public Builder withUseRoundRobinSegmentAssignment(boolean useRoundRobinSegmentAs
return this;
}
+ public Builder withMaxSegmentsToLoadPerCoordinationCycle(int maxSegmentsToLoadPerCoordinationCycle)
+ {
+ this.maxSegmentsToLoadPerCoordinationCycle = maxSegmentsToLoadPerCoordinationCycle;
+ return this;
+ }
+
public CoordinatorDynamicConfig build()
{
return new CoordinatorDynamicConfig(
@@ -742,7 +787,9 @@ public CoordinatorDynamicConfig build()
replicateAfterLoadTimeout == null ? DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout,
maxNonPrimaryReplicantsToLoad == null ? DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
: maxNonPrimaryReplicantsToLoad,
- useRoundRobinSegmentAssignment == null ? DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT : useRoundRobinSegmentAssignment
+ useRoundRobinSegmentAssignment == null ? DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT : useRoundRobinSegmentAssignment,
+ maxSegmentsToLoadPerCoordinationCycle == null ? DEFAULT_MAX_SEGMENTS_TO_LOAD_PER_COORDINATION_CYCLE
+ : maxSegmentsToLoadPerCoordinationCycle
);
}
@@ -781,7 +828,8 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
maxNonPrimaryReplicantsToLoad == null
? defaults.getMaxNonPrimaryReplicantsToLoad()
: maxNonPrimaryReplicantsToLoad,
- useRoundRobinSegmentAssignment == null ? defaults.isUseRoundRobinSegmentAssignment() : useRoundRobinSegmentAssignment
+ useRoundRobinSegmentAssignment == null ? defaults.isUseRoundRobinSegmentAssignment() : useRoundRobinSegmentAssignment,
+ maxSegmentsToLoadPerCoordinationCycle == null ? defaults.getMaxSegmentsToLoadPerCoordinationCycle() : maxSegmentsToLoadPerCoordinationCycle
);
}
}
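A minimal sketch of how the new builder hook composes with the defaulting logic above. Only the `CoordinatorDynamicConfig` names come from this diff; the wrapper class is illustrative:

```java
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;

public class BuilderSketch
{
  public static void main(String[] args)
  {
    // Explicitly cap loads at 5,000 segments per cycle.
    CoordinatorDynamicConfig capped = CoordinatorDynamicConfig.builder()
        .withMaxSegmentsToLoadPerCoordinationCycle(5_000)
        .build();

    // A builder that never sets the property falls back to
    // DEFAULT_MAX_SEGMENTS_TO_LOAD_PER_COORDINATION_CYCLE (Integer.MAX_VALUE),
    // which effectively means "no limit".
    CoordinatorDynamicConfig defaults = CoordinatorDynamicConfig.builder().build();

    System.out.println(capped.getMaxSegmentsToLoadPerCoordinationCycle());   // 5000
    System.out.println(defaults.getMaxSegmentsToLoadPerCoordinationCycle()); // 2147483647
  }
}
```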
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
index 6a79fd9dcafb..8661a71381fb 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
@@ -30,6 +30,7 @@
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ReplicationThrottler;
import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
+import org.apache.druid.server.coordinator.rules.LoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
@@ -122,6 +123,12 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
}
for (DataSegment segment : params.getUsedSegments()) {
+ if (stats.getGlobalStat(LoadRule.AGGREGATE_ASSIGNED_COUNT)
+     >= params.getCoordinatorDynamicConfig().getMaxSegmentsToLoadPerCoordinationCycle()) {
+ log.info(
+     "Reached the maximum number of segments [%d] to load for the current RunRules execution.",
+     params.getCoordinatorDynamicConfig().getMaxSegmentsToLoadPerCoordinationCycle()
+ );
+ break;
+ }
+
if (overshadowed.contains(segment)) {
// Skipping overshadowed segments
continue;
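One subtlety of the guard above: it fires between segments, while `LoadRule` records all of a segment's assignments at once, so the final `AGGREGATE_ASSIGNED_COUNT` can land slightly above the cap. A toy model of that semantics, assuming for simplicity that every segment assigns the same number of replicants:

```java
public class CapSemanticsSketch
{
  public static void main(String[] args)
  {
    // Toy model of the RunRules loop above: 2 replicants per segment, cap of 5.
    int cap = 5;
    int replicantsPerSegment = 2;
    int aggregateAssigned = 0;

    for (int segment = 0; segment < 100; segment++) {
      // The guard fires between segments, mirroring the `break` in RunRules.
      if (aggregateAssigned >= cap) {
        break;
      }
      // LoadRule adds all of a segment's assignments to the counter at once,
      // so the final count can overshoot the cap by up to
      // replicantsPerSegment - 1 (here: 6 > 5).
      aggregateAssigned += replicantsPerSegment;
    }
    System.out.println(aggregateAssigned); // 6
  }
}
```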
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
index 933b63e47263..d9145cd79114 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
@@ -54,7 +54,8 @@
public abstract class LoadRule implements Rule
{
private static final EmittingLogger log = new EmittingLogger(LoadRule.class);
- static final String ASSIGNED_COUNT = "assignedCount";
+ public static final String AGGREGATE_ASSIGNED_COUNT = "aggregateAssignedCount";
+ public static final String ASSIGNED_COUNT = "assignedCount";
static final String DROPPED_COUNT = "droppedCount";
public final String NON_PRIMARY_ASSIGNED_COUNT = "totalNonPrimaryReplicantsLoaded";
public static final String REQUIRED_CAPACITY = "requiredCapacity";
@@ -183,6 +184,7 @@ private void assign(
// numAssigned - 1 because we don't want to count the primary assignment
stats.addToGlobalStat(NON_PRIMARY_ASSIGNED_COUNT, numAssigned - 1);
+ stats.addToGlobalStat(AGGREGATE_ASSIGNED_COUNT, numAssigned);
stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned);
@@ -329,6 +331,7 @@ private void assignReplicas(
createLoadQueueSizeLimitingPredicate(segment),
segment
);
+ stats.addToGlobalStat(AGGREGATE_ASSIGNED_COUNT, numAssigned);
stats.addToGlobalStat(NON_PRIMARY_ASSIGNED_COUNT, numAssigned);
stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned);
}
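The new counter reuses the existing global-stat accumulator on `CoordinatorStats`. A stripped-down sketch of that accumulate-then-read pattern (a hypothetical simplification, not the real class, which also tracks per-tier stats and supports merging across duties):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class GlobalStatsSketch
{
  private final Map<String, AtomicLong> globalStats = new HashMap<>();

  public void addToGlobalStat(String statName, long value)
  {
    globalStats.computeIfAbsent(statName, name -> new AtomicLong()).addAndGet(value);
  }

  public long getGlobalStat(String statName)
  {
    AtomicLong stat = globalStats.get(statName);
    return stat == null ? 0 : stat.get();
  }

  public static void main(String[] args)
  {
    GlobalStatsSketch stats = new GlobalStatsSketch();
    // Both assignment paths in LoadRule add to the same key, so the value read
    // by RunRules is the total across primary and non-primary assignments.
    stats.addToGlobalStat("aggregateAssignedCount", 3); // e.g. assign(): 1 primary + 2 replicas
    stats.addToGlobalStat("aggregateAssignedCount", 2); // e.g. assignReplicas(): 2 replicas
    System.out.println(stats.getGlobalStat("aggregateAssignedCount")); // 5
  }
}
```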
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java
index eb3be4c89510..4c8875456612 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java
@@ -1444,6 +1444,55 @@ public void testOneNodesOneTierOneReplicantCostBalancerStrategyNotEnoughSpace()
EasyMock.verify(mockPeon);
}
+ @Test
+ public void testRunRulesMaxSegmentsToLoadLimit()
+ {
+ mockCoordinator();
+ mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ mockEmptyPeon();
+
+ EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
+ Collections.singletonList(
+ new IntervalLoadRule(
+ Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"),
+ ImmutableMap.of("normal", 2)
+ )
+ )).atLeastOnce();
+ EasyMock.replay(databaseRuleManager);
+
+ DruidCluster druidCluster = DruidClusterBuilder
+ .newBuilder()
+ .addTier(
+ "normal",
+ new ServerHolder(
+ new DruidServer("serverNorm", "hostNorm", null, 1000, ServerType.HISTORICAL, "normal", 0)
+ .toImmutableDruidServer(),
+ mockPeon
+ ),
+ new ServerHolder(
+ new DruidServer("serverNorm2", "hostNorm2", null, 1000, ServerType.HISTORICAL, "normal", 0)
+ .toImmutableDruidServer(),
+ mockPeon
+ )
+ )
+ .build();
+
+ ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+ BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
+
+ DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy)
+ .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToLoadPerCoordinationCycle(20).build())
+ .build();
+
+ DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
+ CoordinatorStats stats = afterParams.getCoordinatorStats();
+ Assert.assertEquals(20L, stats.getGlobalStat(LoadRule.AGGREGATE_ASSIGNED_COUNT));
+
+ exec.shutdown();
+ EasyMock.verify(mockPeon);
+ }
+
private void mockCoordinator()
{
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes();
diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
index 707ea1d0f90d..917a2c409c27 100644
--- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
@@ -27,420 +27,113 @@
import org.junit.Assert;
import org.junit.Test;
-import java.util.Set;
-
-/**
- *
- */
public class CoordinatorDynamicConfigTest
{
- private static final int EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 100;
-
private final ObjectMapper mapper = TestHelper.makeJsonMapper();
@Test
- public void testSerde() throws Exception
+ public void testEqualsAndHashCodeSanity()
{
- String jsonStr = "{\n"
- + " \"millisToWaitBeforeDeleting\": 1,\n"
- + " \"mergeBytesLimit\": 1,\n"
- + " \"mergeSegmentsLimit\" : 1,\n"
- + " \"maxSegmentsToMove\": 1,\n"
- + " \"percentOfSegmentsToConsiderPerMove\": 1,\n"
- + " \"replicantLifetime\": 1,\n"
- + " \"replicationThrottleLimit\": 1,\n"
- + " \"balancerComputeThreads\": 2, \n"
- + " \"emitBalancingStats\": true,\n"
- + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
- + " \"maxSegmentsInNodeLoadingQueue\": 1,\n"
- + " \"decommissioningNodes\": [\"host1\", \"host2\"],\n"
- + " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 9,\n"
- + " \"pauseCoordination\": false,\n"
- + " \"replicateAfterLoadTimeout\": false,\n"
- + " \"maxNonPrimaryReplicantsToLoad\": 2147483647\n"
- + "}\n";
+ CoordinatorDynamicConfig config1 = CoordinatorDynamicConfig.builder().build();
+ CoordinatorDynamicConfig config2 = CoordinatorDynamicConfig.builder().build();
+ Assert.assertEquals(config1, config2);
+ Assert.assertEquals(config1.hashCode(), config2.hashCode());
+ }
- CoordinatorDynamicConfig actual = mapper.readValue(
- mapper.writeValueAsString(
- mapper.readValue(
- jsonStr,
- CoordinatorDynamicConfig.class
- )
- ),
- CoordinatorDynamicConfig.class
- );
- ImmutableSet decommissioning = ImmutableSet.of("host1", "host2");
- ImmutableSet whitelist = ImmutableSet.of("test1", "test2");
- assertConfig(
- actual,
- 1,
- 1,
- 1,
- 1,
- 1,
- true,
- 1,
- 1,
- 2,
- true,
- whitelist,
- false,
- 1,
- decommissioning,
- 9,
- false,
- false,
- Integer.MAX_VALUE
- );
+ @Test
+ public void testSerdeFailForInvalidPercentOfSegmentsToConsiderPerMove() throws Exception
+ {
+ try {
+ String jsonStr = "{\n"
+ + " \"percentOfSegmentsToConsiderPerMove\": 0\n"
+ + "}\n";
- actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
- assertConfig(
- actual,
- 1,
- 1,
- 1,
- 1,
- 1,
- true,
- 1,
- 1,
- 2,
- true,
- whitelist,
- false,
- 1,
- ImmutableSet.of("host1"),
- 9,
- false,
- false,
- Integer.MAX_VALUE
- );
+ mapper.readValue(
+ mapper.writeValueAsString(
+ mapper.readValue(
+ jsonStr,
+ CoordinatorDynamicConfig.class
+ )
+ ),
+ CoordinatorDynamicConfig.class
+ );
- actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
- assertConfig(
- actual,
- 1,
- 1,
- 1,
- 1,
- 1,
- true,
- 1,
- 1,
- 2,
- true,
- whitelist,
- false,
- 1,
- ImmutableSet.of("host1"),
- 5,
- false,
- false,
- Integer.MAX_VALUE
- );
+ Assert.fail("deserialization should fail.");
+ }
+ catch (JsonMappingException e) {
+ Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
+ }
- actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual);
- assertConfig(
- actual,
- 1,
- 1,
- 1,
- 1,
- 1,
- true,
- 1,
- 1,
- 2,
- true,
- whitelist,
- false,
- 1,
- ImmutableSet.of("host1"),
- 5,
- true,
- false,
- Integer.MAX_VALUE
- );
+ try {
+ String jsonStr = "{\n"
+ + " \"percentOfSegmentsToConsiderPerMove\": -100\n"
+ + "}\n";
- actual = CoordinatorDynamicConfig.builder()
- .withPercentOfSegmentsToConsiderPerMove(10)
- .withUseBatchedSegmentSampler(false)
- .build(actual);
- assertConfig(
- actual,
- 1,
- 1,
- 1,
- 1,
- 10,
- false,
- 1,
- 1,
- 2,
- true,
- whitelist,
- false,
- 1,
- ImmutableSet.of("host1"),
- 5,
- true,
- false,
- Integer.MAX_VALUE
- );
+ mapper.readValue(
+ mapper.writeValueAsString(
+ mapper.readValue(
+ jsonStr,
+ CoordinatorDynamicConfig.class
+ )
+ ),
+ CoordinatorDynamicConfig.class
+ );
- actual = CoordinatorDynamicConfig.builder().withReplicateAfterLoadTimeout(true).build(actual);
- assertConfig(
- actual,
- 1,
- 1,
- 1,
- 1,
- 10,
- false,
- 1,
- 1,
- 2,
- true,
- whitelist,
- false,
- 1,
- ImmutableSet.of("host1"),
- 5,
- true,
- true,
- Integer.MAX_VALUE
- );
+ Assert.fail("deserialization should fail.");
+ }
+ catch (JsonMappingException e) {
+ Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
+ }
- actual = CoordinatorDynamicConfig.builder().withMaxNonPrimaryReplicantsToLoad(10).build(actual);
- assertConfig(
- actual,
- 1,
- 1,
- 1,
- 1,
- 10,
- false,
- 1,
- 1,
- 2,
- true,
- whitelist,
- false,
- 1,
- ImmutableSet.of("host1"),
- 5,
- true,
- true,
- 10
- );
- }
+ try {
+ String jsonStr = "{\n"
+ + " \"percentOfSegmentsToConsiderPerMove\": 105\n"
+ + "}\n";
- @Test
- public void testConstructorWithNullsShouldKillUnusedSegmentsInAllDataSources()
- {
- CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(
- 1,
- 1,
- 1,
- 1,
- null,
- false,
- 1,
- 2,
- 10,
- true,
- null,
- null,
- null,
- ImmutableSet.of("host1"),
- 5,
- true,
- true,
- 10,
- false
- );
- Assert.assertTrue(config.isKillUnusedSegmentsInAllDataSources());
- Assert.assertTrue(config.getSpecificDataSourcesToKillUnusedSegmentsIn().isEmpty());
- }
+ mapper.readValue(
+ mapper.writeValueAsString(
+ mapper.readValue(
+ jsonStr,
+ CoordinatorDynamicConfig.class
+ )
+ ),
+ CoordinatorDynamicConfig.class
+ );
- @Test
- public void testConstructorWithSpecificDataSourcesToKillShouldNotKillUnusedSegmentsInAllDatasources()
- {
- CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(
- 1,
- 1,
- 1,
- 1,
- null,
- false,
- 1,
- 2,
- 10,
- true,
- ImmutableSet.of("test1"),
- null,
- null,
- ImmutableSet.of("host1"),
- 5,
- true,
- true,
- 10,
- false
- );
- Assert.assertFalse(config.isKillUnusedSegmentsInAllDataSources());
- Assert.assertEquals(ImmutableSet.of("test1"), config.getSpecificDataSourcesToKillUnusedSegmentsIn());
+ Assert.fail("deserialization should fail.");
+ }
+ catch (JsonMappingException e) {
+ Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
+ }
}
@Test
- public void testDecommissioningParametersBackwardCompatibility() throws Exception
+ public void testSerdeFailForInvalidDecommissioningMaxPercentOfSegmentsToConsiderToMove() throws Exception
{
- String jsonStr = "{\n"
- + " \"millisToWaitBeforeDeleting\": 1,\n"
- + " \"mergeBytesLimit\": 1,\n"
- + " \"mergeSegmentsLimit\" : 1,\n"
- + " \"maxSegmentsToMove\": 1,\n"
- + " \"replicantLifetime\": 1,\n"
- + " \"replicationThrottleLimit\": 1,\n"
- + " \"balancerComputeThreads\": 2, \n"
- + " \"emitBalancingStats\": true,\n"
- + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
- + " \"maxSegmentsInNodeLoadingQueue\": 1\n"
- + "}\n";
-
- CoordinatorDynamicConfig actual = mapper.readValue(
- mapper.writeValueAsString(
- mapper.readValue(
- jsonStr,
- CoordinatorDynamicConfig.class
- )
- ),
- CoordinatorDynamicConfig.class
- );
- ImmutableSet decommissioning = ImmutableSet.of();
- ImmutableSet whitelist = ImmutableSet.of("test1", "test2");
- assertConfig(
- actual,
- 1,
- 1,
- 1,
- 1,
- 100,
- true, 1,
- 1,
- 2,
- true,
- whitelist,
- false,
- 1,
- decommissioning,
- 0,
- false,
- false,
- Integer.MAX_VALUE
- );
-
- actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
- assertConfig(
- actual,
- 1,
- 1,
- 1,
- 1,
- 100,
- true,
- 1,
- 1,
- 2,
- true,
- whitelist,
- false,
- 1,
- ImmutableSet.of("host1"),
- 0,
- false,
- false,
- Integer.MAX_VALUE
- );
-
- actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
- assertConfig(
- actual,
- 1,
- 1,
- 1,
- 1,
- 100,
- true,
- 1,
- 1,
- 2,
- true,
- whitelist,
- false,
- 1,
- ImmutableSet.of("host1"),
- 5,
- false,
- false,
- Integer.MAX_VALUE
- );
- }
+ try {
+ String jsonStr = "{\n"
+ + " \"decommissioningMaxPercentOfMaxSegmentsToMove\": -100\n"
+ + "}\n";
- @Test
- public void testSerdeWithStringinKillDataSourceWhitelist() throws Exception
- {
- String jsonStr = "{\n"
- + " \"millisToWaitBeforeDeleting\": 1,\n"
- + " \"mergeBytesLimit\": 1,\n"
- + " \"mergeSegmentsLimit\" : 1,\n"
- + " \"maxSegmentsToMove\": 1,\n"
- + " \"percentOfSegmentsToConsiderPerMove\": 1,\n"
- + " \"replicantLifetime\": 1,\n"
- + " \"replicationThrottleLimit\": 1,\n"
- + " \"balancerComputeThreads\": 2, \n"
- + " \"emitBalancingStats\": true,\n"
- + " \"killDataSourceWhitelist\": \"test1, test2\", \n"
- + " \"maxSegmentsInNodeLoadingQueue\": 1\n"
- + "}\n";
+ mapper.readValue(
+ mapper.writeValueAsString(
+ mapper.readValue(
+ jsonStr,
+ CoordinatorDynamicConfig.class
+ )
+ ),
+ CoordinatorDynamicConfig.class
+ );
- CoordinatorDynamicConfig actual = mapper.readValue(
- mapper.writeValueAsString(
- mapper.readValue(
- jsonStr,
- CoordinatorDynamicConfig.class
- )
- ),
- CoordinatorDynamicConfig.class
- );
- assertConfig(
- actual,
- 1,
- 1,
- 1,
- 1,
- 1,
- true,
- 1,
- 1,
- 2,
- true,
- ImmutableSet.of("test1", "test2"),
- false,
- 1,
- ImmutableSet.of(),
- 0,
- false,
- false,
- Integer.MAX_VALUE
- );
- }
+ Assert.fail("deserialization should fail.");
+ }
+ catch (JsonMappingException e) {
+ Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
+ }
- @Test
- public void testSerdeHandleInvalidPercentOfSegmentsToConsiderPerMove() throws Exception
- {
try {
String jsonStr = "{\n"
- + " \"percentOfSegmentsToConsiderPerMove\": 0\n"
+ + " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 105\n"
+ "}\n";
mapper.readValue(
@@ -458,10 +151,14 @@ public void testSerdeHandleInvalidPercentOfSegmentsToConsiderPerMove() throws Ex
catch (JsonMappingException e) {
Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
}
+ }
+ @Test
+ public void testSerdeFailForInvalidMaxNonPrimaryReplicantsToLoad() throws Exception
+ {
try {
String jsonStr = "{\n"
- + " \"percentOfSegmentsToConsiderPerMove\": -100\n"
+ + " \"maxNonPrimaryReplicantsToLoad\": -1\n"
+ "}\n";
mapper.readValue(
@@ -479,10 +176,14 @@ public void testSerdeHandleInvalidPercentOfSegmentsToConsiderPerMove() throws Ex
catch (JsonMappingException e) {
Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
}
+ }
+ @Test
+ public void testSerdeFailForInvalidMaxSegmentsToLoadPerCoordinationCycle() throws Exception
+ {
try {
String jsonStr = "{\n"
- + " \"percentOfSegmentsToConsiderPerMove\": 105\n"
+ + " \"maxSegmentsToLoadPerCoordinationCycle\": -1\n"
+ "}\n";
mapper.readValue(
@@ -503,23 +204,11 @@ public void testSerdeHandleInvalidPercentOfSegmentsToConsiderPerMove() throws Ex
}
@Test
- public void testHandleMissingPercentOfSegmentsToConsiderPerMove() throws Exception
+ public void testDynamicSerdeOfSpecificDataSourcesToKillUnusedSegmentsIn() throws Exception
{
- String jsonStr = "{\n"
- + " \"millisToWaitBeforeDeleting\": 1,\n"
- + " \"mergeBytesLimit\": 1,\n"
- + " \"mergeSegmentsLimit\" : 1,\n"
- + " \"maxSegmentsToMove\": 1,\n"
- + " \"replicantLifetime\": 1,\n"
- + " \"replicationThrottleLimit\": 1,\n"
- + " \"balancerComputeThreads\": 2, \n"
- + " \"emitBalancingStats\": true,\n"
- + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
- + " \"maxSegmentsInNodeLoadingQueue\": 1,\n"
- + " \"decommissioningNodes\": [\"host1\", \"host2\"],\n"
- + " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 9,\n"
- + " \"pauseCoordination\": false\n"
- + "}\n";
+ String jsonStr = "{\n" +
+ " \"killDataSourceWhitelist\": \"test1, test2\"\n" +
+ "}\n";
CoordinatorDynamicConfig actual = mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
@@ -529,48 +218,41 @@ public void testHandleMissingPercentOfSegmentsToConsiderPerMove() throws Excepti
),
CoordinatorDynamicConfig.class
);
- ImmutableSet decommissioning = ImmutableSet.of("host1", "host2");
- ImmutableSet whitelist = ImmutableSet.of("test1", "test2");
- assertConfig(
- actual,
- 1,
- 1,
- 1,
- 1,
- 100,
- true,
- 1,
- 1,
- 2,
- true,
- whitelist,
- false,
- 1,
- decommissioning,
- 9,
- false,
- false,
- Integer.MAX_VALUE
+ Assert.assertEquals(ImmutableSet.of("test1", "test2"), actual.getSpecificDataSourcesToKillUnusedSegmentsIn());
+
+ jsonStr = "{\n" +
+ " \"killDataSourceWhitelist\": [\"test1\", \"test2\"]\n" +
+ "}\n";
+ actual = mapper.readValue(
+ mapper.writeValueAsString(
+ mapper.readValue(
+ jsonStr,
+ CoordinatorDynamicConfig.class
+ )
+ ),
+ CoordinatorDynamicConfig.class
+ );
+ Assert.assertEquals(ImmutableSet.of("test1", "test2"), actual.getSpecificDataSourcesToKillUnusedSegmentsIn());
+
+ jsonStr = "{}";
+ actual = mapper.readValue(
+ mapper.writeValueAsString(
+ mapper.readValue(
+ jsonStr,
+ CoordinatorDynamicConfig.class
+ )
+ ),
+ CoordinatorDynamicConfig.class
);
+ Assert.assertEquals(ImmutableSet.of(), actual.getSpecificDataSourcesToKillUnusedSegmentsIn());
}
@Test
- public void testSerdeWithKillAllDataSources() throws Exception
+ public void testDynamicSerdeOfDataSourcesToNotKillStalePendingSegmentsIn() throws Exception
{
- String jsonStr = "{\n"
- + " \"millisToWaitBeforeDeleting\": 1,\n"
- + " \"mergeBytesLimit\": 1,\n"
- + " \"mergeSegmentsLimit\" : 1,\n"
- + " \"maxSegmentsToMove\": 1,\n"
- + " \"percentOfSegmentsToConsiderPerMove\": 1,\n"
- + " \"replicantLifetime\": 1,\n"
- + " \"replicationThrottleLimit\": 1,\n"
- + " \"balancerComputeThreads\": 2, \n"
- + " \"emitBalancingStats\": true,\n"
- + " \"killAllDataSources\": true,\n"
- + " \"maxSegmentsInNodeLoadingQueue\": 1\n"
- + "}\n";
-
+ String jsonStr = "{\n" +
+ " \"killPendingSegmentsSkipList\": \"test1, test2\"\n" +
+ "}\n";
CoordinatorDynamicConfig actual = mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
@@ -580,62 +262,41 @@ public void testSerdeWithKillAllDataSources() throws Exception
),
CoordinatorDynamicConfig.class
);
+ Assert.assertEquals(ImmutableSet.of("test1", "test2"), actual.getDataSourcesToNotKillStalePendingSegmentsIn());
- assertConfig(
- actual,
- 1,
- 1,
- 1,
- 1,
- 1,
- true,
- 1,
- 1,
- 2,
- true,
- ImmutableSet.of(),
- true,
- 1,
- ImmutableSet.of(),
- 0,
- false,
- false,
- Integer.MAX_VALUE
+ jsonStr = "{\n" +
+ " \"killPendingSegmentsSkipList\": [\"test1\", \"test2\"]\n" +
+ "}\n";
+ actual = mapper.readValue(
+ mapper.writeValueAsString(
+ mapper.readValue(
+ jsonStr,
+ CoordinatorDynamicConfig.class
+ )
+ ),
+ CoordinatorDynamicConfig.class
);
+ Assert.assertEquals(ImmutableSet.of("test1", "test2"), actual.getDataSourcesToNotKillStalePendingSegmentsIn());
- // killAllDataSources is a config in versions 0.22.x and older and is no longer used.
- // This used to be an invalid config, but as of 0.23.0 the killAllDataSources flag no longer exsist,
- // so this is a valid config
- jsonStr = "{\n"
- + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n"
- + " \"killAllDataSources\": true,\n"
- + " \"percentOfSegmentsToConsiderPerMove\": 1\n"
- + "}\n";
+ jsonStr = "{}";
actual = mapper.readValue(
- jsonStr,
+ mapper.writeValueAsString(
+ mapper.readValue(
+ jsonStr,
+ CoordinatorDynamicConfig.class
+ )
+ ),
CoordinatorDynamicConfig.class
);
-
- Assert.assertFalse(actual.isKillUnusedSegmentsInAllDataSources());
- Assert.assertEquals(2, actual.getSpecificDataSourcesToKillUnusedSegmentsIn().size());
+ Assert.assertEquals(ImmutableSet.of(), actual.getDataSourcesToNotKillStalePendingSegmentsIn());
}
@Test
- public void testDeserializeWithoutMaxSegmentsInNodeLoadingQueue() throws Exception
+ public void testDynamicSerdeOfDecommissioningNodes() throws Exception
{
- String jsonStr = "{\n"
- + " \"millisToWaitBeforeDeleting\": 1,\n"
- + " \"mergeBytesLimit\": 1,\n"
- + " \"mergeSegmentsLimit\" : 1,\n"
- + " \"maxSegmentsToMove\": 1,\n"
- + " \"percentOfSegmentsToConsiderPerMove\": 1,\n"
- + " \"replicantLifetime\": 1,\n"
- + " \"replicationThrottleLimit\": 1,\n"
- + " \"balancerComputeThreads\": 2, \n"
- + " \"emitBalancingStats\": true,\n"
- + " \"killAllDataSources\": true\n"
- + "}\n";
-
+ String jsonStr = "{\n" +
+ " \"decommissioningNodes\": \"test1, test2\"\n" +
+ "}\n";
CoordinatorDynamicConfig actual = mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
@@ -645,205 +306,143 @@ public void testDeserializeWithoutMaxSegmentsInNodeLoadingQueue() throws Excepti
),
CoordinatorDynamicConfig.class
);
+ Assert.assertEquals(ImmutableSet.of("test1", "test2"), actual.getDecommissioningNodes());
- assertConfig(
- actual,
- 1,
- 1,
- 1,
- 1,
- 1,
- true,
- 1,
- 1,
- 2,
- true,
- ImmutableSet.of(),
- true,
- EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
- ImmutableSet.of(),
- 0,
- false,
- false,
- Integer.MAX_VALUE
+ jsonStr = "{\n" +
+ " \"decommissioningNodes\": [\"test1\", \"test2\"]\n" +
+ "}\n";
+ actual = mapper.readValue(
+ mapper.writeValueAsString(
+ mapper.readValue(
+ jsonStr,
+ CoordinatorDynamicConfig.class
+ )
+ ),
+ CoordinatorDynamicConfig.class
);
- }
+ Assert.assertEquals(ImmutableSet.of("test1", "test2"), actual.getDecommissioningNodes());
- @Test
- public void testBuilderDefaults()
- {
- CoordinatorDynamicConfig defaultConfig = CoordinatorDynamicConfig.builder().build();
- ImmutableSet emptyList = ImmutableSet.of();
- assertConfig(
- defaultConfig,
- 900000,
- 524288000,
- 100,
- 5,
- 100,
- true,
- 15,
- 10,
- 1,
- false,
- emptyList,
- true,
- EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
- emptyList,
- 70,
- false,
- false,
- Integer.MAX_VALUE
+ jsonStr = "{}";
+ actual = mapper.readValue(
+ mapper.writeValueAsString(
+ mapper.readValue(
+ jsonStr,
+ CoordinatorDynamicConfig.class
+ )
+ ),
+ CoordinatorDynamicConfig.class
);
+ Assert.assertEquals(ImmutableSet.of(), actual.getDecommissioningNodes());
}
@Test
- public void testBuilderWithDefaultSpecificDataSourcesToKillUnusedSegmentsInSpecified()
+ public void testSerdeHandleNullableFields() throws Exception
{
- CoordinatorDynamicConfig defaultConfig =
- CoordinatorDynamicConfig.builder()
- .withSpecificDataSourcesToKillUnusedSegmentsIn(ImmutableSet.of("DATASOURCE"))
- .build();
- CoordinatorDynamicConfig config = CoordinatorDynamicConfig.builder().build(defaultConfig);
- assertConfig(
- config,
- 900000,
- 524288000,
- 100,
- 5,
- 100,
- true,
- 15,
- 10,
- 1,
- false,
- ImmutableSet.of("DATASOURCE"),
- false,
- EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
- ImmutableSet.of(),
- 70,
- false,
- false,
- Integer.MAX_VALUE
+ String jsonStr = "{}";
+ CoordinatorDynamicConfig actual = mapper.readValue(
+ mapper.writeValueAsString(
+ mapper.readValue(
+ jsonStr,
+ CoordinatorDynamicConfig.class
+ )
+ ),
+ CoordinatorDynamicConfig.class
);
+ Assert.assertEquals(100.0, actual.getPercentOfSegmentsToConsiderPerMove(), 0.0);
+ Assert.assertEquals(100, actual.getMaxSegmentsInNodeLoadingQueue());
+ Assert.assertEquals(Integer.MAX_VALUE, actual.getMaxNonPrimaryReplicantsToLoad());
+ Assert.assertEquals(Integer.MAX_VALUE, actual.getMaxSegmentsToLoadPerCoordinationCycle());
+ Assert.assertFalse(actual.isUseRoundRobinSegmentAssignment());
}
@Test
- public void testUpdate()
+ public void testIsKillUnusedSegmentsInAllDataSources() throws Exception
{
- CoordinatorDynamicConfig current = CoordinatorDynamicConfig
- .builder()
- .withSpecificDataSourcesToKillUnusedSegmentsIn(ImmutableSet.of("x"))
- .build();
-
- Assert.assertEquals(
- current,
- new CoordinatorDynamicConfig.Builder(
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- ).build(current)
+ String jsonStr = "{\n" +
+ " \"killDataSourceWhitelist\": []\n" +
+ "}\n";
+ CoordinatorDynamicConfig actual = mapper.readValue(
+ mapper.writeValueAsString(
+ mapper.readValue(
+ jsonStr,
+ CoordinatorDynamicConfig.class
+ )
+ ),
+ CoordinatorDynamicConfig.class
);
- }
-
- @Test
- public void testSerdeHandleInvalidMaxNonPrimaryReplicantsToLoad() throws Exception
- {
- try {
- String jsonStr = "{\n"
- + " \"maxNonPrimaryReplicantsToLoad\": -1\n"
- + "}\n";
-
- mapper.readValue(
- mapper.writeValueAsString(
- mapper.readValue(
- jsonStr,
- CoordinatorDynamicConfig.class
- )
- ),
- CoordinatorDynamicConfig.class
- );
+ Assert.assertTrue(actual.isKillUnusedSegmentsInAllDataSources());
- Assert.fail("deserialization should fail.");
- }
- catch (JsonMappingException e) {
- Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
- }
+ jsonStr = "{\n" +
+ " \"killDataSourceWhitelist\": [\"test1\", \"test2\"]\n" +
+ "}\n";
+ actual = mapper.readValue(
+ mapper.writeValueAsString(
+ mapper.readValue(
+ jsonStr,
+ CoordinatorDynamicConfig.class
+ )
+ ),
+ CoordinatorDynamicConfig.class
+ );
+ Assert.assertFalse(actual.isKillUnusedSegmentsInAllDataSources());
}
+ /**
+ * This test covers legacy configs that have been removed between versions.
+ *
+ * If such unknown fields are included in the JSON payload for deserialization, they are ignored.
+ * @throws Exception
+ */
@Test
- public void testEqualsAndHashCodeSanity()
+ public void testUnknownFieldsHaveNoImpactOnDeserialization() throws Exception
{
- CoordinatorDynamicConfig config1 = CoordinatorDynamicConfig.builder().build();
- CoordinatorDynamicConfig config2 = CoordinatorDynamicConfig.builder().build();
- Assert.assertEquals(config1, config2);
- Assert.assertEquals(config1.hashCode(), config2.hashCode());
+ String jsonStr = "{\n" +
+ " \"dummyField\": []\n" +
+ "}\n";
+ CoordinatorDynamicConfig dummyActual = mapper.readValue(
+ mapper.writeValueAsString(
+ mapper.readValue(
+ jsonStr,
+ CoordinatorDynamicConfig.class
+ )
+ ),
+ CoordinatorDynamicConfig.class
+ );
+ jsonStr = "{}";
+ CoordinatorDynamicConfig actual = mapper.readValue(
+ mapper.writeValueAsString(
+ mapper.readValue(
+ jsonStr,
+ CoordinatorDynamicConfig.class
+ )
+ ),
+ CoordinatorDynamicConfig.class
+ );
+ Assert.assertEquals(actual, dummyActual);
}
- private void assertConfig(
- CoordinatorDynamicConfig config,
- long expectedLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
- long expectedMergeBytesLimit,
- int expectedMergeSegmentsLimit,
- int expectedMaxSegmentsToMove,
- int expectedPercentOfSegmentsToConsiderPerMove,
- boolean expectedUseBatchedSegmentSampler,
- int expectedReplicantLifetime,
- int expectedReplicationThrottleLimit,
- int expectedBalancerComputeThreads,
- boolean expectedEmitingBalancingStats,
- Set expectedSpecificDataSourcesToKillUnusedSegmentsIn,
- boolean expectedKillUnusedSegmentsInAllDataSources,
- int expectedMaxSegmentsInNodeLoadingQueue,
- Set decommissioningNodes,
- int decommissioningMaxPercentOfMaxSegmentsToMove,
- boolean pauseCoordination,
- boolean replicateAfterLoadTimeout,
- int maxNonPrimaryReplicantsToLoad
- )
+ /**
+ * Confirms that the builder will honor values in a CoordinatorDynamicConfig when it is used as the defaults supplier.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testBuilderBuildWithCoordinatorDynamicConfigDefaults() throws Exception
{
- Assert.assertEquals(
- expectedLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
- config.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments()
- );
- Assert.assertEquals(expectedMergeBytesLimit, config.getMergeBytesLimit());
- Assert.assertEquals(expectedMergeSegmentsLimit, config.getMergeSegmentsLimit());
- Assert.assertEquals(expectedMaxSegmentsToMove, config.getMaxSegmentsToMove());
- Assert.assertEquals(expectedPercentOfSegmentsToConsiderPerMove, config.getPercentOfSegmentsToConsiderPerMove(), 0);
- Assert.assertEquals(expectedUseBatchedSegmentSampler, config.useBatchedSegmentSampler());
- Assert.assertEquals(expectedReplicantLifetime, config.getReplicantLifetime());
- Assert.assertEquals(expectedReplicationThrottleLimit, config.getReplicationThrottleLimit());
- Assert.assertEquals(expectedBalancerComputeThreads, config.getBalancerComputeThreads());
- Assert.assertEquals(expectedEmitingBalancingStats, config.emitBalancingStats());
- Assert.assertEquals(
- expectedSpecificDataSourcesToKillUnusedSegmentsIn,
- config.getSpecificDataSourcesToKillUnusedSegmentsIn()
- );
- Assert.assertEquals(expectedKillUnusedSegmentsInAllDataSources, config.isKillUnusedSegmentsInAllDataSources());
- Assert.assertEquals(expectedMaxSegmentsInNodeLoadingQueue, config.getMaxSegmentsInNodeLoadingQueue());
- Assert.assertEquals(decommissioningNodes, config.getDecommissioningNodes());
- Assert.assertEquals(
- decommissioningMaxPercentOfMaxSegmentsToMove,
- config.getDecommissioningMaxPercentOfMaxSegmentsToMove()
+ String jsonStr = "{\n" +
+ " \"decommissioningNodes\": \"test1, test2\"\n" +
+ "}\n";
+ CoordinatorDynamicConfig defaults = mapper.readValue(
+ mapper.writeValueAsString(
+ mapper.readValue(
+ jsonStr,
+ CoordinatorDynamicConfig.class
+ )
+ ),
+ CoordinatorDynamicConfig.class
);
- Assert.assertEquals(pauseCoordination, config.getPauseCoordination());
- Assert.assertEquals(replicateAfterLoadTimeout, config.getReplicateAfterLoadTimeout());
- Assert.assertEquals(maxNonPrimaryReplicantsToLoad, config.getMaxNonPrimaryReplicantsToLoad());
+ CoordinatorDynamicConfig actual = CoordinatorDynamicConfig.builder().build(defaults);
+ Assert.assertEquals(ImmutableSet.of("test1", "test2"), actual.getDecommissioningNodes());
}
}
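The backward-compatibility behavior these tests rely on can be seen in isolation with a plain round trip. A minimal sketch, assuming a Jackson mapper comparable to the `TestHelper.makeJsonMapper()` used above:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;

public class SerdeSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    // A payload written by an older Druid version simply omits the new field...
    CoordinatorDynamicConfig legacy =
        mapper.readValue("{}", CoordinatorDynamicConfig.class);
    // ...and deserialization backfills the default rather than failing.
    System.out.println(legacy.getMaxSegmentsToLoadPerCoordinationCycle()); // 2147483647

    // An explicit value round-trips through serialization unchanged.
    CoordinatorDynamicConfig explicit = mapper.readValue(
        "{\"maxSegmentsToLoadPerCoordinationCycle\": 100}",
        CoordinatorDynamicConfig.class
    );
    String json = mapper.writeValueAsString(explicit);
    CoordinatorDynamicConfig roundTripped =
        mapper.readValue(json, CoordinatorDynamicConfig.class);
    System.out.println(roundTripped.getMaxSegmentsToLoadPerCoordinationCycle()); // 100
  }
}
```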
diff --git a/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx b/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
index ca957309ff83..25a915a01aa7 100644
--- a/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
+++ b/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx
@@ -37,6 +37,7 @@ export interface CoordinatorDynamicConfig {
decommissioningNodes?: string[];
decommissioningMaxPercentOfMaxSegmentsToMove?: number;
pauseCoordination?: boolean;
+ maxSegmentsToLoadPerCoordinationCycle?: number;
maxNonPrimaryReplicantsToLoad?: number;
}
@@ -254,6 +255,23 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field[
>
),
},
+ {
+ name: 'maxSegmentsToLoadPerCoordinationCycle',
+ type: 'number',
+ defaultValue: 2147483647,
+ info: (
+ <>
+ This is the maximum number of segments - both primary and non-primary replicants - that can
+ be loaded per Coordination run. The default is equivalent to there being no limit. This
+ differs from maxNonPrimaryReplicantsToLoad because primary replicants also count against
+ the limit. An operator may want to use this configuration to prevent the coordinator from
+ spinning and loading many segments that are in fact already loaded but appear unavailable,
+ for example when a temporary network issue causes some Historical servers' segments to go
+ missing (or any other event with a similar effect).
+ >
+ ),
+ },
{
name: 'maxNonPrimaryReplicantsToLoad',
type: 'number',
@@ -264,7 +282,9 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field[
this limit is hit, only primary replicants will be loaded for the remainder of the cycle.
Tuning this value lower can help reduce the delay in loading primary segments when the
cluster has a very large number of non-primary replicants to load (such as when a single
- historical drops out of the cluster leaving many under-replicated segments).
+ historical drops out of the cluster leaving many under-replicated segments). This
+ configuration has no effect if the value is greater than the value of
+ maxSegmentsToLoadPerCoordinationCycle.
>
),
},