diff --git a/docs/configuration/index.md b/docs/configuration/index.md index e6650b1cd38a..30ae54224ee3 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -775,7 +775,8 @@ A sample Coordinator dynamic config JSON object is shown below: "killDataSourceWhitelist": ["wikipedia", "testDatasource"], "decommissioningNodes": ["localhost:8182", "localhost:8282"], "decommissioningMaxPercentOfMaxSegmentsToMove": 70, - "pauseCoordination": false + "pauseCoordination": false, + "replicateAfterLoadTimeout": false } ``` @@ -799,6 +800,7 @@ Issuing a GET request at the same URL will return the spec that is currently in |`decommissioningNodes`| List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `decommissioningMaxPercentOfMaxSegmentsToMove`.|none| |`decommissioningMaxPercentOfMaxSegmentsToMove`| The maximum number of segments that may be moved away from 'decommissioning' servers to non-decommissioning (that is, active) servers during one Coordinator run. This value is relative to the total maximum segment movements allowed during one run which is determined by `maxSegmentsToMove`. If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will neither be moved from _or to_ 'decommissioning' servers, effectively putting them in a sort of "maintenance" mode that will not participate in balancing or assignment by load rules. Decommissioning can also become stalled if there are no available active servers to place the segments. By leveraging the maximum percent of decommissioning segment movements, an operator can prevent active servers from overload by prioritizing balancing, or decrease decommissioning time instead. 
The value should be between 0 and 100.|70| |`pauseCoordination`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again. |false| +|`replicateAfterLoadTimeout`| Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow historicals in the cluster. 
However, the slow historical may still load the segment later and the coordinator may issue drop requests if the segment is over-replicated.|false| To view the audit history of Coordinator dynamic config issue a GET request to the URL - diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index 320961d3b1d6..6e22b4ff91e5 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -88,6 +88,14 @@ public class CoordinatorDynamicConfig private final int maxSegmentsInNodeLoadingQueue; private final boolean pauseCoordination; + /** + * This decides whether additional replication is needed for segments that have failed to load due to a load timeout. + * When enabled, the coordinator will attempt to replicate the failed segment on a different historical server. + * The historical which failed to load the segment may still load the segment later. Therefore, enabling this setting + * works better if there are a few slow historicals in the cluster and segment availability needs to be sped up. 
+ */ + private final boolean replicateAfterLoadTimeout; + private static final Logger log = new Logger(CoordinatorDynamicConfig.class); @JsonCreator @@ -120,7 +128,8 @@ public CoordinatorDynamicConfig( @JsonProperty("maxSegmentsInNodeLoadingQueue") int maxSegmentsInNodeLoadingQueue, @JsonProperty("decommissioningNodes") Object decommissioningNodes, @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove, - @JsonProperty("pauseCoordination") boolean pauseCoordination + @JsonProperty("pauseCoordination") boolean pauseCoordination, + @JsonProperty("replicateAfterLoadTimeout") boolean replicateAfterLoadTimeout ) { this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments = @@ -166,6 +175,7 @@ public CoordinatorDynamicConfig( ); } this.pauseCoordination = pauseCoordination; + this.replicateAfterLoadTimeout = replicateAfterLoadTimeout; } private static Set parseJsonStringOrArray(Object jsonStringOrArray) @@ -320,6 +330,12 @@ public boolean getPauseCoordination() return pauseCoordination; } + @JsonProperty + public boolean getReplicateAfterLoadTimeout() + { + return replicateAfterLoadTimeout; + } + @Override public String toString() { @@ -341,6 +357,7 @@ public String toString() ", decommissioningNodes=" + decommissioningNodes + ", decommissioningMaxPercentOfMaxSegmentsToMove=" + decommissioningMaxPercentOfMaxSegmentsToMove + ", pauseCoordination=" + pauseCoordination + + ", replicateAfterLoadTimeout=" + replicateAfterLoadTimeout + '}'; } @@ -402,6 +419,9 @@ public boolean equals(Object o) if (pauseCoordination != that.pauseCoordination) { return false; } + if (replicateAfterLoadTimeout != that.replicateAfterLoadTimeout) { + return false; + } return decommissioningMaxPercentOfMaxSegmentsToMove == that.decommissioningMaxPercentOfMaxSegmentsToMove; } @@ -449,6 +469,7 @@ public static class Builder private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 0; private static final int 
DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70; private static final boolean DEFAULT_PAUSE_COORDINATION = false; + private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false; private Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments; private Long mergeBytesLimit; @@ -466,6 +487,7 @@ public static class Builder private Object decommissioningNodes; private Integer decommissioningMaxPercentOfMaxSegmentsToMove; private Boolean pauseCoordination; + private Boolean replicateAfterLoadTimeout; public Builder() { @@ -490,7 +512,8 @@ public Builder( @JsonProperty("decommissioningNodes") @Nullable Object decommissioningNodes, @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") @Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove, - @JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination + @JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination, + @JsonProperty("replicateAfterLoadTimeout") @Nullable Boolean replicateAfterLoadTimeout ) { this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments = @@ -510,6 +533,7 @@ public Builder( this.decommissioningNodes = decommissioningNodes; this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove; this.pauseCoordination = pauseCoordination; + this.replicateAfterLoadTimeout = replicateAfterLoadTimeout; } public Builder withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(long leadingTimeMillis) @@ -602,6 +626,12 @@ public Builder withPauseCoordination(boolean pauseCoordination) return this; } + public Builder withReplicateAfterLoadTimeout(boolean replicateAfterLoadTimeout) + { + this.replicateAfterLoadTimeout = replicateAfterLoadTimeout; + return this; + } + public CoordinatorDynamicConfig build() { return new CoordinatorDynamicConfig( @@ -629,7 +659,8 @@ public CoordinatorDynamicConfig build() decommissioningMaxPercentOfMaxSegmentsToMove == null ? 
DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT : decommissioningMaxPercentOfMaxSegmentsToMove, - pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : pauseCoordination + pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : pauseCoordination, + replicateAfterLoadTimeout == null ? DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout ); } @@ -663,7 +694,8 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults) decommissioningMaxPercentOfMaxSegmentsToMove == null ? defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove() : decommissioningMaxPercentOfMaxSegmentsToMove, - pauseCoordination == null ? defaults.getPauseCoordination() : pauseCoordination + pauseCoordination == null ? defaults.getPauseCoordination() : pauseCoordination, + replicateAfterLoadTimeout == null ? defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java index 85573b6f1362..ff7e2fcc39ce 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java @@ -111,6 +111,15 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST ); + /** + * Needs to be thread safe since it can be concurrently accessed via + * {@link #failAssign(SegmentHolder, boolean, Exception)}, {@link #actionCompleted(SegmentHolder)}, + * {@link #getTimedOutSegments()} and {@link #stop()} + */ + private final ConcurrentSkipListSet timedOutSegments = new ConcurrentSkipListSet<>( + DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST + ); + CuratorLoadQueuePeon( CuratorFramework curator, String basePath, @@ -149,6 +158,12 @@ public Set getSegmentsMarkedToDrop() return segmentsMarkedToDrop; } + @Override + public Set 
getTimedOutSegments() + { + return timedOutSegments; + } + @Override public long getLoadQueueSize() { @@ -268,10 +283,10 @@ public void run() // This is expected when historicals haven't yet picked up processing this segment and coordinator // tries reassigning it to the same node. log.warn(ne, "ZK node already exists because segment change request hasn't yet been processed"); - failAssign(segmentHolder); + failAssign(segmentHolder, true); } catch (Exception e) { - failAssign(segmentHolder, e); + failAssign(segmentHolder, false, e); } } @@ -282,14 +297,21 @@ private ScheduledFuture scheduleNodeDeletedCheck(String path) () -> { try { if (curator.checkExists().forPath(path) != null) { - failAssign(segmentHolder, new ISE("%s was never removed! Failing this operation!", path)); + failAssign( + segmentHolder, + true, + new ISE("Failing this %s operation since it timed out and %s was never removed! These segments might still get processed", + segmentHolder.getType() == DROP ? "DROP" : "LOAD", + path + ) + ); } else { log.debug("%s detected to be removed. ", path); } } catch (Exception e) { log.error(e, "Exception caught and ignored when checking whether zk node was deleted"); - failAssign(segmentHolder, e); + failAssign(segmentHolder, false, e); } }, config.getLoadTimeoutDelay().getMillis(), @@ -307,10 +329,12 @@ private void actionCompleted(SegmentHolder segmentHolder) // See https://github.com/apache/druid/pull/10362 for more details. 
if (null != segmentsToLoad.remove(segmentHolder.getSegment())) { queuedSize.addAndGet(-segmentHolder.getSegmentSize()); + timedOutSegments.remove(segmentHolder.getSegment()); } break; case DROP: segmentsToDrop.remove(segmentHolder.getSegment()); + timedOutSegments.remove(segmentHolder.getSegment()); break; default: throw new UnsupportedOperationException(); @@ -337,6 +361,7 @@ public void stop() } segmentsToLoad.clear(); + timedOutSegments.clear(); queuedSize.set(0L); failedAssignCount.set(0); } @@ -361,21 +386,33 @@ private void entryRemoved(SegmentHolder segmentHolder, String path) ); } - private void failAssign(SegmentHolder segmentHolder) + private void failAssign(SegmentHolder segmentHolder, boolean handleTimeout) { - failAssign(segmentHolder, null); + failAssign(segmentHolder, handleTimeout, null); } - private void failAssign(SegmentHolder segmentHolder, Exception e) + private void failAssign(SegmentHolder segmentHolder, boolean handleTimeout, Exception e) { if (e != null) { log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, segmentHolder); } failedAssignCount.getAndIncrement(); - // Act like it was completed so that the coordinator gives it to someone else - actionCompleted(segmentHolder); - } + if (handleTimeout) { + // Avoid removing the segment entry from the load/drop list in case config.getLoadTimeoutDelay() expires. + // This is because the ZK Node is still present and it may be processed after this timeout and so the coordinator + // needs to take this into account. + log.debug( + "Skipping segment removal from [%s] queue, since ZK Node still exists!", + segmentHolder.getType() == DROP ? "DROP" : "LOAD" + ); + timedOutSegments.add(segmentHolder.getSegment()); + executeCallbacks(segmentHolder); + } else { + // This may have failed for a different reason and so act like it was completed. 
+ actionCompleted(segmentHolder); + } + } private static class SegmentHolder { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 8414e50e1f1f..58af776863d3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -853,7 +853,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) startPeonsForNewServers(currentServers); final DruidCluster cluster = prepareCluster(params, currentServers); - segmentReplicantLookup = SegmentReplicantLookup.make(cluster); + segmentReplicantLookup = SegmentReplicantLookup.make(cluster, getDynamicConfigs().getReplicateAfterLoadTimeout()); stopPeonsForDisappearedServers(currentServers); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java index a05c5aac36f8..f00e5aaf0751 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java @@ -428,6 +428,12 @@ public Set getSegmentsToDrop() return Collections.unmodifiableSet(segmentsToDrop.keySet()); } + @Override + public Set getTimedOutSegments() + { + return Collections.emptySet(); + } + @Override public long getLoadQueueSize() { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/LoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/LoadQueuePeon.java index 7fd5e853e25b..483fe7f6f160 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/LoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/LoadQueuePeon.java @@ -37,6 +37,8 @@ public abstract class LoadQueuePeon public abstract Set 
getSegmentsToDrop(); + public abstract Set getTimedOutSegments(); + public abstract void unmarkSegmentToDrop(DataSegment segmentToLoad); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java index b86ca0106d39..3a4bdb9f6274 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java @@ -36,9 +36,15 @@ */ public class SegmentReplicantLookup { - public static SegmentReplicantLookup make(DruidCluster cluster) + public static SegmentReplicantLookup make(DruidCluster cluster, boolean replicateAfterLoadTimeout) { final Table segmentsInCluster = HashBasedTable.create(); + + /** + * For each tier, this stores the number of replicants for all the segments presently queued to load in {@code cluster}. + * Segments that have failed to load due to the load timeout may not be present in this table if {@code replicateAfterLoadTimeout} is true. + * This is to enable additional replication of the timed out segments for improved availability. + */ final Table loadingSegments = HashBasedTable.create(); for (SortedSet serversByType : cluster.getSortedHistoricalsByTier()) { @@ -59,7 +65,11 @@ public static SegmentReplicantLookup make(DruidCluster cluster) if (numReplicants == null) { numReplicants = 0; } - loadingSegments.put(segment.getId(), server.getTier(), numReplicants + 1); + // Timed out segments need to be replicated in another server for faster availability. + // Therefore we skip incrementing numReplicants for timed out segments if replicateAfterLoadTimeout is enabled. 
+ if (!replicateAfterLoadTimeout || !serverHolder.getPeon().getTimedOutSegments().contains(segment)) { + loadingSegments.put(segment.getId(), server.getTier(), numReplicants + 1); + } } } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRuntimeParamsTestHelpers.java b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRuntimeParamsTestHelpers.java index 82674ab5cf95..1dec00d11824 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRuntimeParamsTestHelpers.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRuntimeParamsTestHelpers.java @@ -32,7 +32,7 @@ public static DruidCoordinatorRuntimeParams.Builder newBuilder(DruidCluster drui { return newBuilder() .withDruidCluster(druidCluster) - .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)); + .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster, false)); } private CoordinatorRuntimeParamsTestHelpers() diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java index 90202648186f..4a2a25f682c2 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java @@ -225,6 +225,7 @@ public void removeSegment(DataSegment segment, DataSegmentChangeCallback callbac Assert.assertEquals(6000, loadQueuePeon.getLoadQueueSize()); Assert.assertEquals(5, loadQueuePeon.getSegmentsToLoad().size()); Assert.assertEquals(5, loadQueuePeon.getSegmentsToDrop().size()); + Assert.assertEquals(0, loadQueuePeon.getTimedOutSegments().size()); for (DataSegment segment : segmentToDrop) { String dropRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getId().toString()); @@ -268,12 +269,63 @@ public void removeSegment(DataSegment segment, DataSegmentChangeCallback callbac } @Test 
- public void testFailAssign() throws Exception + public void testFailAssignForNonTimeoutFailures() throws Exception + { + final DataSegment segment = dataSegmentWithInterval("2014-10-22T00:00:00Z/P1D"); + + final CountDownLatch segmentLoadedSignal = new CountDownLatch(1); + + loadQueuePeon = new CuratorLoadQueuePeon( + curator, + LOAD_QUEUE_PATH, + // This will fail inside SegmentChangeProcessor.run() + null, + Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"), + Execs.singleThreaded("test_load_queue_peon-%d"), + // set time-out to 1 ms so that LoadQueuePeon will fail the assignment quickly + new TestDruidCoordinatorConfig( + null, + null, + null, + new Duration(1), + null, + null, + 10, + new Duration("PT1s") + ) + ); + + loadQueuePeon.start(); + + loadQueueCache.start(); + + loadQueuePeon.loadSegment( + segment, + new LoadPeonCallback() + { + @Override + public void execute() + { + segmentLoadedSignal.countDown(); + } + } + ); + + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignal)); + Assert.assertEquals(0, loadQueuePeon.getSegmentsToLoad().size()); + Assert.assertEquals(0L, loadQueuePeon.getLoadQueueSize()); + Assert.assertEquals(0, loadQueuePeon.getTimedOutSegments().size()); + + } + + @Test + public void testFailAssignForLoadDropTimeout() throws Exception { final DataSegment segment = dataSegmentWithInterval("2014-10-22T00:00:00Z/P1D"); final CountDownLatch loadRequestSignal = new CountDownLatch(1); final CountDownLatch segmentLoadedSignal = new CountDownLatch(1); + final CountDownLatch delayedSegmentLoadedSignal = new CountDownLatch(2); final CountDownLatch loadRequestRemoveSignal = new CountDownLatch(1); loadQueuePeon = new CuratorLoadQueuePeon( @@ -326,11 +378,13 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) public void execute() { segmentLoadedSignal.countDown(); + delayedSegmentLoadedSignal.countDown(); } } ); String loadRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, 
segment.getId().toString()); + Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignal)); Assert.assertNotNull(curator.checkExists().forPath(loadRequestPath)); Assert.assertEquals( @@ -340,14 +394,20 @@ public void execute() .getSegment() ); - // don't simulate completion of load request here + // simulate incompletion of load request since request has timed out Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignal)); - Assert.assertEquals(0, loadQueuePeon.getSegmentsToLoad().size()); - Assert.assertEquals(0L, loadQueuePeon.getLoadQueueSize()); + Assert.assertEquals(1, loadQueuePeon.getSegmentsToLoad().size()); + Assert.assertEquals(1200L, loadQueuePeon.getLoadQueueSize()); + Assert.assertEquals(1, loadQueuePeon.getTimedOutSegments().size()); + + // simulate completion of load request by historical after time out on coordinator curator.delete().guaranteed().forPath(loadRequestPath); + Assert.assertTrue(timing.forWaiting().awaitLatch(delayedSegmentLoadedSignal)); Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestRemoveSignal)); Assert.assertEquals(0, loadQueuePeon.getSegmentsToLoad().size()); Assert.assertEquals(0L, loadQueuePeon.getLoadQueueSize()); + Assert.assertEquals(0, loadQueuePeon.getTimedOutSegments().size()); + } private DataSegment dataSegmentWithInterval(String intervalStr) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java index f3a54b41d37c..344fec40fe6a 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java @@ -204,7 +204,7 @@ private DruidCoordinatorRuntimeParams.Builder makeCoordinatorRuntimeParams( ) { return createCoordinatorRuntimeParams(druidCluster, dataSegments) - .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) + 
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster, false)) .withBalancerStrategy(balancerStrategy); } @@ -336,7 +336,7 @@ public void testRunTwoTiersWithExistingSegments() .addTier("normal", new ServerHolder(normServer.toImmutableDruidServer(), mockPeon)) .build(); - SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false); ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); @@ -446,7 +446,7 @@ public void testRunRuleDoesNotExist() .build(); DruidCoordinatorRuntimeParams params = createCoordinatorRuntimeParams(druidCluster) - .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster(), false)) .withEmitter(emitter) .build(); @@ -488,7 +488,7 @@ public void testDropRemove() .addTier("normal", new ServerHolder(server.toImmutableDruidServer(), mockPeon)) .build(); - SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false); ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); @@ -544,7 +544,7 @@ public void testDropTooManyInSameTier() ) .build(); - SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false); ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = new 
CostBalancerStrategyFactory().createBalancerStrategy(exec); @@ -604,7 +604,7 @@ public void testDropTooManyInDifferentTiers() .addTier("normal", new ServerHolder(server2.toImmutableDruidServer(), mockPeon)) .build(); - SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false); ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); @@ -656,7 +656,7 @@ public void testDontDropInDifferentTiers() .addTier("normal", new ServerHolder(server2.toImmutableDruidServer(), mockPeon)) .build(); - SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false); ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); @@ -722,7 +722,7 @@ public void testDropServerActuallyServesSegment() ) .build(); - SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false); ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); @@ -821,7 +821,7 @@ public void testReplicantThrottle() .withUsedSegmentsInTest(overFlowSegment) .withDatabaseRuleManager(databaseRuleManager) .withBalancerStrategy(balancerStrategy) - .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster(), false)) .build() ); stats = 
afterParams.getCoordinatorStats(); @@ -971,7 +971,7 @@ public void testDropReplicantThrottle() ) .build(); - SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster, false); ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec); @@ -1060,7 +1060,7 @@ public void testRulesRunOnNonOvershadowedSegmentsOnly() .withDruidCluster(druidCluster) .withUsedSegmentsInTest(usedSegments) .withDatabaseRuleManager(databaseRuleManager) - .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster(), false)) .withBalancerStrategy(balancerStrategy) .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) .build(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java index c2d4fd3d0004..08e0616ee417 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java @@ -308,7 +308,7 @@ private static DruidCoordinatorRuntimeParams makeCoordinartorRuntimeParams( return CoordinatorRuntimeParamsTestHelpers .newBuilder() .withDruidCluster(druidCluster) - .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster, false)) .withUsedSegmentsInTest(usedSegments) .build(); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java index 929a9ea1c544..46da5eddc762 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java @@ -179,7 +179,22 @@ private DruidCoordinatorRuntimeParams makeCoordinatorRuntimeParams( return CoordinatorRuntimeParamsTestHelpers .newBuilder() .withDruidCluster(druidCluster) - .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster, false)) + .withReplicationManager(throttler) + .withBalancerStrategy(mockBalancerStrategy) + .withUsedSegmentsInTest(usedSegments) + .build(); + } + + private DruidCoordinatorRuntimeParams makeCoordinatorRuntimeParamsWithLoadReplicationOnTimeout( + DruidCluster druidCluster, + DataSegment... usedSegments + ) + { + return CoordinatorRuntimeParamsTestHelpers + .newBuilder() + .withDruidCluster(druidCluster) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster, true)) .withReplicationManager(throttler) .withBalancerStrategy(mockBalancerStrategy) .withUsedSegmentsInTest(usedSegments) @@ -226,7 +241,7 @@ public void testLoadPrimaryAssignDoesNotOverAssign() Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot")); // ensure multiple runs don't assign primary segment again if at replication count - final LoadQueuePeon loadingPeon = createLoadingPeon(ImmutableList.of(segment)); + final LoadQueuePeon loadingPeon = createLoadingPeon(ImmutableList.of(segment), false); EasyMock.replay(loadingPeon); DruidCluster afterLoad = DruidClusterBuilder @@ -245,6 +260,125 @@ public void testLoadPrimaryAssignDoesNotOverAssign() EasyMock.verify(throttler, mockPeon, mockBalancerStrategy); } + @Test + public void testOverAssignForTimedOutSegments() + { + 
EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(true).anyTimes(); + + final LoadQueuePeon emptyPeon = createEmptyPeon(); + emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + + LoadRule rule = createLoadRule(ImmutableMap.of( + "hot", 1 + )); + + final DataSegment segment = createDataSegment("foo"); + + EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject())) + .andDelegateTo(balancerStrategy) + .anyTimes(); + + EasyMock.replay(throttler, emptyPeon, mockBalancerStrategy); + + ImmutableDruidServer server1 = + new DruidServer("serverHot", "hostHot", null, 1000, ServerType.HISTORICAL, "hot", 1).toImmutableDruidServer(); + ImmutableDruidServer server2 = + new DruidServer("serverHot2", "hostHot2", null, 1000, ServerType.HISTORICAL, "hot", 1).toImmutableDruidServer(); + DruidCluster druidCluster = DruidClusterBuilder + .newBuilder() + .addTier("hot", new ServerHolder(server1, emptyPeon), new ServerHolder(server2, emptyPeon)) + .build(); + + CoordinatorStats stats = rule.run( + null, + makeCoordinatorRuntimeParamsWithLoadReplicationOnTimeout(druidCluster, segment), + segment + ); + + // Ensure that the segment is assigned to one of the historicals + Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot")); + + // Ensure that the primary segment is assigned again in case the peon timed out on loading the segment + final LoadQueuePeon slowLoadingPeon = createLoadingPeon(ImmutableList.of(segment), true); + EasyMock.replay(slowLoadingPeon); + + DruidCluster withLoadTimeout = DruidClusterBuilder + .newBuilder() + .addTier("hot", new ServerHolder(server1, slowLoadingPeon), new ServerHolder(server2, emptyPeon)) + .build(); + + CoordinatorStats statsAfterLoadPrimary = rule.run( + null, + makeCoordinatorRuntimeParamsWithLoadReplicationOnTimeout(withLoadTimeout, segment), + segment + ); + + Assert.assertEquals(1L, 
statsAfterLoadPrimary.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot")); + + EasyMock.verify(throttler, emptyPeon, mockBalancerStrategy); + } + + @Test + public void testSkipReplicationForTimedOutSegments() + { + EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(true).anyTimes(); + + final LoadQueuePeon emptyPeon = createEmptyPeon(); + emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + + LoadRule rule = createLoadRule(ImmutableMap.of( + "hot", 1 + )); + + final DataSegment segment = createDataSegment("foo"); + + EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject())) + .andDelegateTo(balancerStrategy) + .anyTimes(); + + EasyMock.replay(throttler, emptyPeon, mockBalancerStrategy); + + ImmutableDruidServer server1 = + new DruidServer("serverHot", "hostHot", null, 1000, ServerType.HISTORICAL, "hot", 1).toImmutableDruidServer(); + ImmutableDruidServer server2 = + new DruidServer("serverHot2", "hostHot2", null, 1000, ServerType.HISTORICAL, "hot", 1).toImmutableDruidServer(); + DruidCluster druidCluster = DruidClusterBuilder + .newBuilder() + .addTier("hot", new ServerHolder(server1, emptyPeon), new ServerHolder(server2, emptyPeon)) + .build(); + + CoordinatorStats stats = rule.run( + null, + makeCoordinatorRuntimeParams(druidCluster, segment), + segment + ); + + // Ensure that the segment is assigned to one of the historicals + Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot")); + + // Add the segment to the timed out list to simulate peon timeout on loading the segment + final LoadQueuePeon slowLoadingPeon = createLoadingPeon(ImmutableList.of(segment), true); + EasyMock.replay(slowLoadingPeon); + + DruidCluster withLoadTimeout = DruidClusterBuilder + .newBuilder() + .addTier("hot", new ServerHolder(server1, slowLoadingPeon), new ServerHolder(server2, emptyPeon)) + .build(); + + // Default behavior is to 
not replicate the timed out segments on other servers + CoordinatorStats statsAfterLoadPrimary = rule.run( + null, + makeCoordinatorRuntimeParams(withLoadTimeout, segment), + segment + ); + + Assert.assertEquals(0L, statsAfterLoadPrimary.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot")); + + EasyMock.verify(throttler, emptyPeon, mockBalancerStrategy); + } + @Test public void testLoadPriority() { @@ -390,7 +524,7 @@ public void testLoadWithNonExistentTier() CoordinatorRuntimeParamsTestHelpers .newBuilder() .withDruidCluster(druidCluster) - .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster(), false)) .withReplicationManager(throttler) .withBalancerStrategy(mockBalancerStrategy) .withUsedSegmentsInTest(segment) @@ -471,7 +605,7 @@ public void testMaxLoadingQueueSize() DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers .newBuilder() .withDruidCluster(druidCluster) - .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster, false)) .withReplicationManager(throttler) .withBalancerStrategy(mockBalancerStrategy) .withUsedSegmentsInTest(dataSegment1, dataSegment2, dataSegment3) @@ -723,7 +857,7 @@ private static LoadQueuePeon createEmptyPeon() return mockPeon; } - private static LoadQueuePeon createLoadingPeon(List segments) + private static LoadQueuePeon createLoadingPeon(List segments, boolean slowLoading) { final Set segs = ImmutableSet.copyOf(segments); final long loadingSize = segs.stream().mapToLong(DataSegment::getSize).sum(); @@ -734,6 +868,12 @@ private static LoadQueuePeon createLoadingPeon(List segments) EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(loadingSize).anyTimes(); EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(segs.size()).anyTimes(); + if (slowLoading) { + EasyMock.expect(mockPeon.getTimedOutSegments()).andReturn(new 
HashSet<>(segments)).anyTimes(); + } else { + EasyMock.expect(mockPeon.getTimedOutSegments()).andReturn(new HashSet<>()).anyTimes(); + } + return mockPeon; } diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index fe350d0f74ef..d95abcd482a5 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -53,7 +53,8 @@ public void testSerde() throws Exception + " \"maxSegmentsInNodeLoadingQueue\": 1,\n" + " \"decommissioningNodes\": [\"host1\", \"host2\"],\n" + " \"decommissioningMaxPercentOfMaxSegmentsToMove\": 9,\n" - + " \"pauseCoordination\": false\n" + + " \"pauseCoordination\": false,\n" + + " \"replicateAfterLoadTimeout\": false\n" + "}\n"; CoordinatorDynamicConfig actual = mapper.readValue( @@ -67,19 +68,23 @@ public void testSerde() throws Exception ); ImmutableSet decommissioning = ImmutableSet.of("host1", "host2"); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, false); actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false, false); actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 
1, ImmutableSet.of("host1"), 5, false, false); actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, false); actual = CoordinatorDynamicConfig.builder().withPercentOfSegmentsToConsiderPerMove(10).build(actual); - assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true); + assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, false); + + actual = CoordinatorDynamicConfig.builder().withReplicateAfterLoadTimeout(true).build(actual); + assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, true); + } @Test @@ -109,13 +114,13 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio ); ImmutableSet decommissioning = ImmutableSet.of(); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false); + assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false, false); actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); - assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false); + assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false, false); actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual); - assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false); + assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, 
ImmutableSet.of("host1"), 5, false, false); } @Test @@ -160,6 +165,7 @@ public void testSerdeWithStringinKillDataSourceWhitelist() throws Exception 1, ImmutableSet.of(), 0, + false, false ); } @@ -260,7 +266,7 @@ public void testHandleMissingPercentOfSegmentsToConsiderPerMove() throws Excepti ); ImmutableSet decommissioning = ImmutableSet.of("host1", "host2"); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); - assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false); + assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, false); } @Test @@ -290,7 +296,7 @@ public void testSerdeWithKillAllDataSources() throws Exception CoordinatorDynamicConfig.class ); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false, false); //ensure whitelist is empty when killAllDataSources is true try { @@ -337,7 +343,7 @@ public void testDeserializeWithoutMaxSegmentsInNodeLoadingQueue() throws Excepti CoordinatorDynamicConfig.class ); - assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false); + assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false, false); } @Test @@ -361,6 +367,7 @@ public void testBuilderDefaults() 0, emptyList, 70, + false, false ); } @@ -376,7 +383,7 @@ public void testUpdate() Assert.assertEquals( current, new CoordinatorDynamicConfig - .Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null) + .Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null) .build(current) ); } @@ -406,7 +413,8 @@ private void assertConfig( int expectedMaxSegmentsInNodeLoadingQueue, Set decommissioningNodes, int 
decommissioningMaxPercentOfMaxSegmentsToMove, - boolean pauseCoordination + boolean pauseCoordination, + boolean replicateAfterLoadTimeout ) { Assert.assertEquals( @@ -433,5 +441,6 @@ private void assertConfig( config.getDecommissioningMaxPercentOfMaxSegmentsToMove() ); Assert.assertEquals(pauseCoordination, config.getPauseCoordination()); + Assert.assertEquals(replicateAfterLoadTimeout, config.getReplicateAfterLoadTimeout()); } }