diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 858c2a2dc24e..712e1b732755 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -780,8 +780,8 @@ A sample Coordinator dynamic config JSON object is shown below: "replicationThrottleLimit": 10, "emitBalancingStats": false, "killDataSourceWhitelist": ["wikipedia", "testDatasource"], - "historicalNodesInMaintenance": ["localhost:8182", "localhost:8282"], - "nodesInMaintenancePriority": 7 + "decommissioningNodes": ["localhost:8182", "localhost:8282"], + "decommissioningMaxPercentOfMaxSegmentsToMove": 70 } ``` @@ -795,14 +795,14 @@ Issuing a GET request at the same URL will return the spec that is currently in |`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5| |`replicantLifetime`|The maximum number of Coordinator runs for a segment to be replicated before we start alerting.|15| |`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10| -|`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. Consider increasing this if you have a lot of segments and moving segment starts to get stuck.|1| +|`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1| |`emitBalancingStats`|Boolean flag for whether or not we should emit balancing stats. This is an expensive operation.|false| |`killDataSourceWhitelist`|List of dataSources for which kill tasks are sent if property `druid.coordinator.kill.on` is true. This can be a list of comma-separated dataSources or a JSON array.|none| |`killAllDataSources`|Send kill tasks for ALL dataSources if property `druid.coordinator.kill.on` is true. 
If this is set to true then `killDataSourceWhitelist` must not be specified or be empty list.|false| |`killPendingSegmentsSkipList`|List of dataSources for which pendingSegments are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is true. This can be a list of comma-separated dataSources or a JSON array.|none| -|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be queued for loading to any given server. This parameter could be used to speed up segments loading process, especially if there are "slow" processes in the cluster (with low loading speed) or if too much segments scheduled to be replicated to some particular node (faster loading could be preferred to better segments distribution). Desired value depends on segments loading speed, acceptable replication time and number of processes. Value 1000 could be a start point for a rather big cluster. Default value is 0 (loading queue is unbounded) |0| -|`historicalNodesInMaintenance`| List of Historical nodes in maintenance mode. Coordinator doesn't assign new segments on those nodes and moves segments from the nodes according to a specified priority.|none| -|`nodesInMaintenancePriority`| Priority of segments from servers in maintenance. Coordinator takes ceil(maxSegmentsToMove * (priority / 10)) from servers in maitenance during balancing phase, i.e.:
0 - no segments from servers in maintenance will be processed during balancing
5 - 50% segments from servers in maintenance
10 - 100% segments from servers in maintenance
By leveraging the priority an operator can prevent general nodes from overload or decrease maitenance time instead.|7| +|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be queued for loading to any given server. This parameter could be used to speed up segments loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too many segments scheduled to be replicated to some particular node (faster loading could be preferred to better segments distribution). Desired value depends on segments loading speed, acceptable replication time and number of nodes. Value 1000 could be a start point for a rather big cluster. Default value is 0 (loading queue is unbounded) |0| +|`decommissioningNodes`| List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `decommissioningMaxPercentOfMaxSegmentsToMove`.|none| +|`decommissioningMaxPercentOfMaxSegmentsToMove`| The maximum number of segments that may be moved away from 'decommissioning' servers to non-decommissioning (that is, active) servers during one Coordinator run. This value is relative to the total maximum segment movements allowed during one run which is determined by `maxSegmentsToMove`. If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will neither be moved from _nor to_ 'decommissioning' servers, effectively putting them in a sort of "maintenance" mode that will not participate in balancing or assignment by load rules. Decommissioning can also become stalled if there are no available active servers to place the segments. By leveraging the maximum percent of decommissioning segment movements, an operator can prevent active servers from overload by prioritizing balancing, or decrease decommissioning time instead. 
The value should be between 0 and 100.|70| To view the audit history of Coordinator dynamic config issue a GET request to the URL - diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index de034af5dfc2..14bf3395add3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -28,6 +28,8 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; import java.util.Collection; import java.util.HashSet; import java.util.Objects; @@ -56,8 +58,8 @@ public class CoordinatorDynamicConfig private final boolean emitBalancingStats; private final boolean killAllDataSources; private final Set killableDataSources; - private final Set historicalNodesInMaintenance; - private final int nodesInMaintenancePriority; + private final Set decommissioningNodes; + private final int decommissioningMaxPercentOfMaxSegmentsToMove; // The pending segments of the dataSources in this list are not killed. 
private final Set protectedPendingSegmentDatasources; @@ -88,8 +90,8 @@ public CoordinatorDynamicConfig( @JsonProperty("killAllDataSources") boolean killAllDataSources, @JsonProperty("killPendingSegmentsSkipList") Object protectedPendingSegmentDatasources, @JsonProperty("maxSegmentsInNodeLoadingQueue") int maxSegmentsInNodeLoadingQueue, - @JsonProperty("historicalNodesInMaintenance") Object historicalNodesInMaintenance, - @JsonProperty("nodesInMaintenancePriority") int nodesInMaintenancePriority + @JsonProperty("decommissioningNodes") Object decommissioningNodes, + @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove ) { this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting; @@ -104,12 +106,12 @@ public CoordinatorDynamicConfig( this.killableDataSources = parseJsonStringOrArray(killableDataSources); this.protectedPendingSegmentDatasources = parseJsonStringOrArray(protectedPendingSegmentDatasources); this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue; - this.historicalNodesInMaintenance = parseJsonStringOrArray(historicalNodesInMaintenance); + this.decommissioningNodes = parseJsonStringOrArray(decommissioningNodes); Preconditions.checkArgument( - nodesInMaintenancePriority >= 0 && nodesInMaintenancePriority <= 10, - "nodesInMaintenancePriority should be in range [0, 10]" + decommissioningMaxPercentOfMaxSegmentsToMove >= 0 && decommissioningMaxPercentOfMaxSegmentsToMove <= 100, + "decommissioningMaxPercentOfMaxSegmentsToMove should be in range [0, 100]" ); - this.nodesInMaintenancePriority = nodesInMaintenancePriority; + this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove; if (this.killAllDataSources && !this.killableDataSources.isEmpty()) { throw new IAE("can't have killAllDataSources and non-empty killDataSourceWhitelist"); @@ -231,32 +233,37 @@ public int getMaxSegmentsInNodeLoadingQueue() } /** - * Historical nodes list in maintenance 
mode. Coordinator doesn't assign new segments on those nodes and moves - * segments from those nodes according to a specified priority. + * List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' servers, + * and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by + * {@link CoordinatorDynamicConfig#getDecommissioningMaxPercentOfMaxSegmentsToMove}. * * @return list of host:port entries */ @JsonProperty - public Set getHistoricalNodesInMaintenance() + public Set getDecommissioningNodes() { - return historicalNodesInMaintenance; + return decommissioningNodes; + } /** - * Priority of segments from servers in maintenance. Coordinator takes ceil(maxSegmentsToMove * (priority / 10)) - * from servers in maitenance during balancing phase, i.e.: - * 0 - no segments from servers in maintenance will be processed during balancing - * 5 - 50% segments from servers in maintenance - * 10 - 100% segments from servers in maintenance - * By leveraging the priority an operator can prevent general nodes from overload or decrease maitenance time - * instead. + * The percent of {@link CoordinatorDynamicConfig#getMaxSegmentsToMove()} that determines the maximum number of + * segments that may be moved away from 'decommissioning' servers (specified by + * {@link CoordinatorDynamicConfig#getDecommissioningNodes()}) to non-decommissioning servers during one Coordinator + * balancer run. If this value is 0, segments will neither be moved from nor to 'decommissioning' servers, effectively + * putting them in a sort of "maintenance" mode that will not participate in balancing or assignment by load rules. + * Decommissioning can also become stalled if there are no available active servers to place the segments. By + * adjusting this value, an operator can prevent active servers from overload by prioritizing balancing, or + * decrease decommissioning time instead. 
* - * @return number in range [0, 10] + * @return number in range [0, 100] */ + @Min(0) + @Max(100) @JsonProperty - public int getNodesInMaintenancePriority() + public int getDecommissioningMaxPercentOfMaxSegmentsToMove() { - return nodesInMaintenancePriority; + return decommissioningMaxPercentOfMaxSegmentsToMove; } @Override @@ -275,8 +282,8 @@ public String toString() ", killDataSourceWhitelist=" + killableDataSources + ", protectedPendingSegmentDatasources=" + protectedPendingSegmentDatasources + ", maxSegmentsInNodeLoadingQueue=" + maxSegmentsInNodeLoadingQueue + - ", historicalNodesInMaintenance=" + historicalNodesInMaintenance + - ", nodesInMaintenancePriority=" + nodesInMaintenancePriority + + ", decommissioningNodes=" + decommissioningNodes + + ", decommissioningMaxPercentOfMaxSegmentsToMove=" + decommissioningMaxPercentOfMaxSegmentsToMove + '}'; } @@ -328,10 +335,10 @@ public boolean equals(Object o) if (!Objects.equals(protectedPendingSegmentDatasources, that.protectedPendingSegmentDatasources)) { return false; } - if (!Objects.equals(historicalNodesInMaintenance, that.historicalNodesInMaintenance)) { + if (!Objects.equals(decommissioningNodes, that.decommissioningNodes)) { return false; } - return nodesInMaintenancePriority == that.nodesInMaintenancePriority; + return decommissioningMaxPercentOfMaxSegmentsToMove == that.decommissioningMaxPercentOfMaxSegmentsToMove; } @Override @@ -350,8 +357,8 @@ public int hashCode() maxSegmentsInNodeLoadingQueue, killableDataSources, protectedPendingSegmentDatasources, - historicalNodesInMaintenance, - nodesInMaintenancePriority + decommissioningNodes, + decommissioningMaxPercentOfMaxSegmentsToMove ); } @@ -372,7 +379,7 @@ public static class Builder private static final boolean DEFAULT_EMIT_BALANCING_STATS = false; private static final boolean DEFAULT_KILL_ALL_DATA_SOURCES = false; private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 0; - private static final int 
DEFAULT_MAINTENANCE_MODE_SEGMENTS_PRIORITY = 7; + private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70; private Long millisToWaitBeforeDeleting; private Long mergeBytesLimit; @@ -386,8 +393,8 @@ public static class Builder private Boolean killAllDataSources; private Object killPendingSegmentsSkipList; private Integer maxSegmentsInNodeLoadingQueue; - private Object maintenanceList; - private Integer maintenanceModeSegmentsPriority; + private Object decommissioningNodes; + private Integer decommissioningMaxPercentOfMaxSegmentsToMove; public Builder() { @@ -407,8 +414,8 @@ public Builder( @JsonProperty("killAllDataSources") @Nullable Boolean killAllDataSources, @JsonProperty("killPendingSegmentsSkipList") @Nullable Object killPendingSegmentsSkipList, @JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer maxSegmentsInNodeLoadingQueue, - @JsonProperty("historicalNodesInMaintenance") @Nullable Object maintenanceList, - @JsonProperty("nodesInMaintenancePriority") @Nullable Integer maintenanceModeSegmentsPriority + @JsonProperty("decommissioningNodes") @Nullable Object decommissioningNodes, + @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") @Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove ) { this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting; @@ -423,8 +430,8 @@ public Builder( this.killableDataSources = killableDataSources; this.killPendingSegmentsSkipList = killPendingSegmentsSkipList; this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue; - this.maintenanceList = maintenanceList; - this.maintenanceModeSegmentsPriority = maintenanceModeSegmentsPriority; + this.decommissioningNodes = decommissioningNodes; + this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove; } public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting) @@ -493,15 +500,15 @@ public Builder withMaxSegmentsInNodeLoadingQueue(int 
maxSegmentsInNodeLoadingQue return this; } - public Builder withMaintenanceList(Set list) + public Builder withDecommissioningNodes(Set decommissioning) { - this.maintenanceList = list; + this.decommissioningNodes = decommissioning; return this; } - public Builder withMaintenanceModeSegmentsPriority(Integer priority) + public Builder withDecommissioningMaxPercentOfMaxSegmentsToMove(Integer percent) { - this.maintenanceModeSegmentsPriority = priority; + this.decommissioningMaxPercentOfMaxSegmentsToMove = percent; return this; } @@ -522,10 +529,10 @@ public CoordinatorDynamicConfig build() maxSegmentsInNodeLoadingQueue == null ? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE : maxSegmentsInNodeLoadingQueue, - maintenanceList, - maintenanceModeSegmentsPriority == null - ? DEFAULT_MAINTENANCE_MODE_SEGMENTS_PRIORITY - : maintenanceModeSegmentsPriority + decommissioningNodes, + decommissioningMaxPercentOfMaxSegmentsToMove == null + ? DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT + : decommissioningMaxPercentOfMaxSegmentsToMove ); } @@ -548,10 +555,10 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults) maxSegmentsInNodeLoadingQueue == null ? defaults.getMaxSegmentsInNodeLoadingQueue() : maxSegmentsInNodeLoadingQueue, - maintenanceList == null ? defaults.getHistoricalNodesInMaintenance() : maintenanceList, - maintenanceModeSegmentsPriority == null - ? defaults.getNodesInMaintenancePriority() - : maintenanceModeSegmentsPriority + decommissioningNodes == null ? defaults.getDecommissioningNodes() : decommissioningNodes, + decommissioningMaxPercentOfMaxSegmentsToMove == null + ? 
defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove() + : decommissioningMaxPercentOfMaxSegmentsToMove ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index b92effc8651f..c20ae0c5d514 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -694,7 +694,7 @@ public CoordinatorHistoricalManagerRunnable(final int startingLeaderCounter) } // Find all historical servers, group them by subType and sort by ascending usage - Set nodesInMaintenance = params.getCoordinatorDynamicConfig().getHistoricalNodesInMaintenance(); + Set decommissioningServers = params.getCoordinatorDynamicConfig().getDecommissioningNodes(); final DruidCluster cluster = new DruidCluster(); for (ImmutableDruidServer server : servers) { if (!loadManagementPeons.containsKey(server.getName())) { @@ -709,7 +709,7 @@ public CoordinatorHistoricalManagerRunnable(final int startingLeaderCounter) new ServerHolder( server, loadManagementPeons.get(server.getName()), - nodesInMaintenance.contains(server.getHost()) + decommissioningServers.contains(server.getHost()) ) ); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java index c7d7a86c825c..ba96566a4dfd 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java @@ -32,18 +32,18 @@ public class ServerHolder implements Comparable private static final Logger log = new Logger(ServerHolder.class); private final ImmutableDruidServer server; private final LoadQueuePeon peon; - private final boolean inMaintenance; + private final boolean isDecommissioning; public ServerHolder(ImmutableDruidServer 
server, LoadQueuePeon peon) { this(server, peon, false); } - public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon, boolean inMaintenance) + public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon, boolean isDecommissioning) { this.server = server; this.peon = peon; - this.inMaintenance = inMaintenance; + this.isDecommissioning = isDecommissioning; } public ImmutableDruidServer getServer() @@ -82,14 +82,15 @@ public double getPercentUsed() } /** - * Historical nodes can be placed in maintenance mode, which instructs Coordinator to move segments from them - * according to a specified priority. The mechanism allows to drain segments from nodes which are planned for - * replacement. - * @return true if the node is in maitenance mode + * Historical nodes can be 'decommissioned', which instructs Coordinator to move segments from them according to + * the percent of move operations diverted from normal balancer moves for this purpose by + * {@link CoordinatorDynamicConfig#getDecommissioningMaxPercentOfMaxSegmentsToMove()}. The mechanism allows draining + * segments from nodes which are planned for replacement. + * @return true if the node is decommissioning */ - public boolean isInMaintenance() + public boolean isDecommissioning() { - return inMaintenance; + return isDecommissioning; } public long getAvailableSize() diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java index 10499ac807a4..cf8d7253191f 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java @@ -95,37 +95,41 @@ private void balanceTier( { if (params.getAvailableSegments().size() == 0) { - log.info("Metadata segments are not available. 
Cannot balance."); + log.warn("Metadata segments are not available. Cannot balance."); + // suppress emit zero stats return; } currentlyMovingSegments.computeIfAbsent(tier, t -> new ConcurrentHashMap<>()); if (!currentlyMovingSegments.get(tier).isEmpty()) { reduceLifetimes(tier); - log.info("[%s]: Still waiting on %,d segments to be moved", tier, currentlyMovingSegments.get(tier).size()); + log.info( + "[%s]: Still waiting on %,d segments to be moved. Skipping balance.", + tier, + currentlyMovingSegments.get(tier).size() + ); + // suppress emit zero stats return; } /* - Take as much segments from maintenance servers as priority allows and find the best location for them on - available servers. After that, balance segments within available servers pool. + Take as many segments from decommissioning servers as decommissioningMaxPercentOfMaxSegmentsToMove allows and find + the best location for them on active servers. After that, balance segments within active servers pool. */ Map> partitions = - servers.stream().collect(Collectors.partitioningBy(ServerHolder::isInMaintenance)); - final List maintenanceServers = partitions.get(true); - final List availableServers = partitions.get(false); + servers.stream().collect(Collectors.partitioningBy(ServerHolder::isDecommissioning)); + final List decommissioningServers = partitions.get(true); + final List activeServers = partitions.get(false); log.info( - "Found %d servers in maintenance, %d available servers servers", - maintenanceServers.size(), - availableServers.size() + "Found %d active servers, %d decommissioning servers", + activeServers.size(), + decommissioningServers.size() ); - if (maintenanceServers.isEmpty()) { - if (availableServers.size() <= 1) { - log.info("[%s]: %d available servers servers found. Cannot balance.", tier, availableServers.size()); - } - } else if (availableServers.isEmpty()) { - log.info("[%s]: no available servers servers found during maintenance. 
Cannot balance.", tier); + if ((decommissioningServers.isEmpty() && activeServers.size() <= 1) || activeServers.isEmpty()) { + log.warn("[%s]: insufficient active servers. Cannot balance.", tier); + // suppress emit zero stats + return; } int numSegments = 0; @@ -134,23 +138,30 @@ private void balanceTier( } if (numSegments == 0) { - log.info("No segments found. Cannot balance."); + log.info("No segments found. Cannot balance."); + // suppress emit zero stats return; } final int maxSegmentsToMove = Math.min(params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(), numSegments); - int priority = params.getCoordinatorDynamicConfig().getNodesInMaintenancePriority(); - int maxMaintenanceSegmentsToMove = (int) Math.ceil(maxSegmentsToMove * priority / 10.0); - log.info("Processing %d segments from servers in maintenance mode", maxMaintenanceSegmentsToMove); - Pair maintenanceResult = - balanceServers(params, maintenanceServers, availableServers, maxMaintenanceSegmentsToMove); - int maxGeneralSegmentsToMove = maxSegmentsToMove - maintenanceResult.lhs; - log.info("Processing %d segments from servers in general mode", maxGeneralSegmentsToMove); + int decommissioningMaxPercentOfMaxSegmentsToMove = + params.getCoordinatorDynamicConfig().getDecommissioningMaxPercentOfMaxSegmentsToMove(); + int maxSegmentsToMoveFromDecommissioningNodes = + (int) Math.ceil(maxSegmentsToMove * (decommissioningMaxPercentOfMaxSegmentsToMove / 100.0)); + log.info( + "Processing %d segments for moving from decommissioning servers", + maxSegmentsToMoveFromDecommissioningNodes + ); + Pair decommissioningResult = + balanceServers(params, decommissioningServers, activeServers, maxSegmentsToMoveFromDecommissioningNodes); + + int maxGeneralSegmentsToMove = maxSegmentsToMove - decommissioningResult.lhs; + log.info("Processing %d segments for balancing between active servers", maxGeneralSegmentsToMove); Pair generalResult = - balanceServers(params, availableServers, availableServers, 
maxGeneralSegmentsToMove); + balanceServers(params, activeServers, activeServers, maxGeneralSegmentsToMove); - int moved = generalResult.lhs + maintenanceResult.lhs; - int unmoved = generalResult.rhs + maintenanceResult.rhs; + int moved = generalResult.lhs + decommissioningResult.lhs; + int unmoved = generalResult.rhs + decommissioningResult.rhs; if (unmoved == maxSegmentsToMove) { // Cluster should be alive and constantly adjusting log.info("No good moves found in tier [%s]", tier); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java index b28f569e55b4..658171236adc 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java @@ -46,7 +46,7 @@ public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntim } else { params.getDruidCluster().getAllServers().forEach( eachHolder -> { - if (!eachHolder.isInMaintenance() + if (!eachHolder.isDecommissioning() && colocatedDataSources.stream() .anyMatch(source -> eachHolder.getServer().getDataSource(source) != null)) { loadServerHolders.add(eachHolder); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java index 3de93cfdaf99..1de3479fe30e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java @@ -153,8 +153,8 @@ private static List getFilteredHolders( log.makeAlert("Tier[%s] has no servers! 
Check your cluster configuration!", tier).emit(); return Collections.emptyList(); } - Predicate isNotInMaintenance = s -> !s.isInMaintenance(); - return queue.stream().filter(isNotInMaintenance.and(predicate)).collect(Collectors.toList()); + Predicate isActive = s -> !s.isDecommissioning(); + return queue.stream().filter(isActive.and(predicate)).collect(Collectors.toList()); } /** @@ -385,14 +385,14 @@ private static int dropForTier( Map> holders = holdersInTier.stream() .filter(s -> s.isServingSegment(segment)) .collect(Collectors.partitioningBy( - ServerHolder::isInMaintenance, + ServerHolder::isDecommissioning, Collectors.toCollection(TreeSet::new) )); - TreeSet maintenanceServers = holders.get(true); - TreeSet availableServers = holders.get(false); - int left = dropSegmentFromServers(balancerStrategy, segment, maintenanceServers, numToDrop); + TreeSet decommissioningServers = holders.get(true); + TreeSet activeServers = holders.get(false); + int left = dropSegmentFromServers(balancerStrategy, segment, decommissioningServers, numToDrop); if (left > 0) { - left = dropSegmentFromServers(balancerStrategy, segment, availableServers, left); + left = dropSegmentFromServers(balancerStrategy, segment, activeServers, left); } if (left != 0) { log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getId()); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerTest.java index 67521e397e00..dbd3048e5396 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerTest.java @@ -201,14 +201,14 @@ public void testMoveToEmptyServerBalancer() /** * Server 1 has 2 segments. - * Server 2 (maintenance) has 2 segments. + * Server 2 (decommissioning) has 2 segments. * Server 3 is empty. 
- * Maintenance has priority 7. + * Decommissioning percent is 60. * Max segments to move is 3. * 2 (of 2) segments should be moved from Server 2 and 1 (of 2) from Server 1. */ @Test - public void testMoveMaintenancePriority() + public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove() { mockDruidServer(druidServer1, "1", "normal", 30L, 100L, Arrays.asList(segment1, segment2)); mockDruidServer(druidServer2, "2", "normal", 30L, 100L, Arrays.asList(segment3, segment4)); @@ -239,8 +239,8 @@ public void testMoveMaintenancePriority() .withDynamicConfigs( CoordinatorDynamicConfig.builder() .withMaxSegmentsToMove(3) - .withMaintenanceModeSegmentsPriority(6) - .build() // ceil(3 * 0.6) = 2 segments from servers in maintenance + .withDecommissioningMaxPercentOfMaxSegmentsToMove(60) + .build() // ceil(3 * 0.6) = 2 segments from decommissioning servers ) .withBalancerStrategy(strategy) .build(); @@ -251,28 +251,28 @@ public void testMoveMaintenancePriority() } @Test - public void testZeroMaintenancePriority() + public void testZeroDecommissioningMaxPercentOfMaxSegmentsToMove() { - DruidCoordinatorRuntimeParams params = setupParamsForMaintenancePriority(0); + DruidCoordinatorRuntimeParams params = setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(0); params = new DruidCoordinatorBalancerTester(coordinator).run(params); Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal")); Assert.assertThat(peon3.getSegmentsToLoad(), is(equalTo(ImmutableSet.of(segment1)))); } @Test - public void testMaxMaintenancePriority() + public void testMaxDecommissioningMaxPercentOfMaxSegmentsToMove() { - DruidCoordinatorRuntimeParams params = setupParamsForMaintenancePriority(10); + DruidCoordinatorRuntimeParams params = setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(10); params = new DruidCoordinatorBalancerTester(coordinator).run(params); Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal")); 
Assert.assertThat(peon3.getSegmentsToLoad(), is(equalTo(ImmutableSet.of(segment2)))); } /** - * Should balance segments as usual (ignoring priority) with empty maintenanceList. + * Should balance segments as usual (ignoring percent) with empty decommissioningNodes. */ @Test - public void testMoveMaintenancePriorityWithNoMaintenance() + public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMoveWithNoDecommissioning() { mockDruidServer(druidServer1, "1", "normal", 30L, 100L, Arrays.asList(segment1, segment2)); mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Arrays.asList(segment3, segment4)); @@ -300,7 +300,7 @@ public void testMoveMaintenancePriorityWithNoMaintenance() ImmutableList.of(false, false, false) ) .withDynamicConfigs( - CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(3).withMaintenanceModeSegmentsPriority(9).build() + CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(3).withDecommissioningMaxPercentOfMaxSegmentsToMove(9).build() ) .withBalancerStrategy(strategy) .build(); @@ -311,10 +311,10 @@ public void testMoveMaintenancePriorityWithNoMaintenance() } /** - * Shouldn't move segments to a server in maintenance mode. + * Shouldn't move segments to a decommissioning server. 
*/ @Test - public void testMoveToServerInMaintenance() + public void testMoveToDecommissioningServer() { mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments); mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyList()); @@ -347,7 +347,7 @@ public void testMoveToServerInMaintenance() } @Test - public void testMoveFromServerInMaintenance() + public void testMoveFromDecommissioningServer() { mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments); mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyList()); @@ -512,7 +512,7 @@ private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder( private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder( List druidServers, List peons, - List maintenance + List decommissioning ) { return DruidCoordinatorRuntimeParams @@ -524,7 +524,7 @@ private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder( "normal", IntStream .range(0, druidServers.size()) - .mapToObj(i -> new ServerHolder(druidServers.get(i), peons.get(i), maintenance.get(i))) + .mapToObj(i -> new ServerHolder(druidServers.get(i), peons.get(i), decommissioning.get(i))) .collect(Collectors.toSet()) ) ) @@ -622,7 +622,7 @@ public void emitStats(String tier, CoordinatorStats stats, List se } } - private DruidCoordinatorRuntimeParams setupParamsForMaintenancePriority(int priority) + private DruidCoordinatorRuntimeParams setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(int percent) { mockDruidServer(druidServer1, "1", "normal", 30L, 100L, Arrays.asList(segment1, segment3)); mockDruidServer(druidServer2, "2", "normal", 30L, 100L, Arrays.asList(segment2, segment3)); @@ -632,7 +632,7 @@ private DruidCoordinatorRuntimeParams setupParamsForMaintenancePriority(int prio mockCoordinator(coordinator); - // either maintenance servers list or general ones (ie servers list is [2] or [1, 3]) + // either decommissioning servers list or active ones (ie servers list is [2] or 
[1, 3]) BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, true)))) .andReturn(new BalancerSegmentHolder(druidServer2, segment2)); @@ -651,7 +651,7 @@ private DruidCoordinatorRuntimeParams setupParamsForMaintenancePriority(int prio .withDynamicConfigs( CoordinatorDynamicConfig.builder() .withMaxSegmentsToMove(1) - .withMaintenanceModeSegmentsPriority(priority) + .withDecommissioningMaxPercentOfMaxSegmentsToMove(percent) .build() ) .withBalancerStrategy(strategy) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java index 359fb6895cce..83398d7f6dc4 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java @@ -58,9 +58,9 @@ public class BroadcastDistributionRuleTest private final List largeSegments2 = new ArrayList<>(); private DataSegment smallSegment; private DruidCluster secondCluster; - private ServerHolder generalServer; - private ServerHolder maintenanceServer2; - private ServerHolder maintenanceServer1; + private ServerHolder activeServer; + private ServerHolder decommissioningServer1; + private ServerHolder decommissioningServer2; @Before public void setUp() @@ -200,9 +200,9 @@ public void setUp() ) ); - generalServer = new ServerHolder( + activeServer = new ServerHolder( new DruidServer( - "general", + "active", "host1", null, 100, @@ -214,9 +214,9 @@ public void setUp() new LoadQueuePeonTester() ); - maintenanceServer1 = new ServerHolder( + decommissioningServer1 = new ServerHolder( new DruidServer( - "maintenance1", + "decommissioning1", "host2", null, 100, @@ -229,9 +229,9 @@ public void setUp() true ); - maintenanceServer2 = new 
ServerHolder( + decommissioningServer2 = new ServerHolder( new DruidServer( - "maintenance2", + "decommissioning2", "host3", null, 100, @@ -267,9 +267,9 @@ public void setUp() ImmutableMap.of( "tier1", Stream.of( - generalServer, - maintenanceServer1, - maintenanceServer2 + activeServer, + decommissioningServer1, + decommissioningServer2 ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) ) ); @@ -315,19 +315,19 @@ public void testBroadcastToSingleDataSource() /** * Servers: - * name | segments - * -------------+-------------- - * general | large segment - * maintenance1 | small segment - * maintenance2 | large segment + * name | segments + * -----------------+-------------- + * active | large segment + * decommissioning1 | small segment + * decommissioning2 | large segment * * After running the rule for the small segment: - * general | large & small segments - * maintenance1 | - * maintenance2 | large segment + * active | large & small segments + * decommissioning1 | + * decommissioning2 | large segment */ @Test - public void testBroadcastWithMaintenance() + public void testBroadcastDecommissioning() { final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(ImmutableList.of("large_source")); @@ -348,9 +348,9 @@ public void testBroadcastWithMaintenance() assertEquals(1L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT)); assertEquals(false, stats.hasPerTierStats()); - assertEquals(1, generalServer.getPeon().getSegmentsToLoad().size()); - assertEquals(1, maintenanceServer1.getPeon().getSegmentsToDrop().size()); - assertEquals(0, maintenanceServer2.getPeon().getSegmentsToLoad().size()); + assertEquals(1, activeServer.getPeon().getSegmentsToLoad().size()); + assertEquals(1, decommissioningServer1.getPeon().getSegmentsToDrop().size()); + assertEquals(0, decommissioningServer2.getPeon().getSegmentsToLoad().size()); } @Test diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java index 9a9bcb170a75..a8793b2a6474 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java @@ -687,11 +687,11 @@ public void testMaxLoadingQueueSize() } /** - * 2 servers in different tiers, the first is in maitenance mode. - * Should not load a segment to the server in maintenance mode. + * 2 servers in different tiers, the first is decommissioning. + * Should not load a segment to the server that is decommissioning. */ @Test - public void testLoadDuringMaitenance() + public void testLoadDecommissioning() { final LoadQueuePeon mockPeon1 = createEmptyPeon(); final LoadQueuePeon mockPeon2 = createOneCallPeonMock(); @@ -737,11 +737,11 @@ public void testLoadDuringMaitenance() } /** - * 2 tiers, 2 servers each, 1 server of the second tier is in maintenance. - * Should not load a segment to the server in maintenance mode. + * 2 tiers, 2 servers each, 1 server of the second tier is decommissioning. + * Should not load a segment to the server that is decommissioning. */ @Test - public void testLoadReplicaDuringMaitenance() + public void testLoadReplicaDuringDecommissioning() { EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(true).anyTimes(); @@ -796,11 +796,11 @@ public void testLoadReplicaDuringMaitenance() } /** - * 2 servers with a segment, one server in maintenance mode. + * 2 servers with a segment, one server decommissioning. * Should drop a segment from both. */ @Test - public void testDropDuringMaintenance() + public void testDropDuringDecommissioning() { final LoadQueuePeon mockPeon = createEmptyPeon(); mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject()); @@ -859,12 +859,12 @@ public void testDropDuringMaintenance() /** * 3 servers hosting 3 replicas of the segment. - * 1 servers is in maitenance.
+ * 1 server is decommissioning. * 1 replica is redundant. - * Should drop from the server in maintenance. + * Should drop from the decommissioning server. */ @Test - public void testRedundantReplicaDropDuringMaintenance() + public void testRedundantReplicaDropDuringDecommissioning() { final LoadQueuePeon mockPeon1 = new LoadQueuePeonTester(); final LoadQueuePeon mockPeon2 = new LoadQueuePeonTester(); @@ -1019,12 +1019,12 @@ private static LoadQueuePeon createOneCallPeonMock() return mockPeon2; } - private static ServerHolder createServerHolder(String tier, LoadQueuePeon mockPeon1, boolean maintenance) + private static ServerHolder createServerHolder(String tier, LoadQueuePeon mockPeon1, boolean isDecommissioning) { return new ServerHolder( createServer(tier).toImmutableDruidServer(), mockPeon1, - maintenance + isDecommissioning ); } } diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index e0979252012c..af97906f9cc7 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -50,8 +50,8 @@ public void testSerde() throws Exception + " \"emitBalancingStats\": true,\n" + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n" + " \"maxSegmentsInNodeLoadingQueue\": 1,\n" - + " \"historicalNodesInMaintenance\": [\"host1\", \"host2\"],\n" - + " \"nodesInMaintenancePriority\": 9\n" + + " \"decommissioningNodes\": [\"host1\", \"host2\"],\n" + + " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 9\n" + "}\n"; CoordinatorDynamicConfig actual = mapper.readValue( @@ -63,19 +63,19 @@ public void testSerde() throws Exception ), CoordinatorDynamicConfig.class ); - ImmutableSet maintenance = ImmutableSet.of("host1", "host2"); + ImmutableSet decommissioning = ImmutableSet.of("host1", "host2"); ImmutableSet whitelist =
ImmutableSet.of("test1", "test2"); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, maintenance, 9); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9); - actual = CoordinatorDynamicConfig.builder().withMaintenanceList(ImmutableSet.of("host1")).build(actual); + actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9); - actual = CoordinatorDynamicConfig.builder().withMaintenanceModeSegmentsPriority(5).build(actual); + actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5); } @Test - public void testMaintenanceParametersBackwardCompatibility() throws Exception + public void testDecommissioningParametersBackwardCompatibility() throws Exception { String jsonStr = "{\n" + " \"millisToWaitBeforeDeleting\": 1,\n" @@ -99,14 +99,14 @@ public void testMaintenanceParametersBackwardCompatibility() throws Exception ), CoordinatorDynamicConfig.class ); - ImmutableSet maintenance = ImmutableSet.of(); + ImmutableSet decommissioning = ImmutableSet.of(); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, maintenance, 0); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0); - actual = CoordinatorDynamicConfig.builder().withMaintenanceList(ImmutableSet.of("host1")).build(actual); + actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0); - actual = CoordinatorDynamicConfig.builder().withMaintenanceModeSegmentsPriority(5).build(actual); + actual = 
CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); assertConfig(actual, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5); } @@ -217,7 +217,7 @@ public void testBuilderDefaults() { CoordinatorDynamicConfig defaultConfig = CoordinatorDynamicConfig.builder().build(); ImmutableSet emptyList = ImmutableSet.of(); - assertConfig(defaultConfig, 900000, 524288000, 100, 5, 15, 10, 1, false, emptyList, false, 0, emptyList, 7); + assertConfig(defaultConfig, 900000, 524288000, 100, 5, 15, 10, 1, false, emptyList, false, 0, emptyList, 70); } @Test @@ -257,8 +257,8 @@ private void assertConfig( Set expectedKillableDatasources, boolean expectedKillAllDataSources, int expectedMaxSegmentsInNodeLoadingQueue, - Set maintenanceList, - int maintenancePriority + Set decommissioning, + int decommissioningMaxPercentOfMaxSegmentsToMove ) { Assert.assertEquals(expectedMillisToWaitBeforeDeleting, config.getMillisToWaitBeforeDeleting()); @@ -272,7 +272,7 @@ private void assertConfig( Assert.assertEquals(expectedKillableDatasources, config.getKillableDataSources()); Assert.assertEquals(expectedKillAllDataSources, config.isKillAllDataSources()); Assert.assertEquals(expectedMaxSegmentsInNodeLoadingQueue, config.getMaxSegmentsInNodeLoadingQueue()); - Assert.assertEquals(maintenanceList, config.getHistoricalNodesInMaintenance()); - Assert.assertEquals(maintenancePriority, config.getNodesInMaintenancePriority()); + Assert.assertEquals(decommissioning, config.getDecommissioningNodes()); + Assert.assertEquals(decommissioningMaxPercentOfMaxSegmentsToMove, config.getDecommissioningMaxPercentOfMaxSegmentsToMove()); } }