diff --git a/docs/api-reference/dynamic-configuration-api.md b/docs/api-reference/dynamic-configuration-api.md index 971aa81d206a..cad61e4b88fa 100644 --- a/docs/api-reference/dynamic-configuration-api.md +++ b/docs/api-reference/dynamic-configuration-api.md @@ -106,7 +106,9 @@ Host: http://ROUTER_IP:ROUTER_PORT "useRoundRobinSegmentAssignment": true, "smartSegmentLoading": true, "debugDimensions": null, - "turboLoadingNodes": [] + "turboLoadingNodes": [], + "cloneServers": {} + } ``` @@ -174,7 +176,8 @@ curl "http://ROUTER_IP:ROUTER_PORT/druid/coordinator/v1/config" \ "replicateAfterLoadTimeout": false, "maxNonPrimaryReplicantsToLoad": 2147483647, "useRoundRobinSegmentAssignment": true, - "turboLoadingNodes": [] + "turboLoadingNodes": [], + "cloneServers": {} }' ``` @@ -206,7 +209,8 @@ Content-Length: 683 "replicateAfterLoadTimeout": false, "maxNonPrimaryReplicantsToLoad": 2147483647, "useRoundRobinSegmentAssignment": true, - "turboLoadingNodes": [] + "turboLoadingNodes": [], + "cloneServers": {} } ``` diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 91168d9471a1..328ab3508251 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -954,6 +954,7 @@ The following table shows the dynamic configuration properties for the Coordinat |`pauseCoordination`|Boolean flag for whether or not the Coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` interface. Such duties include: segment balancing, segment compaction, submitting kill tasks for unused segments (if enabled), logging of used segments in the cluster, marking of newly unused or overshadowed segments, matching and execution of load/drop rules for used segments, unloading segments that are no longer marked as used from Historical servers. 
An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS name nodes with downtime and don't want the Coordinator to be directing Historical nodes to hit the name node with API requests until maintenance is done and the deep store is declared healthy for use again.|false| |`replicateAfterLoadTimeout`|Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the Coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow Historicals in the cluster. However, the slow Historical may still load the segment later and the Coordinator may issue drop requests if the segment is over-replicated.|false| |`turboLoadingNodes`| Experimental. List of Historical servers to place in turbo loading mode. These servers use a larger thread-pool to load segments faster but at the cost of query performance. For servers specified in `turboLoadingNodes`, `druid.coordinator.loadqueuepeon.http.batchSize` is ignored and the coordinator uses the value of the respective `numLoadingThreads` instead.
Please use this config with caution. All servers should eventually be removed from this list once the segment loading on the respective historicals is finished. |none| +|`cloneServers`| Experimental. Map from target Historical server to source Historical server which should be cloned by the target. The target Historical does not participate in regular segment assignment or balancing. Instead, the Coordinator mirrors any segment assignment made to the source Historical onto the target Historical, so that the target becomes an exact copy of the source. Segments on the target Historical do not count towards replica counts either. If the source disappears, the target remains in the last known state of the source server until removed from the configuration.
Use this config with caution. All servers should eventually be removed from this list once the desired state on the respective Historicals is achieved. |none| ##### Smart segment loading diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index d3ee720f8149..8a219484acf1 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -74,6 +74,7 @@ public class CoordinatorDynamicConfig private final Map validDebugDimensions; private final Set turboLoadingNodes; + private final Map cloneServers; /** * Stale pending segments belonging to the data sources in this list are not killed by {@code @@ -124,7 +125,8 @@ public CoordinatorDynamicConfig( @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment, @JsonProperty("smartSegmentLoading") @Nullable Boolean smartSegmentLoading, @JsonProperty("debugDimensions") @Nullable Map debugDimensions, - @JsonProperty("turboLoadingNodes") @Nullable Set turboLoadingNodes + @JsonProperty("turboLoadingNodes") @Nullable Set turboLoadingNodes, + @JsonProperty("cloneServers") @Nullable Map cloneServers ) { this.markSegmentAsUnusedDelayMillis = @@ -169,6 +171,7 @@ public CoordinatorDynamicConfig( this.debugDimensions = debugDimensions; this.validDebugDimensions = validateDebugDimensions(debugDimensions); this.turboLoadingNodes = Configs.valueOrDefault(turboLoadingNodes, Set.of()); + this.cloneServers = Configs.valueOrDefault(cloneServers, Map.of()); } private Map validateDebugDimensions(Map debugDimensions) @@ -322,6 +325,19 @@ public boolean getReplicateAfterLoadTimeout() return replicateAfterLoadTimeout; } + /** + * Map from target Historical server to source Historical server which should be cloned by the target. 
The target + * Historical does not participate in regular segment assignment or balancing. Instead, the Coordinator mirrors any + * segment assignment made to the source Historical onto the target Historical, so that the target becomes an exact + * copy of the source. Segments on the target Historical do not count towards replica counts either. If the source + * disappears, the target remains in the last known state of the source server until removed from the cloneServers. + */ + @JsonProperty + public Map getCloneServers() + { + return cloneServers; + } + /** * List of servers to put in turbo-loading mode. These servers will use a larger thread pool to load * segments. This causes decreases the average time taken to load segments. However, this also means less resources @@ -464,6 +480,7 @@ public static class Builder private Boolean useRoundRobinSegmentAssignment; private Boolean smartSegmentLoading; private Set turboLoadingNodes; + private Map cloneServers; public Builder() { @@ -487,7 +504,8 @@ public Builder( @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment, @JsonProperty("smartSegmentLoading") @Nullable Boolean smartSegmentLoading, @JsonProperty("debugDimensions") @Nullable Map debugDimensions, - @JsonProperty("turboLoadingNodes") @Nullable Set turboLoadingNodes + @JsonProperty("turboLoadingNodes") @Nullable Set turboLoadingNodes, + @JsonProperty("cloneServers") @Nullable Map cloneServers ) { this.markSegmentAsUnusedDelayMillis = markSegmentAsUnusedDelayMillis; @@ -507,6 +525,7 @@ public Builder( this.smartSegmentLoading = smartSegmentLoading; this.debugDimensions = debugDimensions; this.turboLoadingNodes = turboLoadingNodes; + this.cloneServers = cloneServers; } public Builder withMarkSegmentAsUnusedDelayMillis(long leadingTimeMillis) @@ -599,6 +618,12 @@ public Builder withUseRoundRobinSegmentAssignment(boolean useRoundRobinSegmentAs return this; } + public Builder withCloneServers(Map cloneServers) + { + 
this.cloneServers = cloneServers; + return this; + } + /** * Builds a CoordinatoryDynamicConfig using either the configured values, or * the default value if not configured. @@ -625,7 +650,8 @@ public CoordinatorDynamicConfig build() valueOrDefault(useRoundRobinSegmentAssignment, Defaults.USE_ROUND_ROBIN_ASSIGNMENT), valueOrDefault(smartSegmentLoading, Defaults.SMART_SEGMENT_LOADING), debugDimensions, - turboLoadingNodes + turboLoadingNodes, + cloneServers ); } @@ -656,7 +682,8 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults) valueOrDefault(useRoundRobinSegmentAssignment, defaults.isUseRoundRobinSegmentAssignment()), valueOrDefault(smartSegmentLoading, defaults.isSmartSegmentLoading()), valueOrDefault(debugDimensions, defaults.getDebugDimensions()), - valueOrDefault(turboLoadingNodes, defaults.getTurboLoadingNodes()) + valueOrDefault(turboLoadingNodes, defaults.getTurboLoadingNodes()), + valueOrDefault(cloneServers, defaults.getCloneServers()) ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java index da0b8e8b2e70..5e3bcfd31de5 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.NavigableSet; import java.util.Set; +import java.util.stream.Collectors; /** * Contains a representation of the current state of the cluster by tier. 
@@ -44,8 +45,9 @@ public class DruidCluster private final Set realtimes; private final Map> historicals; + private final Map> managedHistoricals; private final Set brokers; - private final List allServers; + private final List allManagedServers; private DruidCluster( Set realtimes, @@ -58,8 +60,18 @@ private DruidCluster( historicals, holders -> CollectionUtils.newTreeSet(Comparator.naturalOrder(), holders) ); + this.managedHistoricals = CollectionUtils.mapValues( + historicals, + holders -> { + List managedServers = holders.stream() + .filter(serverHolder -> !serverHolder.isUnmanaged()) + .collect(Collectors.toList()); + + return CollectionUtils.newTreeSet(Comparator.naturalOrder(), managedServers); + } + ); this.brokers = Collections.unmodifiableSet(brokers); - this.allServers = initAllServers(); + this.allManagedServers = initAllManagedServers(); } public Set getRealtimes() @@ -67,11 +79,23 @@ public Set getRealtimes() return realtimes; } + /** + * Return all historicals. + */ public Map> getHistoricals() { return historicals; } + /** + * Returns all managed historicals. Managed historicals are historicals which can participate in segment assignment, + * drop or balancing. 
+ */ + public Map> getManagedHistoricals() + { + return managedHistoricals; + } + public Set getBrokers() { return brokers; @@ -82,26 +106,26 @@ public Iterable getTierNames() return historicals.keySet(); } - public NavigableSet getHistoricalsByTier(String tier) + public NavigableSet getManagedHistoricalsByTier(String tier) { - return historicals.get(tier); + return managedHistoricals.get(tier); } - public List getAllServers() + public List getAllManagedServers() { - return allServers; + return allManagedServers; } - private List initAllServers() + private List initAllManagedServers() { - final int historicalSize = historicals.values().stream().mapToInt(Collection::size).sum(); + final int historicalSize = managedHistoricals.values().stream().mapToInt(Collection::size).sum(); final int realtimeSize = realtimes.size(); - final List allServers = new ArrayList<>(historicalSize + realtimeSize); + final List allManagedServers = new ArrayList<>(historicalSize + realtimeSize); - historicals.values().forEach(allServers::addAll); - allServers.addAll(brokers); - allServers.addAll(realtimes); - return allServers; + managedHistoricals.values().forEach(allManagedServers::addAll); + allManagedServers.addAll(brokers); + allManagedServers.addAll(realtimes); + return allManagedServers; } public boolean isEmpty() diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 61b56e582298..8174d829df1c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -63,6 +63,7 @@ import org.apache.druid.server.coordinator.config.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.config.KillUnusedSegmentsConfig; import org.apache.druid.server.coordinator.duty.BalanceSegments; +import org.apache.druid.server.coordinator.duty.CloneHistoricals; 
import org.apache.druid.server.coordinator.duty.CompactSegments; import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup; import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups; @@ -558,6 +559,7 @@ private List makeHistoricalManagementDuties() new MarkOvershadowedSegmentsAsUnused(deleteSegments), new MarkEternityTombstonesAsUnused(deleteSegments), new BalanceSegments(config.getCoordinatorPeriod()), + new CloneHistoricals(loadQueueManager), new CollectLoadQueueStats() ); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java index 5de1bd5ee060..b4a924dbf2c9 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java @@ -32,9 +32,11 @@ import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; /** @@ -57,6 +59,7 @@ public class ServerHolder implements Comparable private final ImmutableDruidServer server; private final LoadQueuePeon peon; private final boolean isDecommissioning; + private final boolean isUnmanaged; private final int maxAssignmentsInRun; private final int maxLifetimeInQueue; @@ -73,7 +76,7 @@ public class ServerHolder implements Comparable */ private final Map queuedSegments = new HashMap<>(); - private final SegmentCountsPerInterval projectedSegments = new SegmentCountsPerInterval(); + private final SegmentCountsPerInterval projectedSegmentCounts = new SegmentCountsPerInterval(); public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon) { @@ -85,12 +88,25 @@ public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon, boolean isD this(server, peon, isDecommissioning, 0, 1); } + public 
ServerHolder( + ImmutableDruidServer server, + LoadQueuePeon peon, + boolean isDecommissioning, + int maxSegmentsInLoadQueue, + int maxLifetimeInQueue + ) + { + this(server, peon, isDecommissioning, false, maxSegmentsInLoadQueue, maxLifetimeInQueue); + } + /** * Creates a new ServerHolder valid for a single coordinator run. * * @param server Underlying Druid server * @param peon Load queue peon for this server * @param isDecommissioning Whether the server is decommissioning + * @param isUnmanaged Whether this server is unmanaged and should not participate in segment assignment, + * drop or balancing. * @param maxSegmentsInLoadQueue Max number of segments that can be present in * the load queue at any point. If this is 0, the * load queue can have an unlimited number of segments. @@ -101,6 +117,7 @@ public ServerHolder( ImmutableDruidServer server, LoadQueuePeon peon, boolean isDecommissioning, + boolean isUnmanaged, int maxSegmentsInLoadQueue, int maxLifetimeInQueue ) @@ -108,6 +125,7 @@ public ServerHolder( this.server = server; this.peon = peon; this.isDecommissioning = isDecommissioning; + this.isUnmanaged = isUnmanaged; this.maxAssignmentsInRun = maxSegmentsInLoadQueue == 0 ? Integer.MAX_VALUE @@ -128,7 +146,7 @@ private void initializeQueuedSegments( ) { for (DataSegment segment : server.iterateAllSegments()) { - projectedSegments.addSegment(segment); + projectedSegmentCounts.addSegment(segment); } final List expiredSegments = new ArrayList<>(); @@ -213,6 +231,14 @@ public boolean isDecommissioning() return isDecommissioning; } + /** + * Returns true if this server is unmanaged and should not participate in segment assignment, drop or balancing. 
+ */ + public boolean isUnmanaged() + { + return isUnmanaged; + } + public boolean isLoadQueueFull() { return totalAssignmentsInRun >= maxAssignmentsInRun; @@ -264,11 +290,27 @@ public Map getQueuedSegments() } /** - * Segments that are expected to be loaded on this server once all the + * Counts for segments that are expected to be loaded on this server once all the * operations in progress have completed. */ - public SegmentCountsPerInterval getProjectedSegments() + public SegmentCountsPerInterval getProjectedSegmentCounts() { + return projectedSegmentCounts; + } + + /** + * Segments that are expected to be loaded on this server once all the operations in progress have completed. + */ + public Set getProjectedSegments() + { + final Set projectedSegments = new HashSet<>(getServedSegments()); + queuedSegments.forEach((segment, action) -> { + if (action.isLoad()) { + projectedSegments.add(segment); + } else { + projectedSegments.remove(segment); + } + }); return projectedSegments; } @@ -393,10 +435,10 @@ private void addToQueuedSegments(DataSegment segment, SegmentAction action) // Add to projected if load is started, remove from projected if drop has started if (action.isLoad()) { - projectedSegments.addSegment(segment); + projectedSegmentCounts.addSegment(segment); sizeOfLoadingSegments += segment.getSize(); } else { - projectedSegments.removeSegment(segment); + projectedSegmentCounts.removeSegment(segment); if (action == SegmentAction.DROP) { sizeOfDroppingSegments += segment.getSize(); } @@ -410,10 +452,10 @@ private void removeFromQueuedSegments(DataSegment segment, SegmentAction action) queuedSegments.remove(segment); if (action.isLoad()) { - projectedSegments.removeSegment(segment); + projectedSegmentCounts.removeSegment(segment); sizeOfLoadingSegments -= segment.getSize(); } else { - projectedSegments.addSegment(segment); + projectedSegmentCounts.addSegment(segment); if (action == SegmentAction.DROP) { sizeOfDroppingSegments -= segment.getSize(); } diff 
--git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java index 96a6ccccf5cb..d9c883bf6c53 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java @@ -284,7 +284,7 @@ protected double computePlacementCost(DataSegment proposalSegment, ServerHolder // Compute number of segments in each interval final Object2IntOpenHashMap intervalToSegmentCount = new Object2IntOpenHashMap<>(); - final SegmentCountsPerInterval projectedSegments = server.getProjectedSegments(); + final SegmentCountsPerInterval projectedSegments = server.getProjectedSegmentCounts(); projectedSegments.getIntervalToTotalSegmentCount().object2IntEntrySet().forEach(entry -> { final Interval interval = entry.getKey(); if (costComputeInterval.overlaps(interval)) { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java index 6a1c6199911c..23db0ac22956 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java @@ -71,7 +71,7 @@ public static int computeNumSegmentsToMoveInTier( ) { final int totalSegments = historicals.stream().mapToInt( - server -> server.getProjectedSegments().getTotalSegmentCount() + server -> server.getProjectedSegmentCounts().getTotalSegmentCount() ).sum(); // Move at least some segments to ensure that the cluster is always balancing itself @@ -187,8 +187,8 @@ private static double getAverageSegmentSize(List servers) int totalSegmentCount = 0; long totalUsageBytes = 0; for (ServerHolder server : servers) { - 
totalSegmentCount += server.getProjectedSegments().getTotalSegmentCount(); - totalUsageBytes += server.getProjectedSegments().getTotalSegmentBytes(); + totalSegmentCount += server.getProjectedSegmentCounts().getTotalSegmentCount(); + totalUsageBytes += server.getProjectedSegmentCounts().getTotalSegmentBytes(); } if (totalSegmentCount <= 0 || totalUsageBytes <= 0) { @@ -209,7 +209,7 @@ static int computeSegmentsToMoveToBalanceCountsPerDatasource( { // Find all the datasources final Set datasources = servers.stream().flatMap( - s -> s.getProjectedSegments().getDatasourceToTotalSegmentCount().keySet().stream() + s -> s.getProjectedSegmentCounts().getDatasourceToTotalSegmentCount().keySet().stream() ).collect(Collectors.toSet()); if (datasources.isEmpty()) { return 0; @@ -220,7 +220,7 @@ static int computeSegmentsToMoveToBalanceCountsPerDatasource( final Object2IntMap datasourceToMinSegments = new Object2IntOpenHashMap<>(); for (ServerHolder server : servers) { final Object2IntMap datasourceToSegmentCount - = server.getProjectedSegments().getDatasourceToTotalSegmentCount(); + = server.getProjectedSegmentCounts().getDatasourceToTotalSegmentCount(); for (String datasource : datasources) { int count = datasourceToSegmentCount.getInt(datasource); datasourceToMaxSegments.mergeInt(datasource, count, Math::max); @@ -243,7 +243,7 @@ static int computeSegmentsToMoveToBalanceCountsPerDatasource( int minNumSegments = Integer.MAX_VALUE; int maxNumSegments = 0; for (ServerHolder server : servers) { - int countForSkewedDatasource = server.getProjectedSegments() + int countForSkewedDatasource = server.getProjectedSegmentCounts() .getDatasourceToTotalSegmentCount() .getInt(mostUnbalancedDatasource); @@ -276,7 +276,7 @@ private static int computeSegmentsToMoveToBalanceDiskUsage( long maxUsageBytes = 0; long minUsageBytes = Long.MAX_VALUE; for (ServerHolder server : servers) { - final SegmentCountsPerInterval projectedSegments = server.getProjectedSegments(); + final 
SegmentCountsPerInterval projectedSegments = server.getProjectedSegmentCounts(); // Track the maximum and minimum values long serverUsageBytes = projectedSegments.getTotalSegmentBytes(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java index 36be8a61de93..9643a547c991 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java @@ -205,7 +205,7 @@ private int getNumDecommSegmentsToMove(int maxSegmentsToMove) return 0; } else { final int decommSegmentsToMove = decommissioningServers.stream().mapToInt( - server -> server.getProjectedSegments().getTotalSegmentCount() + server -> server.getProjectedSegmentCounts().getTotalSegmentCount() ).sum(); return Math.min(decommSegmentsToMove, maxSegmentsToMove); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java index c27f95a002de..e3791c9dc233 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java @@ -60,7 +60,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) return params; } - params.getDruidCluster().getHistoricals().forEach( + params.getDruidCluster().getManagedHistoricals().forEach( (tier, servers) -> new TierSegmentBalancer(tier, servers, maxSegmentsToMove, params).run() ); @@ -113,7 +113,7 @@ private Pair getNumHistoricalsAndSegments(DruidCluster cluster int numHistoricals = 0; int numSegments = 0; - for (Set historicals : cluster.getHistoricals().values()) { + for (Set historicals : cluster.getManagedHistoricals().values()) { for (ServerHolder 
historical : historicals) { ++numHistoricals; numSegments += historical.getServer().getNumSegments() + historical.getNumQueuedSegments(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java new file mode 100644 index 000000000000..25534f1ddfbd --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordinator.duty; + +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.ServerHolder; +import org.apache.druid.server.coordinator.loading.SegmentAction; +import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager; +import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; +import org.apache.druid.server.coordinator.stats.Stats; +import org.apache.druid.timeline.DataSegment; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Handles cloning of historicals. Given the historical to historical clone mappings, based on + * {@link CoordinatorDynamicConfig#getCloneServers()}, copies any segment load or unload requests from the source + * historical to the target historical. + */ +public class CloneHistoricals implements CoordinatorDuty +{ + private static final Logger log = new Logger(CloneHistoricals.class); + private final SegmentLoadQueueManager loadQueueManager; + + public CloneHistoricals(SegmentLoadQueueManager loadQueueManager) + { + this.loadQueueManager = loadQueueManager; + } + + @Override + public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) + { + final Map cloneServers = params.getCoordinatorDynamicConfig().getCloneServers(); + final CoordinatorRunStats stats = params.getCoordinatorStats(); + + if (cloneServers.isEmpty()) { + // No servers to be cloned. + return params; + } + + // Create a map of host to historical. 
+ final Map historicalMap = params.getDruidCluster() + .getHistoricals() + .values() + .stream() + .flatMap(Collection::stream) + .collect(Collectors.toMap( + serverHolder -> serverHolder.getServer().getHost(), + serverHolder -> serverHolder + )); + + for (Map.Entry entry : cloneServers.entrySet()) { + final String targetHistoricalName = entry.getKey(); + final ServerHolder targetServer = historicalMap.get(targetHistoricalName); + + final String sourceHistoricalName = entry.getValue(); + final ServerHolder sourceServer = historicalMap.get(sourceHistoricalName); + + if (sourceServer == null || targetServer == null) { + log.error( + "Could not process clone mapping[%s] as historical[%s] does not exist.", + entry, + (sourceServer == null ? sourceHistoricalName : targetHistoricalName) + ); + continue; + } + + final Set sourceProjectedSegments = sourceServer.getProjectedSegments(); + final Set targetProjectedSegments = targetServer.getProjectedSegments(); + // Load any segments missing in the clone target. + for (DataSegment segment : sourceProjectedSegments) { + if (!targetProjectedSegments.contains(segment) && loadQueueManager.loadSegment(segment, targetServer, SegmentAction.LOAD)) { + stats.add( + Stats.Segments.ASSIGNED_TO_CLONE, + RowKey.of(Dimension.SERVER, targetServer.getServer().getHost()), + 1L + ); + } + } + + // Drop any segments missing from the clone source. 
+ for (DataSegment segment : targetProjectedSegments) { + if (!sourceProjectedSegments.contains(segment) && loadQueueManager.dropSegment(segment, targetServer)) { + stats.add( + Stats.Segments.DROPPED_FROM_CLONE, + RowKey.of(Dimension.SERVER, targetServer.getServer().getHost()), + 1L + ); + } + } + } + + return params; + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java index 596f9df7017f..dba11d2c2901 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java @@ -83,7 +83,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) final DruidCluster cluster = params.getDruidCluster(); final Map timelines = new HashMap<>(); - cluster.getHistoricals().values().forEach( + cluster.getManagedHistoricals().values().forEach( historicals -> historicals.forEach( historical -> addSegmentsFromServer(historical, timelines) ) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java index a9e926ea4f7d..8e99d15c07ef 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java @@ -38,6 +38,7 @@ import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.timeline.DataSegment; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -119,7 +120,7 @@ private void cancelLoadsOnDecommissioningServers(DruidCluster cluster) { final AtomicInteger 
cancelledCount = new AtomicInteger(0); final List decommissioningServers - = cluster.getAllServers().stream() + = cluster.getAllManagedServers().stream() .filter(ServerHolder::isDecommissioning) .collect(Collectors.toList()); @@ -152,6 +153,7 @@ private DruidCluster prepareCluster( ) { final Set decommissioningServers = dynamicConfig.getDecommissioningNodes(); + final Set unmanagedServers = new HashSet<>(dynamicConfig.getCloneServers().keySet()); final DruidCluster.Builder cluster = DruidCluster.builder(); for (ImmutableDruidServer server : currentServers) { cluster.add( @@ -159,6 +161,7 @@ private DruidCluster prepareCluster( server, taskMaster.getPeonForServer(server), decommissioningServers.contains(server.getHost()), + unmanagedServers.contains(server.getHost()), segmentLoadingConfig.getMaxSegmentsInLoadQueue(), segmentLoadingConfig.getMaxLifetimeInLoadQueue() ) @@ -173,7 +176,16 @@ private void collectHistoricalStats(DruidCluster cluster, CoordinatorRunStats st RowKey rowKey = RowKey.of(Dimension.TIER, tier); stats.add(Stats.Tier.HISTORICAL_COUNT, rowKey, historicals.size()); - long totalCapacity = historicals.stream().mapToLong(ServerHolder::getMaxSize).sum(); + long totalCapacity = 0; + long cloneCount = 0; + for (ServerHolder holder : historicals) { + if (holder.isUnmanaged()) { + cloneCount += 1; + } else { + totalCapacity += holder.getMaxSize(); + } + } + stats.add(Stats.Tier.CLONE_COUNT, rowKey, cloneCount); stats.add(Stats.Tier.TOTAL_CAPACITY, rowKey, totalCapacity); }); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java index 2bd9fd295481..761e3383ede1 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java @@ -62,7 +62,7 @@ public DruidCoordinatorRuntimeParams 
run(DruidCoordinatorRuntimeParams params) broadcastStatusByDatasource.put(broadcastDatasource, true); } - final List allServers = params.getDruidCluster().getAllServers(); + final List allServers = params.getDruidCluster().getAllManagedServers(); int numCancelledLoads = allServers.stream().mapToInt( server -> cancelLoadOfUnusedSegments(server, broadcastStatusByDatasource, params) ).sum(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java index 38eeb42413e7..e4fa849beb18 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java @@ -150,8 +150,7 @@ public HttpLoadQueuePeon( this.serverCapabilities = fetchSegmentLoadingCapabilities(); } - @VisibleForTesting - SegmentLoadingCapabilities fetchSegmentLoadingCapabilities() + private SegmentLoadingCapabilities fetchSegmentLoadingCapabilities() { try { final URL segmentLoadingCapabilitiesURL = new URL( diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java index 1f9307cae56e..cc007449b578 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java @@ -51,7 +51,7 @@ public class RoundRobinServerSelector public RoundRobinServerSelector(DruidCluster cluster) { - cluster.getHistoricals().forEach( + cluster.getManagedHistoricals().forEach( (tier, servers) -> tierToServers.put(tier, new CircularServerList(servers)) ); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCountMap.java 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCountMap.java index 3d3e34072fba..241759f8b951 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCountMap.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCountMap.java @@ -46,7 +46,7 @@ static SegmentReplicaCountMap create(DruidCluster cluster) private void initReplicaCounts(DruidCluster cluster) { - cluster.getHistoricals().forEach( + cluster.getManagedHistoricals().forEach( (tier, historicals) -> historicals.forEach( serverHolder -> { // Add segments already loaded on this server diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java index a5e38eee7d4b..654fe42b220b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java @@ -88,7 +88,7 @@ public StrategicSegmentAssigner( this.useRoundRobinAssignment = loadingConfig.isUseRoundRobinSegmentAssignment(); this.serverSelector = useRoundRobinAssignment ? 
new RoundRobinServerSelector(cluster) : null; - cluster.getHistoricals().forEach( + cluster.getManagedHistoricals().forEach( (tier, historicals) -> tierToHistoricalCount.put(tier, historicals.size()) ); } @@ -275,7 +275,7 @@ private int updateReplicasInTier( } final SegmentStatusInTier segmentStatus = - new SegmentStatusInTier(segment, cluster.getHistoricalsByTier(tier)); + new SegmentStatusInTier(segment, cluster.getManagedHistoricalsByTier(tier)); // Cancel all moves in this tier if it does not need to have replicas if (shouldCancelMoves) { @@ -326,7 +326,7 @@ private void reportTierCapacityStats(DataSegment segment, int requiredReplicas, public void broadcastSegment(DataSegment segment) { final Object2IntOpenHashMap tierToRequiredReplicas = new Object2IntOpenHashMap<>(); - for (ServerHolder server : cluster.getAllServers()) { + for (ServerHolder server : cluster.getAllManagedServers()) { // Ignore servers which are not broadcast targets if (!server.getServer().getType().isSegmentBroadcastTarget()) { continue; @@ -577,7 +577,7 @@ private static ReplicationThrottler createReplicationThrottler( { final Map tierToLoadingReplicaCount = new HashMap<>(); - cluster.getHistoricals().forEach( + cluster.getManagedHistoricals().forEach( (tier, historicals) -> { int numLoadingReplicas = historicals.stream().mapToInt(ServerHolder::getNumLoadingReplicas).sum(); tierToLoadingReplicaCount.put(tier, numLoadingReplicas); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java index 0bc3b609bd38..0571245c9853 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java @@ -65,6 +65,12 @@ public static class Segments // Values computed in a run public static final CoordinatorStat REPLICATION_THROTTLE_LIMIT = CoordinatorStat.toDebugOnly("replicationThrottleLimit"); + + // 
Cloned segments in a run + public static final CoordinatorStat ASSIGNED_TO_CLONE + = CoordinatorStat.toDebugAndEmit("cloneLoad", "segment/clone/assigned/count"); + public static final CoordinatorStat DROPPED_FROM_CLONE + = CoordinatorStat.toDebugAndEmit("cloneDrop", "segment/clone/dropped/count"); } public static class SegmentQueue @@ -98,6 +104,8 @@ public static class Tier = CoordinatorStat.toDebugAndEmit("maxRepFactor", "tier/replication/factor"); public static final CoordinatorStat HISTORICAL_COUNT = CoordinatorStat.toDebugAndEmit("numHistorical", "tier/historical/count"); + public static final CoordinatorStat CLONE_COUNT + = CoordinatorStat.toDebugAndEmit("numClones", "tier/historical/clone/count"); } public static class Compaction diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java index 17a4de1d73fb..d47cf2fb7419 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidClusterTest.java @@ -98,7 +98,7 @@ public void testAdd() } @Test - public void testGetAllServers() + public void testGetAllManagedServers() { clusterBuilder.add(NEW_REALTIME); clusterBuilder.add(NEW_HISTORICAL); @@ -107,7 +107,7 @@ public void testGetAllServers() final Set expectedRealtimes = cluster.getRealtimes(); final Map> expectedHistoricals = cluster.getHistoricals(); - final Collection allServers = cluster.getAllServers(); + final Collection allServers = cluster.getAllManagedServers(); Assert.assertEquals(4, allServers.size()); Assert.assertTrue(allServers.containsAll(cluster.getRealtimes())); Assert.assertTrue( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java index fba98bff3aec..cb4a8f1d7f3e 100644 --- 
a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java @@ -73,10 +73,12 @@ public class HttpLoadQueuePeonTest private TestHttpClient httpClient; private HttpLoadQueuePeon httpLoadQueuePeon; + private SegmentLoadingCapabilities segmentLoadingCapabilities; @Before public void setUp() { + segmentLoadingCapabilities = new SegmentLoadingCapabilities(1, 3); httpClient = new TestHttpClient(); httpLoadQueuePeon = new HttpLoadQueuePeon( "http://dummy:4000", @@ -90,14 +92,7 @@ public void setUp() true ), httpClient.callbackExecutor - ) - { - @Override - SegmentLoadingCapabilities fetchSegmentLoadingCapabilities() - { - return new SegmentLoadingCapabilities(1, 3); - } - }; + ); httpLoadQueuePeon.start(); } @@ -344,14 +339,7 @@ public void testBatchSize() true ), httpClient.callbackExecutor - ) - { - @Override - SegmentLoadingCapabilities fetchSegmentLoadingCapabilities() - { - return new SegmentLoadingCapabilities(1, 3); - } - }; + ); Assert.assertEquals(1, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.NORMAL)); Assert.assertEquals(3, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.TURBO)); @@ -362,7 +350,7 @@ private LoadPeonCallback markSegmentProcessed(DataSegment segment) return success -> httpClient.processedSegments.add(segment); } - private static class TestHttpClient implements HttpClient, DataSegmentChangeHandler + private class TestHttpClient implements HttpClient, DataSegmentChangeHandler { final BlockingExecutorService processingExecutor = new BlockingExecutorService("HttpLoadQueuePeonTest-%s"); final BlockingExecutorService callbackExecutor = new BlockingExecutorService("HttpLoadQueuePeonTest-cb"); @@ -379,6 +367,7 @@ public ListenableFuture go( } @Override + @SuppressWarnings("unchecked") public ListenableFuture go( Request request, HttpResponseHandler httpResponseHandler, @@ -388,7 +377,17 @@ public 
ListenableFuture go( HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); httpResponse.setContent(ChannelBuffers.buffer(0)); httpResponseHandler.handleResponse(httpResponse, null); + try { + if (request.getUrl().toString().contains("/loadCapabilities")) { + return (ListenableFuture) Futures.immediateFuture( + new ByteArrayInputStream( + MAPPER.writerFor(SegmentLoadingCapabilities.class) + .writeValueAsBytes(segmentLoadingCapabilities) + ) + ); + } + List changeRequests = MAPPER.readValue( request.getContent().array(), HttpLoadQueuePeon.REQUEST_ENTITY_TYPE_REF diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java new file mode 100644 index 000000000000..60be0e1496df --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/HistoricalCloningTest.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordinator.simulate; + +import org.apache.druid.client.DruidServer; +import org.apache.druid.segment.TestDataSource; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.coordinator.stats.Stats; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class HistoricalCloningTest extends CoordinatorSimulationBaseTest +{ + private static final long SIZE_1TB = 1_000_000; + + private DruidServer historicalT11; + private DruidServer historicalT12; + private DruidServer historicalT13; + + private final String datasource = TestDataSource.WIKI; + + @Override + public void setUp() + { + // Setup historicals for 1 tier, size 1 TB each + historicalT11 = createHistorical(1, Tier.T1, SIZE_1TB); + historicalT12 = createHistorical(2, Tier.T1, SIZE_1TB); + historicalT13 = createHistorical(3, Tier.T1, SIZE_1TB); + } + + @Test + public void testSimpleCloning() + { + final CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withSegments(Segments.WIKI_10X1D) + .withServers(historicalT11, historicalT12) + .withRules(datasource, Load.on(Tier.T1, 1).forever()) + .withDynamicConfig( + CoordinatorDynamicConfig.builder() + .withCloneServers(Map.of(historicalT12.getHost(), historicalT11.getHost())) + .withSmartSegmentLoading(true) + .build() + ) + .withImmediateSegmentLoading(true) + .build(); + + startSimulation(sim); + runCoordinatorCycle(); + + verifyValue(Metric.ASSIGNED_COUNT, 10L); + verifyValue( + Stats.Segments.ASSIGNED_TO_CLONE.getMetricName(), + Map.of("server", historicalT12.getName()), + 10L + ); + verifyValue( + Metric.SUCCESS_ACTIONS, + Map.of("server", historicalT11.getName(), "description", "LOAD: NORMAL"), + 10L + ); + verifyValue( + Metric.SUCCESS_ACTIONS, + Map.of("server", historicalT12.getName(), "description", "LOAD: NORMAL"), + 10L + ); + + Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT11.getTotalSegments()); + 
Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT12.getTotalSegments()); + Segments.WIKI_10X1D.forEach(segment -> { + Assert.assertEquals(segment, historicalT11.getSegment(segment.getId())); + Assert.assertEquals(segment, historicalT12.getSegment(segment.getId())); + }); + } + + @Test + public void testAddingNewHistorical() + { + final CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withSegments(Segments.WIKI_10X1D) + .withServers(historicalT11, historicalT12) + .withRules(datasource, Load.on(Tier.T1, 1).forever()) + .withDynamicConfig( + CoordinatorDynamicConfig.builder() + .withCloneServers(Map.of(historicalT12.getHost(), historicalT11.getHost())) + .withSmartSegmentLoading(true) + .build() + ) + .withImmediateSegmentLoading(true) + .build(); + + // Run 1: Current state is a historical and clone already in sync. + Segments.WIKI_10X1D.forEach(segment -> { + historicalT11.addDataSegment(segment); + historicalT12.addDataSegment(segment); + }); + + startSimulation(sim); + + runCoordinatorCycle(); + + // Confirm number of segments. + Assert.assertEquals(10, historicalT11.getTotalSegments()); + Assert.assertEquals(10, historicalT12.getTotalSegments()); + + // Add a new historical. + final DruidServer newHistorical = createHistorical(3, Tier.T1, 10_000); + addServer(newHistorical); + + // Run 2: Let the coordinator balance segments. 
+ runCoordinatorCycle(); + + // Check that segments have been distributed to the new historical and have also been dropped by the clone + Assert.assertEquals(5, historicalT11.getTotalSegments()); + Assert.assertEquals(5, historicalT12.getTotalSegments()); + Assert.assertEquals(5, newHistorical.getTotalSegments()); + verifyValue( + Stats.Segments.DROPPED_FROM_CLONE.getMetricName(), + Map.of("server", historicalT12.getName()), + 5L + ); + } + + @Test + public void testCloningServerDisappearsAndRelaunched() + { + final CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withSegments(Segments.WIKI_10X1D) + .withServers(historicalT11, historicalT12) + .withRules(datasource, Load.on(Tier.T1, 2).forever()) + .withDynamicConfig( + CoordinatorDynamicConfig.builder() + .withCloneServers(Map.of(historicalT12.getHost(), historicalT11.getHost())) + .withSmartSegmentLoading(true) + .build() + ) + .withImmediateSegmentLoading(true) + .build(); + + startSimulation(sim); + + // Run 1: All segments are loaded. + runCoordinatorCycle(); + Assert.assertEquals(10, historicalT11.getTotalSegments()); + Assert.assertEquals(10, historicalT12.getTotalSegments()); + + // Target server disappears, loses loaded segments. + removeServer(historicalT12); + Segments.WIKI_10X1D.forEach(segment -> historicalT12.removeDataSegment(segment.getId())); + + // Run 2: No change in source historical. + runCoordinatorCycle(); + + Assert.assertEquals(10, historicalT11.getTotalSegments()); + Assert.assertEquals(0, historicalT12.getTotalSegments()); + + // Server readded + addServer(historicalT12); + + // Run 3: Segments recloned. 
+ runCoordinatorCycle(); + + Assert.assertEquals(10, historicalT11.getTotalSegments()); + Assert.assertEquals(10, historicalT12.getTotalSegments()); + verifyValue( + Stats.Segments.ASSIGNED_TO_CLONE.getMetricName(), + Map.of("server", historicalT12.getName()), + 10L + ); + verifyValue( + Metric.SUCCESS_ACTIONS, + Map.of("server", historicalT12.getName(), "description", "LOAD: NORMAL"), + 10L + ); + + Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT11.getTotalSegments()); + Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT12.getTotalSegments()); + Segments.WIKI_10X1D.forEach(segment -> { + Assert.assertEquals(segment, historicalT11.getSegment(segment.getId())); + Assert.assertEquals(segment, historicalT12.getSegment(segment.getId())); + }); + } + + @Test + public void testClonedServerDoesNotFollowReplicationLimit() + { + final CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withSegments(Segments.WIKI_10X100D) + .withServers(historicalT11) + .withRules(datasource, Load.on(Tier.T1, 1).forever()) + .withDynamicConfig( + CoordinatorDynamicConfig.builder() + .withCloneServers(Map.of(historicalT12.getHost(), historicalT11.getHost())) + .withSmartSegmentLoading(true) + .withReplicationThrottleLimit(2) + .build() + ) + .withImmediateSegmentLoading(true) + .build(); + + Segments.WIKI_10X100D.forEach(segment -> historicalT11.addDataSegment(segment)); + startSimulation(sim); + + // Run 1: All segments are loaded on the source historical + runCoordinatorCycle(); + Assert.assertEquals(1000, historicalT11.getTotalSegments()); + Assert.assertEquals(0, historicalT12.getTotalSegments()); + + // Clone server now added. 
+ addServer(historicalT12); + + // Run 2: Assigns all segments to the cloned historical + runCoordinatorCycle(); + + Assert.assertEquals(1000, historicalT11.getTotalSegments()); + Assert.assertEquals(1000, historicalT12.getTotalSegments()); + + verifyValue( + Stats.Segments.ASSIGNED_TO_CLONE.getMetricName(), + Map.of("server", historicalT12.getName()), + 1000L + ); + + verifyValue( + Metric.SUCCESS_ACTIONS, + Map.of("server", historicalT12.getName(), "description", "LOAD: NORMAL"), + 1000L + ); + } + + @Test + public void testCloningHistoricalWithReplicationLimit() + { + final CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withSegments(Segments.WIKI_10X1D) + .withServers(historicalT11, historicalT12, historicalT13) + .withRules(datasource, Load.on(Tier.T1, 2).forever()) + .withImmediateSegmentLoading(true) + .withDynamicConfig( + CoordinatorDynamicConfig.builder() + .withCloneServers(Map.of(historicalT12.getHost(), historicalT11.getHost())) + .withSmartSegmentLoading(false) + .withReplicationThrottleLimit(2) + .withMaxSegmentsToMove(0) + .build() + ) + .withImmediateSegmentLoading(true) + .build(); + Segments.WIKI_10X1D.forEach(historicalT13::addDataSegment); + startSimulation(sim); + + // Check that only replication count segments are loaded each run and that the cloning server copies it. + while (historicalT11.getTotalSegments() < Segments.WIKI_10X1D.size()) { + runCoordinatorCycle(); + + // Check that all segments are cloned. + Assert.assertEquals(historicalT11.getTotalSegments(), historicalT12.getTotalSegments()); + + // Check that the replication throttling is respected. 
+ verifyValue(Metric.ASSIGNED_COUNT, 2L); + verifyValue( + Stats.Segments.ASSIGNED_TO_CLONE.getMetricName(), + Map.of("server", historicalT12.getName()), + 2L + ); + } + + Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT11.getTotalSegments()); + Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT12.getTotalSegments()); + Assert.assertEquals(Segments.WIKI_10X1D.size(), historicalT13.getTotalSegments()); + Segments.WIKI_10X1D.forEach(segment -> { + Assert.assertEquals(segment, historicalT11.getSegment(segment.getId())); + Assert.assertEquals(segment, historicalT12.getSegment(segment.getId())); + Assert.assertEquals(segment, historicalT13.getSegment(segment.getId())); + }); + } +} diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index 1be987ddc278..98324792ef7b 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -244,7 +244,8 @@ public void testConstructorWithNullsShouldKillUnusedSegmentsInAllDataSources() false, false, null, - ImmutableSet.of("host1") + ImmutableSet.of("host1"), + null ); Assert.assertTrue(config.getSpecificDataSourcesToKillUnusedSegmentsIn().isEmpty()); } @@ -269,7 +270,8 @@ public void testConstructorWithSpecificDataSourcesToKillShouldNotKillUnusedSegme false, false, null, - ImmutableSet.of("host1") + ImmutableSet.of("host1"), + null ); Assert.assertEquals(ImmutableSet.of("test1"), config.getSpecificDataSourcesToKillUnusedSegmentsIn()); }