diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index 90574780eabe..475a8f88cde3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -61,6 +61,7 @@ public class CoordinatorDynamicConfig private final int replicationThrottleLimit; private final int balancerComputeThreads; private final boolean emitBalancingStats; + private final boolean useRoundRobinSegmentAssignment; /** * List of specific data sources for which kill tasks are sent in {@link KillUnusedSegments}. @@ -134,7 +135,8 @@ public CoordinatorDynamicConfig( @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove, @JsonProperty("pauseCoordination") boolean pauseCoordination, @JsonProperty("replicateAfterLoadTimeout") boolean replicateAfterLoadTimeout, - @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad + @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad, + @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment ) { this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments = @@ -195,6 +197,12 @@ public CoordinatorDynamicConfig( "maxNonPrimaryReplicantsToLoad must be greater than or equal to 0." ); this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad; + + if (useRoundRobinSegmentAssignment == null) { + this.useRoundRobinSegmentAssignment = Builder.DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT; + } else { + this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment; + } } private static Set parseJsonStringOrArray(Object jsonStringOrArray) @@ -316,6 +324,12 @@ public int getMaxSegmentsInNodeLoadingQueue() return maxSegmentsInNodeLoadingQueue; } + @JsonProperty + public boolean isUseRoundRobinSegmentAssignment() + { + return useRoundRobinSegmentAssignment; + } + /** * List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' * servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate @@ -509,6 +523,7 @@ public static class Builder private static final boolean DEFAULT_PAUSE_COORDINATION = false; private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false; private static final int DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD = Integer.MAX_VALUE; + private static final boolean DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT = false; private Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments; private Long mergeBytesLimit; @@ -528,6 +543,7 @@ public static class Builder private Boolean pauseCoordination; private Boolean replicateAfterLoadTimeout; private Integer maxNonPrimaryReplicantsToLoad; + private Boolean useRoundRobinSegmentAssignment; public Builder() { @@ -554,7 +570,8 @@ public Builder( @Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove, @JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination, @JsonProperty("replicateAfterLoadTimeout") @Nullable Boolean replicateAfterLoadTimeout, - @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad + @JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad, + @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment ) { this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments = @@ -576,6 +593,7 @@ public Builder( this.pauseCoordination = pauseCoordination; this.replicateAfterLoadTimeout = replicateAfterLoadTimeout; this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad; + this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment; } public Builder withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(long leadingTimeMillis) @@ -681,6 +699,12 @@ public Builder withMaxNonPrimaryReplicantsToLoad(int maxNonPrimaryReplicantsToLo return this; } + public Builder withUseRoundRobinSegmentAssignment(boolean useRoundRobinSegmentAssignment) + { + this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment; + return this; + } + public CoordinatorDynamicConfig build() { return new CoordinatorDynamicConfig( @@ -709,7 +733,8 @@ public CoordinatorDynamicConfig build() pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : pauseCoordination, replicateAfterLoadTimeout == null ? DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout, maxNonPrimaryReplicantsToLoad == null ? DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD - : maxNonPrimaryReplicantsToLoad + : maxNonPrimaryReplicantsToLoad, + useRoundRobinSegmentAssignment == null ? DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT : useRoundRobinSegmentAssignment ); } @@ -747,7 +772,8 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults) replicateAfterLoadTimeout == null ? defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout, maxNonPrimaryReplicantsToLoad == null ? defaults.getMaxNonPrimaryReplicantsToLoad() - : maxNonPrimaryReplicantsToLoad + : maxNonPrimaryReplicantsToLoad, + useRoundRobinSegmentAssignment == null ? defaults.isUseRoundRobinSegmentAssignment() : useRoundRobinSegmentAssignment ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 50239db4ecac..38fcd9efa53e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -993,10 +993,19 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) stopPeonsForDisappearedServers(currentServers); + final RoundRobinServerSelector roundRobinServerSelector; + if (params.getCoordinatorDynamicConfig().isUseRoundRobinSegmentAssignment()) { + roundRobinServerSelector = new RoundRobinServerSelector(cluster); + log.info("Using round-robin segment assignment."); + } else { + roundRobinServerSelector = null; + } + return params.buildFromExisting() .withDruidCluster(cluster) .withLoadManagementPeons(loadManagementPeons) .withSegmentReplicantLookup(segmentReplicantLookup) + .withRoundRobinServerSelector(roundRobinServerSelector) .build(); } @@ -1044,7 +1053,8 @@ DruidCluster prepareCluster(DruidCoordinatorRuntimeParams params, List createUsedSegmentsSet(Iterable private final CoordinatorStats stats; private final BalancerStrategy balancerStrategy; private final Set broadcastDatasources; + private final @Nullable RoundRobinServerSelector roundRobinServerSelector; private DruidCoordinatorRuntimeParams( long startTimeNanos, @@ -80,6 +81,7 @@ private DruidCoordinatorRuntimeParams( @Nullable DataSourcesSnapshot dataSourcesSnapshot, Map loadManagementPeons, ReplicationThrottler replicationManager, + RoundRobinServerSelector roundRobinServerSelector, ServiceEmitter emitter, CoordinatorDynamicConfig coordinatorDynamicConfig, CoordinatorCompactionConfig coordinatorCompactionConfig, @@ -96,6 +98,7 @@ private DruidCoordinatorRuntimeParams( this.dataSourcesSnapshot = dataSourcesSnapshot; this.loadManagementPeons = loadManagementPeons; this.replicationManager = replicationManager; + this.roundRobinServerSelector = roundRobinServerSelector; this.emitter = emitter; this.coordinatorDynamicConfig = coordinatorDynamicConfig; this.coordinatorCompactionConfig = coordinatorCompactionConfig; @@ -150,6 +153,12 @@ public ReplicationThrottler getReplicationManager() return replicationManager; } + @Nullable + public RoundRobinServerSelector getRoundRobinServerSelector() + { + return roundRobinServerSelector; + } + public ServiceEmitter getEmitter() { return emitter; @@ -211,6 +220,7 @@ public Builder buildFromExisting() dataSourcesSnapshot, loadManagementPeons, replicationManager, + roundRobinServerSelector, emitter, coordinatorDynamicConfig, coordinatorCompactionConfig, @@ -231,6 +241,7 @@ public Builder buildFromExistingWithoutSegmentsMetadata() null, // dataSourcesSnapshot loadManagementPeons, replicationManager, + roundRobinServerSelector, emitter, coordinatorDynamicConfig, coordinatorCompactionConfig, @@ -250,6 +261,7 @@ public static class Builder private @Nullable DataSourcesSnapshot dataSourcesSnapshot; private final Map loadManagementPeons; private ReplicationThrottler replicationManager; + private @Nullable RoundRobinServerSelector roundRobinServerSelector; private ServiceEmitter emitter; private CoordinatorDynamicConfig coordinatorDynamicConfig; private CoordinatorCompactionConfig coordinatorCompactionConfig; @@ -267,6 +279,7 @@ private Builder() this.dataSourcesSnapshot = null; this.loadManagementPeons = new HashMap<>(); this.replicationManager = null; + this.roundRobinServerSelector = null; this.emitter = null; this.stats = new CoordinatorStats(); this.coordinatorDynamicConfig = CoordinatorDynamicConfig.builder().build(); @@ -283,6 +296,7 @@ private Builder() @Nullable DataSourcesSnapshot dataSourcesSnapshot, Map loadManagementPeons, ReplicationThrottler replicationManager, + RoundRobinServerSelector roundRobinServerSelector, ServiceEmitter emitter, CoordinatorDynamicConfig coordinatorDynamicConfig, CoordinatorCompactionConfig coordinatorCompactionConfig, @@ -299,6 +313,7 @@ private Builder() this.dataSourcesSnapshot = dataSourcesSnapshot; this.loadManagementPeons = loadManagementPeons; this.replicationManager = replicationManager; + this.roundRobinServerSelector = roundRobinServerSelector; this.emitter = emitter; this.coordinatorDynamicConfig = coordinatorDynamicConfig; this.coordinatorCompactionConfig = coordinatorCompactionConfig; @@ -319,6 +334,7 @@ public DruidCoordinatorRuntimeParams build() dataSourcesSnapshot, loadManagementPeons, replicationManager, + roundRobinServerSelector, emitter, coordinatorDynamicConfig, coordinatorCompactionConfig, @@ -401,6 +417,12 @@ public Builder withReplicationManager(ReplicationThrottler replicationManager) return this; } + public Builder withRoundRobinServerSelector(RoundRobinServerSelector roundRobinServerSelector) + { + this.roundRobinServerSelector = roundRobinServerSelector; + return this; + } + public Builder withEmitter(ServiceEmitter emitter) { this.emitter = emitter; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/RoundRobinServerSelector.java b/server/src/main/java/org/apache/druid/server/coordinator/RoundRobinServerSelector.java new file mode 100644 index 000000000000..89459adcc4f3 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/RoundRobinServerSelector.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import org.apache.druid.timeline.DataSegment; + +import javax.annotation.concurrent.NotThreadSafe; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; + +/** + * Provides iterators over historicals for a given tier that can load a + * specified segment. + *

+ * Once a selector is initialized with a {@link DruidCluster}, an iterator + * returned by {@link #getServersInTierToLoadSegment(String, DataSegment)} + * iterates over the historicals in a tier in a round robin fashion. The next + * invocation of this method picks up where the last iterator had left off. + *

+ * This class is not thread-safe and must be used from a single thread. + */ +@NotThreadSafe +public class RoundRobinServerSelector +{ + private final Map tierToServers = new HashMap<>(); + + public RoundRobinServerSelector(DruidCluster cluster) + { + cluster.getHistoricals().forEach( + (tier, servers) -> tierToServers.put(tier, new CircularServerList(servers)) + ); + } + + /** + * Returns an iterator over the servers in this tier which are eligible to + * load the given segment. + */ + public Iterator getServersInTierToLoadSegment(String tier, DataSegment segment) + { + final CircularServerList iterator = tierToServers.get(tier); + if (iterator == null) { + return Collections.emptyIterator(); + } + + return new EligibleServerIterator(segment, iterator); + } + + /** + * Iterator over servers in a tier that are eligible to load a given segment. + */ + private static class EligibleServerIterator implements Iterator + { + final CircularServerList delegate; + final DataSegment segment; + + ServerHolder nextEligible; + int remainingIterations; + + EligibleServerIterator(DataSegment segment, CircularServerList delegate) + { + this.delegate = delegate; + this.segment = segment; + this.remainingIterations = delegate.servers.size(); + nextEligible = search(); + } + + @Override + public boolean hasNext() + { + return nextEligible != null; + } + + @Override + public ServerHolder next() + { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + ServerHolder previous = nextEligible; + delegate.advanceCursor(); + nextEligible = search(); + return previous; + } + + ServerHolder search() + { + while (remainingIterations-- > 0) { + ServerHolder nextServer = delegate.peekNext(); + if (nextServer.canLoadSegment(segment)) { + return nextServer; + } else { + delegate.advanceCursor(); + } + } + + return null; + } + } + + /** + * Circular list over all servers in a tier. A single instance of this is + * maintained for each tier. + */ + private static class CircularServerList + { + final List servers = new ArrayList<>(); + int currentPosition; + + CircularServerList(Set servers) + { + this.servers.addAll(servers); + //Collections.shuffle(this.servers); + } + + void advanceCursor() + { + if (++currentPosition >= servers.size()) { + currentPosition = 0; + } + } + + ServerHolder peekNext() + { + int nextPosition = currentPosition < servers.size() ? currentPosition : 0; + return servers.get(nextPosition); + } + } + +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java index 43fdaaef1d1d..057dfb118c80 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java @@ -34,6 +34,7 @@ public class ServerHolder implements Comparable private final ImmutableDruidServer server; private final LoadQueuePeon peon; private final boolean isDecommissioning; + private final int maxSegmentsInLoadQueue; public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon) { @@ -41,10 +42,21 @@ public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon) } public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon, boolean isDecommissioning) + { + this(server, peon, isDecommissioning, 0); + } + + public ServerHolder( + ImmutableDruidServer server, + LoadQueuePeon peon, + boolean isDecommissioning, + int maxSegmentsInNodeLoadingQueue + ) { this.server = server; this.peon = peon; this.isDecommissioning = isDecommissioning; + this.maxSegmentsInLoadQueue = maxSegmentsInNodeLoadingQueue; } public ImmutableDruidServer getServer() @@ -138,6 +150,27 @@ public boolean isServingSegment(SegmentId segmentId) return server.getSegment(segmentId) != null; } + /** + * Checks if the server can load the given segment. + *

+ * A load is possible only if the server meets all of the following criteria: + *

    + *
  • is not being decommissioned
  • + *
  • is not already serving the segment
  • + *
  • is not performing any other action on the segment
  • + *
  • has not already exceeded the load queue limit in this run
  • + *
  • has available disk space
  • + *
+ */ + public boolean canLoadSegment(DataSegment segment) + { + return !isDecommissioning + && !isServingSegment(segment.getId()) + && !isLoadingSegment(segment) + && (maxSegmentsInLoadQueue == 0 || maxSegmentsInLoadQueue > peon.getNumberOfSegmentsInQueue()) + && getAvailableSize() >= segment.getSize(); + } + @Override public int compareTo(ServerHolder serverHolder) { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java index 3b20cf091346..933b63e47263 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java @@ -44,7 +44,6 @@ import java.util.List; import java.util.Map; import java.util.NavigableSet; -import java.util.Objects; import java.util.TreeSet; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -178,7 +177,7 @@ private void assign( targetReplicants.getOrDefault(tier, 0), numAssigned, // note that the currentReplicantsInTier is the just-assigned primary replica. params, - createLoadQueueSizeLimitingPredicate(params).and(holder -> !holder.equals(primaryHolderToLoad)), + createLoadQueueSizeLimitingPredicate(segment).and(holder -> !holder.equals(primaryHolderToLoad)), segment ); @@ -193,15 +192,10 @@ private void assign( } private static Predicate createLoadQueueSizeLimitingPredicate( - final DruidCoordinatorRuntimeParams params + final DataSegment segment ) { - final int maxSegmentsInNodeLoadingQueue = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue(); - if (maxSegmentsInNodeLoadingQueue <= 0) { - return Objects::nonNull; - } else { - return s -> (s != null && s.getNumberOfSegmentsInQueue() < maxSegmentsInNodeLoadingQueue); - } + return server -> server != null && server.canLoadSegment(segment); } private static List getFilteredHolders( @@ -219,6 +213,21 @@ private static List getFilteredHolders( return queue.stream().filter(isActive.and(predicate)).collect(Collectors.toList()); } + private Iterator getRoundRobinIterator( + DruidCoordinatorRuntimeParams params, + String tier, + DataSegment segment + ) + { + if (params.getRoundRobinServerSelector() == null + || !params.getCoordinatorDynamicConfig().isUseRoundRobinSegmentAssignment()) { + return null; + } + + return params.getRoundRobinServerSelector() + .getServersInTierToLoadSegment(tier, segment); + } + /** * Iterates through each tier and find the respective segment homes; with the found segment homes, selects the one * with the highest priority to be the holder for the primary replica. @@ -230,6 +239,8 @@ private ServerHolder assignPrimary( ) { ServerHolder topCandidate = null; + final boolean useRoundRobinAssignment = params.getCoordinatorDynamicConfig() + .isUseRoundRobinSegmentAssignment(); for (final Object2IntMap.Entry entry : targetReplicants.object2IntEntrySet()) { final int targetReplicantsInTier = entry.getIntValue(); // sanity check: target number of replicants should be more than zero. @@ -248,7 +259,7 @@ private ServerHolder assignPrimary( final List holders = getFilteredHolders( tier, params.getDruidCluster(), - createLoadQueueSizeLimitingPredicate(params) + createLoadQueueSizeLimitingPredicate(segment) ); // no holders satisfy the predicate if (holders.isEmpty()) { @@ -256,12 +267,20 @@ private ServerHolder assignPrimary( continue; } - final ServerHolder candidate = params.getBalancerStrategy().findNewSegmentHomeReplicator(segment, holders); + final ServerHolder candidate; + if (useRoundRobinAssignment) { + Iterator roundRobinIterator = getRoundRobinIterator(params, tier, segment); + candidate = roundRobinIterator.hasNext() ? roundRobinIterator.next() : null; + } else { + candidate = params.getBalancerStrategy().findNewSegmentHomeReplicator(segment, holders); + if (candidate != null) { + strategyCache.put(tier, candidate); + } + } + if (candidate == null) { log.warn(noAvailability); } else { - // cache the result for later use. - strategyCache.put(tier, candidate); if (topCandidate == null || candidate.getServer().getPriority() > topCandidate.getServer().getPriority()) { topCandidate = candidate; @@ -307,7 +326,7 @@ private void assignReplicas( entry.getIntValue(), params.getSegmentReplicantLookup().getTotalReplicants(segment.getId(), tier), params, - createLoadQueueSizeLimitingPredicate(params), + createLoadQueueSizeLimitingPredicate(segment), segment ); stats.addToGlobalStat(NON_PRIMARY_ASSIGNED_COUNT, numAssigned); @@ -347,6 +366,7 @@ private int assignReplicasForTier( return 0; } + final Iterator roundRobinServerIterator = getRoundRobinIterator(params, tier, segment); final ReplicationThrottler throttler = params.getReplicationManager(); for (int numAssigned = 0; numAssigned < numToAssign; numAssigned++) { if (!throttler.canCreateReplicant(tier)) { @@ -355,10 +375,15 @@ private int assignReplicasForTier( } // Retrieves from cache if available - ServerHolder holder = strategyCache.remove(tier); - // Does strategy call if not in cache - if (holder == null) { + final ServerHolder holder; + if (strategyCache.containsKey(tier)) { + // found in cache + holder = strategyCache.remove(tier); + } else if (roundRobinServerIterator == null) { + // Call balancer strategy holder = params.getBalancerStrategy().findNewSegmentHomeReplicator(segment, holders); + } else { + holder = roundRobinServerIterator.hasNext() ? roundRobinServerIterator.next() : null; } if (holder == null) { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java new file mode 100644 index 000000000000..bcef7ca8a4da --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/RoundRobinServerSelectorTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import org.apache.druid.client.DruidServer; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.Iterator; + +public class RoundRobinServerSelectorTest +{ + private static final String TIER = "normal"; + + private final DataSegment segment = new DataSegment( + "wiki", + Intervals.of("2022-01-01/2022-01-02"), + "1", + Collections.emptyMap(), + Collections.emptyList(), + Collections.emptyList(), + new NumberedShardSpec(1, 10), + IndexIO.CURRENT_VERSION_ID, + 100 + ); + + @Test + public void testSingleIterator() + { + final ServerHolder serverXL = createHistorical("serverXL", 1000); + final ServerHolder serverL = createHistorical("serverXL", 900); + final ServerHolder serverM = createHistorical("serverXL", 800); + + // This server is too small to house the segment + final ServerHolder serverXS = createHistorical("serverXL", 10); + + DruidCluster cluster = DruidClusterBuilder + .newBuilder() + .addTier(TIER, serverXL, serverM, serverXS, serverL) + .build(); + final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster); + + // Verify that only eligible servers are returned in order of available size + Iterator pickedServers = selector.getServersInTierToLoadSegment(TIER, segment); + + Assert.assertTrue(pickedServers.hasNext()); + Assert.assertEquals(serverXL, pickedServers.next()); + Assert.assertEquals(serverL, pickedServers.next()); + Assert.assertEquals(serverM, pickedServers.next()); + + Assert.assertFalse(pickedServers.hasNext()); + } + + @Test + public void testNextIteratorContinuesFromSamePosition() + { + final ServerHolder serverXL = createHistorical("serverXL", 1000); + final ServerHolder serverL = createHistorical("serverXL", 900); + final ServerHolder serverM = createHistorical("serverXL", 800); + + // This server is too small to house the segment + final ServerHolder serverXS = createHistorical("serverXL", 10); + + DruidCluster cluster = DruidClusterBuilder + .newBuilder() + .addTier(TIER, serverXL, serverM, serverXS, serverL) + .build(); + final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster); + + // Verify that only eligible servers are returned in order of available size + Iterator pickedServers = selector.getServersInTierToLoadSegment(TIER, segment); + Assert.assertTrue(pickedServers.hasNext()); + Assert.assertEquals(serverXL, pickedServers.next()); + + // Second iterator starts from previous position but resets allowed number of iterations + pickedServers = selector.getServersInTierToLoadSegment(TIER, segment); + Assert.assertTrue(pickedServers.hasNext()); + + Assert.assertEquals(serverL, pickedServers.next()); + Assert.assertEquals(serverM, pickedServers.next()); + Assert.assertEquals(serverXL, pickedServers.next()); + + Assert.assertFalse(pickedServers.hasNext()); + } + + @Test + public void testNoServersInTier() + { + DruidCluster cluster = DruidClusterBuilder + .newBuilder() + .addTier(TIER) + .build(); + final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster); + + Iterator eligibleServers = selector.getServersInTierToLoadSegment(TIER, segment); + Assert.assertFalse(eligibleServers.hasNext()); + } + + @Test + public void testNoEligibleServerInTier() + { + DruidCluster cluster = DruidClusterBuilder + .newBuilder() + .addTier( + TIER, + createHistorical("server1", 40), + createHistorical("server2", 30), + createHistorical("server3", 10), + createHistorical("server4", 20) + ) + .build(); + final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster); + + // Verify that only eligible servers are returned in order of available size + Iterator eligibleServers = selector.getServersInTierToLoadSegment(TIER, segment); + Assert.assertFalse(eligibleServers.hasNext()); + } + + private ServerHolder createHistorical(String name, long size) + { + return new ServerHolder( + new DruidServer(name, name, null, size, ServerType.HISTORICAL, TIER, 1).toImmutableDruidServer(), + new LoadQueuePeonTester() + ); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java index 99a975caff48..b039834e7b3d 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java @@ -48,6 +48,7 @@ import org.apache.druid.server.coordinator.LoadQueuePeon; import org.apache.druid.server.coordinator.LoadQueuePeonTester; import org.apache.druid.server.coordinator.ReplicationThrottler; +import org.apache.druid.server.coordinator.RoundRobinServerSelector; import org.apache.druid.server.coordinator.SegmentReplicantLookup; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.server.coordinator.cost.ClusterCostCache; @@ -61,8 +62,11 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -73,6 +77,7 @@ /** */ +@RunWith(Parameterized.class) public class LoadRuleTest { private static final Logger log = new Logger(LoadRuleTest.class); @@ -95,8 +100,20 @@ public class LoadRuleTest private CachingCostBalancerStrategy cachingCostBalancerStrategy; + private final boolean useRoundRobinAssignment; private BalancerStrategy mockBalancerStrategy; + @Parameterized.Parameters(name = "useRoundRobin = {0}") + public static List getTestParams() + { + return Arrays.asList(true, false); + } + + public LoadRuleTest(boolean useRoundRobinAssignment) + { + this.useRoundRobinAssignment = useRoundRobinAssignment; + } + @Before public void setUp() { @@ -106,7 +123,6 @@ public void setUp() exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); - cachingCostBalancerStrategy = new CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec); mockBalancerStrategy = EasyMock.createMock(BalancerStrategy.class); @@ -138,9 +154,11 @@ public void testLoad() throttler.registerReplicantCreation(DruidServer.DEFAULT_TIER, segment.getId(), "hostNorm"); EasyMock.expectLastCall().once(); - EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject())) - .andDelegateTo(balancerStrategy) - .times(3); + if (!useRoundRobinAssignment) { + EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject())) + .andDelegateTo(balancerStrategy) + .times(3); + } EasyMock.replay(throttler, mockPeon, mockBalancerStrategy); @@ -189,6 +207,13 @@ private DruidCoordinatorRuntimeParams makeCoordinatorRuntimeParams( .withDruidCluster(druidCluster) .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster, false)) .withReplicationManager(throttler) + .withDynamicConfigs( + CoordinatorDynamicConfig + .builder() + .withUseRoundRobinSegmentAssignment(useRoundRobinAssignment) + .build() + ) + .withRoundRobinServerSelector(useRoundRobinAssignment ? new RoundRobinServerSelector(druidCluster) : null) .withBalancerStrategy(mockBalancerStrategy) .withUsedSegmentsInTest(usedSegments) .build(); @@ -440,9 +465,11 @@ public void testLoadPriority() mockPeon2.loadSegment(EasyMock.anyObject(), EasyMock.isNull()); EasyMock.expectLastCall().once(); - EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject())) - .andDelegateTo(balancerStrategy) - .times(2); + if (!useRoundRobinAssignment) { + EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject())) + .andDelegateTo(balancerStrategy) + .times(2); + } EasyMock.replay(throttler, mockPeon1, mockPeon2, mockBalancerStrategy); @@ -626,6 +653,7 @@ public void testDropWithNonExistentTier() @Test public void testMaxLoadingQueueSize() { + final int maxSegmentsInLoadQueue = 2; EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject())) .andDelegateTo(balancerStrategy) .times(2); @@ -643,7 +671,9 @@ public void testMaxLoadingQueueSize() new ServerHolder( new DruidServer("serverHot", "hostHot", null, 1000, ServerType.HISTORICAL, "hot", 0) .toImmutableDruidServer(), - peon + peon, + false, + maxSegmentsInLoadQueue ) ) .build(); @@ -659,8 +689,11 @@ public void testMaxLoadingQueueSize() .withReplicationManager(throttler) .withBalancerStrategy(mockBalancerStrategy) .withUsedSegmentsInTest(dataSegment1, dataSegment2, dataSegment3) - .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsInNodeLoadingQueue(2).build()) - .build(); + .withDynamicConfigs( + CoordinatorDynamicConfig.builder() + .withMaxSegmentsInNodeLoadingQueue(maxSegmentsInLoadQueue) + .build() + ).build(); CoordinatorStats stats1 = rule.run(null, params, dataSegment1); CoordinatorStats stats2 = rule.run(null, params, dataSegment2); @@ -687,9 +720,11 @@ public void testLoadDecommissioning() final DataSegment segment = createDataSegment("foo"); - EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject())) - .andDelegateTo(balancerStrategy) - .times(1); + if (!useRoundRobinAssignment) { + EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject())) + .andDelegateTo(balancerStrategy) + .times(1); + } EasyMock.replay(mockPeon1, mockPeon2, mockBalancerStrategy); @@ -732,12 +767,14 @@ public void testLoadReplicaDuringDecommissioning() ServerHolder holder3 = createServerHolder("tier2", mockPeon3, false); ServerHolder holder4 = createServerHolder("tier2", mockPeon4, false); - EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(segment, ImmutableList.of(holder2))) - .andReturn(holder2); - EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(segment, ImmutableList.of(holder4, holder3))) - .andReturn(holder3); - EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(segment, ImmutableList.of(holder4))) - .andReturn(holder4); + if (!useRoundRobinAssignment) { + EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(segment, ImmutableList.of(holder2))) + .andReturn(holder2); + EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(segment, ImmutableList.of(holder4, holder3))) + .andReturn(holder3); + EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(segment, ImmutableList.of(holder4))) + .andReturn(holder4); + } EasyMock.replay(throttler, mockPeon1, mockPeon2, mockPeon3, mockPeon4, mockBalancerStrategy); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java index c4785fd0bc89..f79a9bf48041 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java @@ -22,6 +22,7 @@ import org.apache.druid.client.DruidServer; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.timeline.DataSegment; import java.util.List; @@ -108,5 +109,10 @@ interface ClusterState * Adds the specified server to the cluster. */ void addServer(DruidServer server); + + /** + * Publishes the given segments to the cluster. + */ + void addSegments(List segments); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java index b3980630d741..d4e8b7760de0 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java @@ -135,6 +135,12 @@ public void addServer(DruidServer server) sim.cluster().addServer(server); } + @Override + public void addSegments(List segments) + { + sim.cluster().addSegments(segments); + } + @Override public double getLoadPercentage(String datasource) { @@ -260,6 +266,7 @@ static DruidServer createHistorical(int uniqueIdInTier, String tier, long server static class DS { static final String WIKI = "wiki"; + static final String KOALA = "koala"; } static class Tier @@ -282,7 +289,7 @@ static class Segments { /** * Segments of datasource {@link DS#WIKI}, size 500 MB each, - * spanning 1 day containing 10 partitions. + * spanning 1 day containing 10 partitions each. */ static final List WIKI_10X1D = CreateDataSegments.ofDatasource(DS.WIKI) @@ -293,7 +300,7 @@ static class Segments /** * Segments of datasource {@link DS#WIKI}, size 500 MB each, - * spanning 100 days containing 10 partitions. + * spanning 100 days containing 10 partitions each. */ static final List WIKI_10X100D = CreateDataSegments.ofDatasource(DS.WIKI) @@ -301,6 +308,17 @@ static class Segments .startingAt("2022-01-01") .withNumPartitions(10) .eachOfSizeInMb(500); + + /** + * Segments of datasource {@link DS#KOALA}, size 500 MB each, + * spanning 100 days containing 100 partitions each. + */ + static final List KOALA_100X100D = + CreateDataSegments.ofDatasource(DS.KOALA) + .forIntervals(100, Granularities.DAY) + .startingAt("2022-01-01") + .withNumPartitions(100) + .eachOfSizeInMb(500); } /** diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java index 2e85f8184525..28b84f62e9da 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java @@ -176,26 +176,21 @@ public CoordinatorSimulation build() final TestServerInventoryView serverInventoryView = new TestServerInventoryView(); servers.forEach(serverInventoryView::addServer); - final TestSegmentsMetadataManager segmentManager = new TestSegmentsMetadataManager(); - if (segments != null) { - segments.forEach(segmentManager::addSegment); - } - - final TestMetadataRuleManager ruleManager = new TestMetadataRuleManager(); - datasourceRules.forEach( - (datasource, rules) -> - ruleManager.overrideRule(datasource, rules, null) - ); - final Environment env = new Environment( serverInventoryView, - segmentManager, - ruleManager, dynamicConfig, loadImmediately, autoSyncInventory ); + if (segments != null) { + segments.forEach(env.segmentManager::addSegment); + } + datasourceRules.forEach( + (datasource, rules) -> + env.ruleManager.overrideRule(datasource, rules, null) + ); + // Build the coordinator final DruidCoordinator coordinator = new DruidCoordinator( env.coordinatorConfig, @@ -375,6 +370,14 @@ public void addServer(DruidServer server) env.inventory.addServer(server); } + @Override + public void addSegments(List segments) + { + if (segments != null) { + segments.forEach(env.segmentManager::addSegment); + } + } + private void verifySimulationRunning() { if (!running.get()) { @@ -409,8 +412,8 @@ private static class Environment = new TestDruidLeaderSelector(); private final ExecutorFactory executorFactory; - private final TestSegmentsMetadataManager segmentManager; - private final TestMetadataRuleManager ruleManager; + private final TestSegmentsMetadataManager segmentManager = new TestSegmentsMetadataManager(); + private final TestMetadataRuleManager ruleManager = new TestMetadataRuleManager(); private final LoadQueueTaskMaster loadQueueTaskMaster; @@ -437,16 +440,12 @@ private static class Environment private Environment( TestServerInventoryView clusterInventory, - TestSegmentsMetadataManager segmentManager, - TestMetadataRuleManager ruleManager, CoordinatorDynamicConfig dynamicConfig, boolean loadImmediately, boolean autoSyncInventory ) { this.inventory = clusterInventory; - this.segmentManager = segmentManager; - this.ruleManager = ruleManager; this.loadImmediately = loadImmediately; this.autoSyncInventory = autoSyncInventory; diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/RoundRobinAssignmentTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/RoundRobinAssignmentTest.java new file mode 100644 index 000000000000..cec100691a44 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/RoundRobinAssignmentTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.simulate; + +import org.apache.druid.client.DruidServer; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.timeline.DataSegment; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class RoundRobinAssignmentTest extends CoordinatorSimulationBaseTest +{ + private static final long SIZE_1TB = 1_000_000; + + private List historicals; + + @Override + public void setUp() + { + historicals = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + historicals.add(createHistorical(i, Tier.T1, SIZE_1TB)); + } + } + + @Test + public void testSegmentsAreAssignedUniformly() + { + CoordinatorDynamicConfig config = + CoordinatorDynamicConfig.builder() + .withMaxSegmentsToMove(0) + .withMaxSegmentsInNodeLoadingQueue(0) + .withReplicationThrottleLimit(20000) + .withUseRoundRobinSegmentAssignment(true) + .build(); + + CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withDynamicConfig(config) + .withBalancer("random") + .withRules(DS.WIKI, Load.on(Tier.T1, 2).forever()) + .withServers(historicals) + .withSegments(Segments.WIKI_10X100D) + .build(); + startSimulation(sim); + + // Run 1: all segments are assigned and loaded + runCoordinatorCycle(); + loadQueuedSegments(); + verifyValue(Metric.ASSIGNED_COUNT, 2000L); + + for (DruidServer historical : historicals) { + Assert.assertEquals(200, historical.getTotalSegments()); + } + } + + @Test + public void testMultipleDatasourceSegmentsAreAssignedUniformly() + { + final List segments = new ArrayList<>(Segments.WIKI_10X100D); + segments.addAll(Segments.KOALA_100X100D); + + CoordinatorDynamicConfig config = + CoordinatorDynamicConfig.builder() + .withMaxSegmentsToMove(0) + .withMaxSegmentsInNodeLoadingQueue(0) + .withReplicationThrottleLimit(20000) + .withUseRoundRobinSegmentAssignment(true) + .build(); + + CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withDynamicConfig(config) + .withBalancer("random") + .withRules(DS.WIKI, Load.on(Tier.T1, 3).forever()) + .withRules(DS.KOALA, Load.on(Tier.T1, 1).forever()) + .withServers(historicals) + .withSegments(segments) + .build(); + startSimulation(sim); + + // Run 1: all segments are assigned and loaded + runCoordinatorCycle(); + loadQueuedSegments(); + verifyValue(Metric.ASSIGNED_COUNT, 13000L); + + for (DruidServer historical : historicals) { + Assert.assertEquals(1300, historical.getTotalSegments()); + } + } + +} diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index 73f7cd29e4fd..72489036198d 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -228,7 +228,8 @@ public void testSerde() throws Exception @Test public void testConstructorWithNullsShouldKillUnusedSegmentsInAllDataSources() { - CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(1, + CoordinatorDynamicConfig config = new CoordinatorDynamicConfig( + 1, 1, 1, 1, @@ -245,7 +246,9 @@ public void testConstructorWithNullsShouldKillUnusedSegmentsInAllDataSources() 5, true, true, - 10); + 10, + false + ); Assert.assertTrue(config.isKillUnusedSegmentsInAllDataSources()); Assert.assertTrue(config.getSpecificDataSourcesToKillUnusedSegmentsIn().isEmpty()); } @@ -253,24 +256,27 @@ public void testConstructorWithNullsShouldKillUnusedSegmentsInAllDataSources() @Test public void testConstructorWithSpecificDataSourcesToKillShouldNotKillUnusedSegmentsInAllDatasources() { - CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(1, - 1, - 1, - 1, - null, - false, - 1, - 2, - 10, - true, - ImmutableSet.of("test1"), - null, - null, - ImmutableSet.of("host1"), - 5, - true, - true, - 10); + CoordinatorDynamicConfig config = new CoordinatorDynamicConfig( + 1, + 1, + 1, + 1, + null, + false, + 1, + 2, + 10, + true, + ImmutableSet.of("test1"), + null, + null, + ImmutableSet.of("host1"), + 5, + true, + true, + 10, + false + ); Assert.assertFalse(config.isKillUnusedSegmentsInAllDataSources()); Assert.assertEquals(ImmutableSet.of("test1"), config.getSpecificDataSourcesToKillUnusedSegmentsIn()); } @@ -732,6 +738,7 @@ public void testUpdate() null, null, null, + null, null ).build(current) ); diff --git a/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx b/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx index 6683a721d14c..1d148e6b1879 100644 --- a/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx +++ b/web-console/src/druid-models/coordinator-dynamic-config/coordinator-dynamic-config.tsx @@ -207,6 +207,18 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field[ ), }, + { + name: 'useRoundRobinSegmentAssignment', + type: 'boolean', + defaultValue: false, + info: ( + <> + Boolean flag for whether segments should be assigned to historicals in a round-robin + fashion. If enabled, this can speed up initial segment loading leaving segment balancing to + make cost-based decisions and find the optimal location of a segment. + + ), + }, { name: 'percentOfSegmentsToConsiderPerMove', type: 'number',