From 08fad737dcc473e380f646e5b98fa742d2c1c94f Mon Sep 17 00:00:00 2001
From: Abhishek Balaji Radhakrishnan
Date: Wed, 10 Jul 2024 10:03:04 -0700
Subject: [PATCH 01/11] Round-robin iterator for datasources to kill.
Currently there's a fairness problem in the KillUnusedSegments duty
where the duty consistently selects the same set of datasources as discovered
from the metadata store or dynamic config params. This is a problem especially
when there are multiple unused. In a medium to large cluster, while we can increase
the task slots to increase the likelihood of broader coverage. This patch adds a simple
round-robin iterator to select datasources and has the following properties:
1. Starts with an initial random cursor position in an ordered list of candidates.
2. Consecutive {@code next()} iterations from {@link #getIterator()} are guaranteed to be deterministic
unless the set of candidates change when {@link #updateCandidates(Set)} is called.
3. Guarantees that no duplicate candidates are returned in two consecutive {@code next()} iterations.
---
.../metadata/SegmentsMetadataManager.java | 2 +-
.../coordinator/duty/KillUnusedSegments.java | 99 ++++++--
.../coordinator/duty/RoundRobinIterator.java | 120 +++++++++
.../duty/KillUnusedSegmentsTest.java | 127 +++++++++-
.../duty/RoundRobinIteratorTest.java | 229 ++++++++++++++++++
5 files changed, 549 insertions(+), 28 deletions(-)
create mode 100644 server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java
create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java
diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
index b0aaa54d5bac..1bf942df9f80 100644
--- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
@@ -201,7 +201,7 @@ Iterable iterateAllUnusedSegmentsForDatasource(
*/
List getUnusedSegmentIntervals(
String dataSource,
- DateTime minStartTime,
+ @Nullable DateTime minStartTime,
DateTime maxEndTime,
int limit,
DateTime maxUsedStatusLastUpdatedTime
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index 575c320b9e0a..acfc2aa4b81d 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -19,6 +19,7 @@
package org.apache.druid.server.coordinator.duty;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskStatusPlus;
@@ -40,10 +41,11 @@
import org.joda.time.Interval;
import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -75,14 +77,23 @@ public class KillUnusedSegments implements CoordinatorDuty
private final Duration durationToRetain;
private final boolean ignoreDurationToRetain;
private final int maxSegmentsToKill;
+ private final Duration bufferPeriod;
/**
* Used to keep track of the last interval end time that was killed for each
* datasource.
*/
private final Map datasourceToLastKillIntervalEnd;
+
+ /**
+ * State that is maintained in the duty to determine if the duty needs to run or not.
+ */
private DateTime lastKillTime;
- private final Duration bufferPeriod;
+
+ /**
+ * Round-robin iterator of the datasources to kill. It's updated in every run by the duty.
+ */
+ private final RoundRobinIterator datasourceIterator;
private final SegmentsMetadataManager segmentsMetadataManager;
private final OverlordClient overlordClient;
@@ -93,8 +104,18 @@ public KillUnusedSegments(
KillUnusedSegmentsConfig killConfig
)
{
- this.period = killConfig.getCleanupPeriod();
+ this(segmentsMetadataManager, overlordClient, killConfig, new RoundRobinIterator());
+ }
+ @VisibleForTesting
+ KillUnusedSegments(
+ SegmentsMetadataManager segmentsMetadataManager,
+ OverlordClient overlordClient,
+ KillUnusedSegmentsConfig killConfig,
+ RoundRobinIterator robinUniqueIterator
+ )
+ {
+ this.period = killConfig.getCleanupPeriod();
this.maxSegmentsToKill = killConfig.getMaxSegments();
this.ignoreDurationToRetain = killConfig.isIgnoreDurationToRetain();
this.durationToRetain = killConfig.getDurationToRetain();
@@ -107,8 +128,6 @@ public KillUnusedSegments(
}
this.bufferPeriod = killConfig.getBufferPeriod();
- datasourceToLastKillIntervalEnd = new ConcurrentHashMap<>();
-
log.info(
"Kill task scheduling enabled with period[%s], durationToRetain[%s], bufferPeriod[%s], maxSegmentsToKill[%s]",
this.period,
@@ -119,6 +138,8 @@ public KillUnusedSegments(
this.segmentsMetadataManager = segmentsMetadataManager;
this.overlordClient = overlordClient;
+ this.datasourceToLastKillIntervalEnd = new ConcurrentHashMap<>();
+ this.datasourceIterator = robinUniqueIterator;
}
@Override
@@ -127,7 +148,7 @@ public DruidCoordinatorRuntimeParams run(final DruidCoordinatorRuntimeParams par
if (canDutyRun()) {
return runInternal(params);
} else {
- log.debug(
+ log.info(
"Skipping KillUnusedSegments until period[%s] has elapsed after lastKillTime[%s].",
period, lastKillTime
);
@@ -141,18 +162,24 @@ private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeP
final CoordinatorRunStats stats = params.getCoordinatorStats();
final int availableKillTaskSlots = getAvailableKillTaskSlots(dynamicConfig, stats);
- Collection dataSourcesToKill = dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
-
- if (availableKillTaskSlots > 0) {
- // If no datasource has been specified, all are eligible for killing unused segments
- if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
- dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
- }
+ if (availableKillTaskSlots <= 0) {
+ log.info("Skipping KillUnusedSegments because there are no available kill task slots.");
+ return params;
+ }
- lastKillTime = DateTimes.nowUtc();
- killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
+ final Set dataSourcesToKill;
+ if (!CollectionUtils.isNullOrEmpty(dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn())) {
+ dataSourcesToKill = dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
+ } else {
+ // If no datasource has been specified, all are eligible for killing unused segments by default
+ dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
}
+ datasourceIterator.updateCandidates(dataSourcesToKill);
+ lastKillTime = DateTimes.nowUtc();
+
+ killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
+
// any datasources that are no longer being considered for kill should have their
// last kill interval removed from map.
datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill);
@@ -163,30 +190,44 @@ private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeP
* Spawn kill tasks for each datasource in {@code dataSourcesToKill} upto {@code availableKillTaskSlots}.
*/
private void killUnusedSegments(
- @Nullable final Collection dataSourcesToKill,
+ final Set dataSourcesToKill,
final int availableKillTaskSlots,
final CoordinatorRunStats stats
)
{
- if (CollectionUtils.isNullOrEmpty(dataSourcesToKill) || availableKillTaskSlots <= 0) {
+ if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
+ log.info("Skipping KillUnusedSegments because there are no datasources to kill.");
stats.add(Stats.Kill.SUBMITTED_TASKS, 0);
return;
}
+ final Iterator dataSourcesToKillIterator = this.datasourceIterator.getIterator();
+ final Set remainingDatasourcesToKill = new HashSet<>(dataSourcesToKill);
+ final Set datasourcesKilled = new HashSet<>();
- final Collection remainingDatasourcesToKill = new ArrayList<>(dataSourcesToKill);
int submittedTasks = 0;
- for (String dataSource : dataSourcesToKill) {
+ while (dataSourcesToKillIterator.hasNext()) {
+ if (remainingDatasourcesToKill.size() == 0) {
+ log.info(
+ "Submitted [%d] kill tasks for [%d] datasources. No more datasource to kill in this cycle.",
+ submittedTasks, datasourcesKilled.size()
+ );
+ break;
+ }
+
if (submittedTasks >= availableKillTaskSlots) {
log.info(
- "Submitted [%d] kill tasks and reached kill task slot limit [%d].",
- submittedTasks, availableKillTaskSlots
+ "Submitted [%d] kill tasks for [%d] datasources and reached kill task slot limit [%d].",
+ submittedTasks, datasourcesKilled.size(), availableKillTaskSlots
);
break;
}
+
+ final String dataSource = dataSourcesToKillIterator.next();
final DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(bufferPeriod);
final Interval intervalToKill = findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime, stats);
if (intervalToKill == null) {
datasourceToLastKillIntervalEnd.remove(dataSource);
+ remainingDatasourcesToKill.remove(dataSource);
continue;
}
@@ -203,8 +244,9 @@ private void killUnusedSegments(
true
);
++submittedTasks;
- datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
+ datasourcesKilled.add(dataSource);
remainingDatasourcesToKill.remove(dataSource);
+ datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
}
catch (Exception ex) {
log.error(ex, "Failed to submit kill task for dataSource[%s] in interval[%s]", dataSource, intervalToKill);
@@ -216,8 +258,8 @@ private void killUnusedSegments(
}
log.info(
- "Submitted [%d] kill tasks for [%d] datasources. Remaining datasources to kill: %s",
- submittedTasks, dataSourcesToKill.size() - remainingDatasourcesToKill.size(), remainingDatasourcesToKill
+ "Submitted [%d] kill tasks for [%d] datasources: [%s]. Remaining datasources to kill: [%s]",
+ submittedTasks, datasourcesKilled.size(), datasourcesKilled, remainingDatasourcesToKill
);
stats.add(Stats.Kill.SUBMITTED_TASKS, submittedTasks);
@@ -230,13 +272,14 @@ private Interval findIntervalForKill(
final CoordinatorRunStats stats
)
{
+ final DateTime minStartTime = datasourceToLastKillIntervalEnd.get(dataSource);
final DateTime maxEndTime = ignoreDurationToRetain
? DateTimes.COMPARE_DATE_AS_STRING_MAX
: DateTimes.nowUtc().minus(durationToRetain);
final List unusedSegmentIntervals = segmentsMetadataManager.getUnusedSegmentIntervals(
dataSource,
- datasourceToLastKillIntervalEnd.get(dataSource),
+ minStartTime,
maxEndTime,
maxSegmentsToKill,
maxUsedStatusLastUpdatedTime
@@ -249,6 +292,10 @@ private Interval findIntervalForKill(
stats.add(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, datasourceKey, unusedSegmentIntervals.size());
if (CollectionUtils.isNullOrEmpty(unusedSegmentIntervals)) {
+ log.info(
+ "No segments found to kill for datasource[%s] in [%s/%s].",
+ dataSource, minStartTime, maxEndTime
+ );
return null;
} else if (unusedSegmentIntervals.size() == 1) {
return unusedSegmentIntervals.get(0);
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java
new file mode 100644
index 000000000000..d43b0424f52c
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * A round-robin iterator that has the following properties:
+ *
+ * - Starts with an initial random cursor position in an ordered list of candidates.
+ * - Consecutive {@code next()} iterations from {@link #getIterator()} are guaranteed to be deterministic
+ * unless the set of candidates change when {@link #updateCandidates(Set)} is called.
+ * - Guarantees that no duplicate candidates are returned in two consecutive {@code next()} iterations.
+ *
+ */
+public class RoundRobinIterator
+{
+ private final List candidates = new ArrayList<>();
+ private int currentPosition;
+ private String previousCandidate;
+
+ /**
+ * Update the candidates with the supplied input if the input set is different from the current candidates.
+ * If the input set is the same as the existing candidates, the cursor position remains unchanged so the iteration order
+ * is determistic and resumed. If the input set is different, the cursor position is reset to a random location in
+ * the new list of sorted candidates.
+ */
+ public void updateCandidates(final Set input)
+ {
+ if (new HashSet<>(candidates).equals(input)) {
+ return;
+ }
+ this.candidates.clear();
+ this.candidates.addAll(input);
+ Collections.sort(this.candidates);
+ this.currentPosition = getInitialCursorPosition(input.size());
+ }
+
+ public Iterator getIterator()
+ {
+ return new Iterator()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return candidates.size() > 0;
+ }
+
+ @Override
+ public String next()
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ // If the next candidate to be served is the same as the previous candidate served (duplicate),
+ // advance the cursor twice. Otherwise, advance the cursor only once.
+ String nextCandidate = peakNextCandiate();
+ advanceCursor();
+ if (nextCandidate.equals(previousCandidate)) {
+ nextCandidate = peakNextCandiate();
+ advanceCursor();
+ }
+ previousCandidate = nextCandidate;
+ return nextCandidate;
+ }
+
+ private String peakNextCandiate()
+ {
+ final int nextPosition = currentPosition < candidates.size() ? currentPosition : 0;
+ return candidates.get(nextPosition);
+ }
+
+ private void advanceCursor()
+ {
+ if (++currentPosition >= candidates.size()) {
+ currentPosition = 0;
+ }
+ }
+ };
+ }
+
+ @VisibleForTesting
+ int getInitialCursorPosition(int maxSize)
+ {
+ return maxSize <= 0 ? 0 : ThreadLocalRandom.current().nextInt(0, maxSize);
+ }
+
+ @VisibleForTesting
+ int getCurrentPosition()
+ {
+ return this.currentPosition;
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index 9d0f752869e2..969c295a5006 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -225,6 +225,74 @@ public void testKillWithMultipleDatasources()
validateLastKillStateAndReset(DS2, null);
}
+ @Test
+ public void testKillWithMultipleDatasourcesRoundRobin()
+ {
+ configBuilder.withIgnoreDurationToRetain(true)
+ .withMaxSegmentsToKill(2);
+ dynamicConfigBuilder.withMaxKillTaskSlots(2);
+
+ createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1));
+ createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(1));
+ createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(1));
+ createAndAddUnusedSegment(DS1, NEXT_DAY, VERSION, NOW.minusDays(1));
+ createAndAddUnusedSegment(DS1, NEXT_MONTH, VERSION, NOW.minusDays(1));
+
+ createAndAddUnusedSegment(DS2, YEAR_OLD, VERSION, NOW.minusDays(1));
+ createAndAddUnusedSegment(DS2, DAY_OLD, VERSION, NOW.minusDays(1));
+ createAndAddUnusedSegment(DS2, NEXT_DAY, VERSION, NOW.minusDays(1));
+
+ createAndAddUnusedSegment(DS3, YEAR_OLD, VERSION, NOW.minusDays(1));
+ createAndAddUnusedSegment(DS3, DAY_OLD, VERSION, NOW.minusDays(1));
+ createAndAddUnusedSegment(DS3, NEXT_DAY, VERSION, NOW.minusDays(1));
+
+ initDuty();
+ CoordinatorRunStats stats = runDutyAndGetStats();
+
+ Assert.assertEquals(2, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+ Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
+ Assert.assertEquals(2, stats.get(Stats.Kill.MAX_SLOTS));
+ Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
+ Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
+
+ validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
+ validateLastKillStateAndReset(DS2, new Interval(YEAR_OLD.getStart(), DAY_OLD.getEnd()));
+
+ stats = runDutyAndGetStats();
+
+ Assert.assertEquals(4, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+ Assert.assertEquals(4, stats.get(Stats.Kill.SUBMITTED_TASKS));
+ Assert.assertEquals(4, stats.get(Stats.Kill.MAX_SLOTS));
+ Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS3_STAT_KEY));
+ Assert.assertEquals(4, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
+
+ validateLastKillStateAndReset(DS3, new Interval(YEAR_OLD.getStart(), DAY_OLD.getEnd()));
+ validateLastKillStateAndReset(DS1, new Interval(DAY_OLD.getStart(), NEXT_DAY.getEnd()));
+
+ stats = runDutyAndGetStats();
+
+ Assert.assertEquals(6, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+ Assert.assertEquals(6, stats.get(Stats.Kill.SUBMITTED_TASKS));
+ Assert.assertEquals(6, stats.get(Stats.Kill.MAX_SLOTS));
+ Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
+ Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS3_STAT_KEY));
+
+ validateLastKillStateAndReset(DS2, NEXT_DAY);
+ validateLastKillStateAndReset(DS3, NEXT_DAY);
+
+ stats = runDutyAndGetStats();
+
+ Assert.assertEquals(8, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+ Assert.assertEquals(7, stats.get(Stats.Kill.SUBMITTED_TASKS));
+ Assert.assertEquals(8, stats.get(Stats.Kill.MAX_SLOTS));
+ Assert.assertEquals(5, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
+ Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
+
+ validateLastKillStateAndReset(DS1, NEXT_MONTH);
+ validateLastKillStateAndReset(DS2, null);
+ validateLastKillStateAndReset(DS3, null);
+ }
+
/**
* The {@code DAY_OLD} and {@code HOUR_OLD} segments are "more recent" in terms of last updated time.
* Even though they fall within the umbrella kill interval computed by the duty, the kill task will narrow down to
@@ -407,6 +475,36 @@ public void testLowerMaxSegmentsToKill()
validateLastKillStateAndReset(DS1, YEAR_OLD);
}
+ @Test
+ public void testDatasourcesFoundButNoUnusedSegments()
+ {
+ configBuilder.withMaxSegmentsToKill(1);
+
+ // create a datasource but no unused segments yet.
+ createAndAddUsedSegment(DS1, YEAR_OLD, VERSION);
+
+ initDuty();
+ CoordinatorRunStats stats = runDutyAndGetStats();
+
+ Assert.assertEquals(10, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+ Assert.assertEquals(0, stats.get(Stats.Kill.SUBMITTED_TASKS));
+ Assert.assertEquals(10, stats.get(Stats.Kill.MAX_SLOTS));
+ Assert.assertEquals(0, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
+
+ createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(10));
+ createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(10));
+ createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(2));
+
+ stats = runDutyAndGetStats();
+
+ Assert.assertEquals(20, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+ Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
+ Assert.assertEquals(20, stats.get(Stats.Kill.MAX_SLOTS));
+ Assert.assertEquals(1, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
+
+ validateLastKillStateAndReset(DS1, YEAR_OLD);
+ }
+
/**
* The kill period is honored after the first indexing run.
*/
@@ -723,6 +821,21 @@ private void validateLastKillStateAndReset(final String dataSource, @Nullable fi
overlordClient.deleteLastKillInterval(dataSource);
}
+ private void createAndAddUsedSegment(
+ final String dataSource,
+ final Interval interval,
+ final String version
+ )
+ {
+ final DataSegment segment = createSegment(dataSource, interval, version);
+ try {
+ SqlSegmentsMetadataManagerTestBase.publishSegment(connector, config, TestHelper.makeJsonMapper(), segment);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private void createAndAddUnusedSegment(
final String dataSource,
final Interval interval,
@@ -758,7 +871,19 @@ private DataSegment createSegment(final String dataSource, final Interval interv
private void initDuty()
{
- killDuty = new KillUnusedSegments(sqlSegmentsMetadataManager, overlordClient, configBuilder.build());
+ killDuty = new KillUnusedSegments(
+ sqlSegmentsMetadataManager,
+ overlordClient,
+ configBuilder.build(),
+ new RoundRobinIterator()
+ {
+ @Override
+ int getInitialCursorPosition(int maxSize)
+ {
+ return 0;
+ }
+ }
+ );
}
private CoordinatorRunStats runDutyAndGetStats()
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java
new file mode 100644
index 000000000000..b49eb08241bf
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.collect.ImmutableSet;
+import junitparams.converters.Nullable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class RoundRobinIteratorTest
+{
+ @Test
+ public void testIterationOrder()
+ {
+ final RoundRobinIterator rrIterator = new RoundRobinIterator()
+ {
+ @Override
+ int getInitialCursorPosition(int maxSize)
+ {
+ return 1;
+ }
+ };
+ rrIterator.updateCandidates(ImmutableSet.of("a", "b", "c"));
+
+ final Iterator it = rrIterator.getIterator();
+ Assert.assertTrue(it.hasNext());
+ Assert.assertEquals("b", it.next());
+
+ Assert.assertTrue(it.hasNext());
+ Assert.assertEquals("c", it.next());
+
+ Assert.assertTrue(it.hasNext());
+ Assert.assertEquals("a", it.next());
+
+ rrIterator.updateCandidates(ImmutableSet.of("x", "y"));
+
+ Assert.assertTrue(it.hasNext());
+ Assert.assertEquals("y", it.next());
+
+ Assert.assertTrue(it.hasNext());
+ Assert.assertEquals("x", it.next());
+ }
+
+ @Test
+ public void testNoConsecutiveDuplicates()
+ {
+ final RoundRobinIterator rrIterator = new RoundRobinIterator()
+ {
+ @Override
+ int getInitialCursorPosition(int maxSize)
+ {
+ return 0;
+ }
+ };
+ rrIterator.updateCandidates(ImmutableSet.of("a"));
+
+ final Iterator it = rrIterator.getIterator();
+ Assert.assertTrue(it.hasNext());
+ Assert.assertEquals("a", it.next());
+
+ rrIterator.updateCandidates(ImmutableSet.of("a", "b"));
+
+ // next() should skip the duplicate "a" as it was the last value served
+ Assert.assertTrue(it.hasNext());
+ Assert.assertEquals("b", it.next());
+
+ Assert.assertTrue(it.hasNext());
+ Assert.assertEquals("a", it.next());
+
+ rrIterator.updateCandidates(ImmutableSet.of("b", "c"));
+
+ Assert.assertTrue(it.hasNext());
+ Assert.assertEquals("b", it.next());
+
+ Assert.assertTrue(it.hasNext());
+ Assert.assertEquals("c", it.next());
+ }
+
+ @Test
+ public void testMultipleUpdatesWithSameCandidates()
+ {
+ final Set input = ImmutableSet.of("1", "2", "3");
+ final RoundRobinIterator rrIterator = new RoundRobinIterator()
+ {
+ @Override
+ int getInitialCursorPosition(int maxSize)
+ {
+ return 0;
+ }
+ };
+ rrIterator.updateCandidates(input);
+ final Iterator it = rrIterator.getIterator();
+
+ Assert.assertTrue(it.hasNext());
+ Assert.assertEquals("1", it.next());
+
+ Assert.assertTrue(it.hasNext());
+ Assert.assertEquals("2", it.next());
+
+ rrIterator.updateCandidates(input);
+
+ // The cursor should just resume from the last position instead of resetting to 0.
+ Assert.assertTrue(it.hasNext());
+ Assert.assertEquals("3", it.next());
+
+ Assert.assertTrue(it.hasNext());
+ Assert.assertEquals("1", it.next());
+ }
+
+ @Test
+ public void testEmpty()
+ {
+ final RoundRobinIterator rrIterator = new RoundRobinIterator();
+ final Iterator it = rrIterator.getIterator();
+
+ Assert.assertFalse(it.hasNext());
+ Assert.assertThrows(NoSuchElementException.class, it::next);
+ }
+
+ /**
+ * Similar to the above tests but with the built-in randomness.
+ */
+ @Test
+ public void testMultipleUpdatesWithRandomCursorPositions()
+ {
+ final RoundRobinIterator rrIterator = new RoundRobinIterator();
+
+ Set input = ImmutableSet.of("a", "b", "c");
+ rrIterator.updateCandidates(input);
+
+ List expectedList = getExpectedList(input, rrIterator.getCurrentPosition(), null);
+ List actualList = getActualList(rrIterator.getIterator(), input.size());
+ Assert.assertEquals(expectedList, actualList);
+ String lastValue = actualList.get(actualList.size() - 1);
+
+ // Second update
+ input = ImmutableSet.of("c", "d");
+ rrIterator.updateCandidates(input);
+
+ expectedList = getExpectedList(input, rrIterator.getCurrentPosition(), lastValue);
+ actualList = getActualList(rrIterator.getIterator(), input.size());
+ Assert.assertEquals(expectedList, actualList);
+
+ Assert.assertNotEquals(lastValue, actualList.get(0));
+ lastValue = actualList.get(actualList.size() - 1);
+
+ // Third update
+ input = ImmutableSet.of("d", "e");
+ rrIterator.updateCandidates(ImmutableSet.of("d", "e"));
+
+ expectedList = getExpectedList(input, rrIterator.getCurrentPosition(), lastValue);
+ actualList = getActualList(rrIterator.getIterator(), input.size());
+ Assert.assertEquals(expectedList, actualList);
+
+ Assert.assertNotEquals(lastValue, actualList.get(0));
+ lastValue = actualList.get(actualList.size() - 1);
+
+ // Fourth update
+ input = ImmutableSet.of("e", "f", "h");
+ rrIterator.updateCandidates(input);
+
+ expectedList = getExpectedList(input, rrIterator.getCurrentPosition(), lastValue);
+ actualList = getActualList(rrIterator.getIterator(), input.size());
+ Assert.assertEquals(expectedList, actualList);
+ Assert.assertNotEquals(lastValue, actualList.get(0));
+ }
+
+ private List getExpectedList(
+ final Set input,
+ final int cursorPosition,
+ @Nullable final String previousValue
+ )
+ {
+ final List sortedList = input.stream().sorted().collect(Collectors.toList());
+ final List expectedList = new ArrayList<>();
+
+ // Adjust the cursor position if the value at the cursor position from the sorted list
+ // is the same as the previous value.
+ final int n = sortedList.size();
+ int adjustedCursorPosition = cursorPosition;
+ if (sortedList.get(adjustedCursorPosition % n).equals(previousValue)) {
+ adjustedCursorPosition = (adjustedCursorPosition + 1) % n;
+ }
+
+ for (int i = 0; i < n; i++) {
+ int index = (adjustedCursorPosition + i) % n;
+ String val = sortedList.get(index);
+ expectedList.add(val);
+ }
+ return expectedList;
+ }
+
+ private List getActualList(final Iterator it, final int maxSize)
+ {
+ final List actualList = new ArrayList<>();
+ int cnt = 0;
+ while (it.hasNext()) {
+ actualList.add(it.next());
+ if (++cnt >= maxSize) {
+ break;
+ }
+ }
+ return actualList;
+ }
+}
From 5115e8e64c8030c26d5238514286e02796d934e5 Mon Sep 17 00:00:00 2001
From: Abhishek Balaji Radhakrishnan
Date: Wed, 10 Jul 2024 10:59:13 -0700
Subject: [PATCH 02/11] Renames in RoundRobinIteratorTest.
---
.../duty/RoundRobinIteratorTest.java | 59 +++++++++----------
1 file changed, 29 insertions(+), 30 deletions(-)
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java
index b49eb08241bf..497d78ccc2bf 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java
@@ -152,78 +152,77 @@ public void testMultipleUpdatesWithRandomCursorPositions()
Set input = ImmutableSet.of("a", "b", "c");
rrIterator.updateCandidates(input);
- List expectedList = getExpectedList(input, rrIterator.getCurrentPosition(), null);
- List actualList = getActualList(rrIterator.getIterator(), input.size());
- Assert.assertEquals(expectedList, actualList);
- String lastValue = actualList.get(actualList.size() - 1);
+ List exepctedCandidates = getExepctedCandidates(input, rrIterator.getCurrentPosition(), null);
+ List actualCandidates = getObservedCandidates(rrIterator.getIterator(), input.size());
+ Assert.assertEquals(exepctedCandidates, actualCandidates);
+ String lastValue = actualCandidates.get(actualCandidates.size() - 1);
// Second update
input = ImmutableSet.of("c", "d");
rrIterator.updateCandidates(input);
- expectedList = getExpectedList(input, rrIterator.getCurrentPosition(), lastValue);
- actualList = getActualList(rrIterator.getIterator(), input.size());
- Assert.assertEquals(expectedList, actualList);
+ exepctedCandidates = getExepctedCandidates(input, rrIterator.getCurrentPosition(), lastValue);
+ actualCandidates = getObservedCandidates(rrIterator.getIterator(), input.size());
+ Assert.assertEquals(exepctedCandidates, actualCandidates);
- Assert.assertNotEquals(lastValue, actualList.get(0));
- lastValue = actualList.get(actualList.size() - 1);
+ Assert.assertNotEquals(lastValue, actualCandidates.get(0));
+ lastValue = actualCandidates.get(actualCandidates.size() - 1);
// Third update
input = ImmutableSet.of("d", "e");
rrIterator.updateCandidates(ImmutableSet.of("d", "e"));
- expectedList = getExpectedList(input, rrIterator.getCurrentPosition(), lastValue);
- actualList = getActualList(rrIterator.getIterator(), input.size());
- Assert.assertEquals(expectedList, actualList);
+ exepctedCandidates = getExepctedCandidates(input, rrIterator.getCurrentPosition(), lastValue);
+ actualCandidates = getObservedCandidates(rrIterator.getIterator(), input.size());
+ Assert.assertEquals(exepctedCandidates, actualCandidates);
- Assert.assertNotEquals(lastValue, actualList.get(0));
- lastValue = actualList.get(actualList.size() - 1);
+ Assert.assertNotEquals(lastValue, actualCandidates.get(0));
+ lastValue = actualCandidates.get(actualCandidates.size() - 1);
// Fourth update
input = ImmutableSet.of("e", "f", "h");
rrIterator.updateCandidates(input);
- expectedList = getExpectedList(input, rrIterator.getCurrentPosition(), lastValue);
- actualList = getActualList(rrIterator.getIterator(), input.size());
- Assert.assertEquals(expectedList, actualList);
- Assert.assertNotEquals(lastValue, actualList.get(0));
+ exepctedCandidates = getExepctedCandidates(input, rrIterator.getCurrentPosition(), lastValue);
+ actualCandidates = getObservedCandidates(rrIterator.getIterator(), input.size());
+ Assert.assertEquals(exepctedCandidates, actualCandidates);
+ Assert.assertNotEquals(lastValue, actualCandidates.get(0));
}
- private List getExpectedList(
+ private List getExepctedCandidates(
final Set input,
final int cursorPosition,
@Nullable final String previousValue
)
{
- final List sortedList = input.stream().sorted().collect(Collectors.toList());
- final List expectedList = new ArrayList<>();
+ final List sortedCandidates = input.stream().sorted().collect(Collectors.toList());
+ final List expectedCandidates = new ArrayList<>();
// Adjust the cursor position if the value at the cursor position from the sorted list
// is the same as the previous value.
- final int n = sortedList.size();
+ final int n = sortedCandidates.size();
int adjustedCursorPosition = cursorPosition;
- if (sortedList.get(adjustedCursorPosition % n).equals(previousValue)) {
+ if (sortedCandidates.get(adjustedCursorPosition % n).equals(previousValue)) {
adjustedCursorPosition = (adjustedCursorPosition + 1) % n;
}
for (int i = 0; i < n; i++) {
int index = (adjustedCursorPosition + i) % n;
- String val = sortedList.get(index);
- expectedList.add(val);
+ expectedCandidates.add(sortedCandidates.get(index));
}
- return expectedList;
+ return expectedCandidates;
}
- private List getActualList(final Iterator it, final int maxSize)
+ private List getObservedCandidates(final Iterator it, final int maxSize)
{
- final List actualList = new ArrayList<>();
+ final List observedCandidates = new ArrayList<>();
int cnt = 0;
while (it.hasNext()) {
- actualList.add(it.next());
+ observedCandidates.add(it.next());
if (++cnt >= maxSize) {
break;
}
}
- return actualList;
+ return observedCandidates;
}
}
From d9c572bc1409b36fa1d63e97fc2c2168bf0640dc Mon Sep 17 00:00:00 2001
From: Abhishek Balaji Radhakrishnan
Date: Wed, 10 Jul 2024 13:19:20 -0700
Subject: [PATCH 03/11] Address review comments.
1. Clarify javadocs on the ordered list. Also flesh out the details a bit more.
2. Rename the test hooks to make intent clearer and fix typo.
3. Add NotThreadSafe annotation.
4. Remove one potentially noisy log that's in the path of iteration.
---
.../coordinator/duty/KillUnusedSegments.java | 4 ---
.../coordinator/duty/RoundRobinIterator.java | 29 +++++++++++--------
.../duty/KillUnusedSegmentsTest.java | 2 +-
.../duty/RoundRobinIteratorTest.java | 24 +++++++--------
4 files changed, 30 insertions(+), 29 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index acfc2aa4b81d..5ca691fe1c92 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -292,10 +292,6 @@ private Interval findIntervalForKill(
stats.add(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, datasourceKey, unusedSegmentIntervals.size());
if (CollectionUtils.isNullOrEmpty(unusedSegmentIntervals)) {
- log.info(
- "No segments found to kill for datasource[%s] in [%s/%s].",
- dataSource, minStartTime, maxEndTime
- );
return null;
} else if (unusedSegmentIntervals.size() == 1) {
return unusedSegmentIntervals.get(0);
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java
index d43b0424f52c..038e518ada97 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java
@@ -21,6 +21,7 @@
import com.google.common.annotations.VisibleForTesting;
+import javax.annotation.concurrent.NotThreadSafe;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -31,18 +32,22 @@
import java.util.concurrent.ThreadLocalRandom;
/**
- * A round-robin iterator that has the following properties:
+ * A round-robin iterator that is always backed by an ordered list of candidates containing no duplicates.
+ * The iterator has the following properties:
*
* - Starts with an initial random cursor position in an ordered list of candidates.
- * - Consecutive {@code next()} iterations from {@link #getIterator()} are guaranteed to be deterministic
- * unless the set of candidates change when {@link #updateCandidates(Set)} is called.
+ * - Invoking {@code next()} on {@link #getIterator()} is guaranteed to be deterministic
+ * unless the set of candidates change when {@link #updateCandidates(Set)} is called. When the candidates change,
+ * the cursor is reset to a random position in the new list of ordered candidates.
* - Guarantees that no duplicate candidates are returned in two consecutive {@code next()} iterations.
*
+ *
*/
+@NotThreadSafe
public class RoundRobinIterator
{
private final List candidates = new ArrayList<>();
- private int currentPosition;
+ private int cursorPosition;
private String previousCandidate;
/**
@@ -59,7 +64,7 @@ public void updateCandidates(final Set input)
this.candidates.clear();
this.candidates.addAll(input);
Collections.sort(this.candidates);
- this.currentPosition = getInitialCursorPosition(input.size());
+ this.cursorPosition = generateRandomCursorPosition(input.size());
}
public Iterator getIterator()
@@ -93,28 +98,28 @@ public String next()
private String peakNextCandiate()
{
- final int nextPosition = currentPosition < candidates.size() ? currentPosition : 0;
+ final int nextPosition = cursorPosition < candidates.size() ? cursorPosition : 0;
return candidates.get(nextPosition);
}
private void advanceCursor()
{
- if (++currentPosition >= candidates.size()) {
- currentPosition = 0;
+ if (++cursorPosition >= candidates.size()) {
+ cursorPosition = 0;
}
}
};
}
@VisibleForTesting
- int getInitialCursorPosition(int maxSize)
+ int generateRandomCursorPosition(final int maxBound)
{
- return maxSize <= 0 ? 0 : ThreadLocalRandom.current().nextInt(0, maxSize);
+ return maxBound <= 0 ? 0 : ThreadLocalRandom.current().nextInt(0, maxBound);
}
@VisibleForTesting
- int getCurrentPosition()
+ int getCurrentCursorPosition()
{
- return this.currentPosition;
+ return this.cursorPosition;
}
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index 969c295a5006..eb53c43bf91d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -878,7 +878,7 @@ private void initDuty()
new RoundRobinIterator()
{
@Override
- int getInitialCursorPosition(int maxSize)
+ int generateRandomCursorPosition(final int maxBound)
{
return 0;
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java
index 497d78ccc2bf..56f6e1ee11c8 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java
@@ -39,7 +39,7 @@ public void testIterationOrder()
final RoundRobinIterator rrIterator = new RoundRobinIterator()
{
@Override
- int getInitialCursorPosition(int maxSize)
+ int generateRandomCursorPosition(final int maxBound)
{
return 1;
}
@@ -71,7 +71,7 @@ public void testNoConsecutiveDuplicates()
final RoundRobinIterator rrIterator = new RoundRobinIterator()
{
@Override
- int getInitialCursorPosition(int maxSize)
+ int generateRandomCursorPosition(final int maxBound)
{
return 0;
}
@@ -107,7 +107,7 @@ public void testMultipleUpdatesWithSameCandidates()
final RoundRobinIterator rrIterator = new RoundRobinIterator()
{
@Override
- int getInitialCursorPosition(int maxSize)
+ int generateRandomCursorPosition(final int maxBound)
{
return 0;
}
@@ -152,18 +152,18 @@ public void testMultipleUpdatesWithRandomCursorPositions()
Set input = ImmutableSet.of("a", "b", "c");
rrIterator.updateCandidates(input);
- List exepctedCandidates = getExepctedCandidates(input, rrIterator.getCurrentPosition(), null);
+ List expectedCandidates = getexpectedCandidates(input, rrIterator.getCurrentCursorPosition(), null);
List actualCandidates = getObservedCandidates(rrIterator.getIterator(), input.size());
- Assert.assertEquals(exepctedCandidates, actualCandidates);
+ Assert.assertEquals(expectedCandidates, actualCandidates);
String lastValue = actualCandidates.get(actualCandidates.size() - 1);
// Second update
input = ImmutableSet.of("c", "d");
rrIterator.updateCandidates(input);
- exepctedCandidates = getExepctedCandidates(input, rrIterator.getCurrentPosition(), lastValue);
+ expectedCandidates = getexpectedCandidates(input, rrIterator.getCurrentCursorPosition(), lastValue);
actualCandidates = getObservedCandidates(rrIterator.getIterator(), input.size());
- Assert.assertEquals(exepctedCandidates, actualCandidates);
+ Assert.assertEquals(expectedCandidates, actualCandidates);
Assert.assertNotEquals(lastValue, actualCandidates.get(0));
lastValue = actualCandidates.get(actualCandidates.size() - 1);
@@ -172,9 +172,9 @@ public void testMultipleUpdatesWithRandomCursorPositions()
input = ImmutableSet.of("d", "e");
rrIterator.updateCandidates(ImmutableSet.of("d", "e"));
- exepctedCandidates = getExepctedCandidates(input, rrIterator.getCurrentPosition(), lastValue);
+ expectedCandidates = getexpectedCandidates(input, rrIterator.getCurrentCursorPosition(), lastValue);
actualCandidates = getObservedCandidates(rrIterator.getIterator(), input.size());
- Assert.assertEquals(exepctedCandidates, actualCandidates);
+ Assert.assertEquals(expectedCandidates, actualCandidates);
Assert.assertNotEquals(lastValue, actualCandidates.get(0));
lastValue = actualCandidates.get(actualCandidates.size() - 1);
@@ -183,13 +183,13 @@ public void testMultipleUpdatesWithRandomCursorPositions()
input = ImmutableSet.of("e", "f", "h");
rrIterator.updateCandidates(input);
- exepctedCandidates = getExepctedCandidates(input, rrIterator.getCurrentPosition(), lastValue);
+ expectedCandidates = getexpectedCandidates(input, rrIterator.getCurrentCursorPosition(), lastValue);
actualCandidates = getObservedCandidates(rrIterator.getIterator(), input.size());
- Assert.assertEquals(exepctedCandidates, actualCandidates);
+ Assert.assertEquals(expectedCandidates, actualCandidates);
Assert.assertNotEquals(lastValue, actualCandidates.get(0));
}
- private List getExepctedCandidates(
+ private List getexpectedCandidates(
final Set input,
final int cursorPosition,
@Nullable final String previousValue
From 91b0ddab900a2a083e2f35ace7c3b91b6506272c Mon Sep 17 00:00:00 2001
From: Abhishek Balaji Radhakrishnan
Date: Wed, 10 Jul 2024 13:28:10 -0700
Subject: [PATCH 04/11] Add null check to input candidates.
---
.../server/coordinator/duty/RoundRobinIterator.java | 10 +++++++---
.../coordinator/duty/RoundRobinIteratorTest.java | 10 +++++++++-
2 files changed, 16 insertions(+), 4 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java
index 038e518ada97..cdc9c575e517 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java
@@ -20,6 +20,7 @@
package org.apache.druid.server.coordinator.duty;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.error.InvalidInput;
import javax.annotation.concurrent.NotThreadSafe;
import java.util.ArrayList;
@@ -53,11 +54,14 @@ public class RoundRobinIterator
/**
* Update the candidates with the supplied input if the input set is different from the current candidates.
* If the input set is the same as the existing candidates, the cursor position remains unchanged so the iteration order
- * is determistic and resumed. If the input set is different, the cursor position is reset to a random location in
- * the new list of sorted candidates.
+ * is determistic and resumed. If the input set is different, the cursor position is reset to a random position in
+ * the new list of sorted candidates. The supplied input must be non-null.
*/
public void updateCandidates(final Set input)
{
+ if (input == null) {
+ throw InvalidInput.exception("Supplied input candidates must be non-null.");
+ }
if (new HashSet<>(candidates).equals(input)) {
return;
}
@@ -114,7 +118,7 @@ private void advanceCursor()
@VisibleForTesting
int generateRandomCursorPosition(final int maxBound)
{
- return maxBound <= 0 ? 0 : ThreadLocalRandom.current().nextInt(0, maxBound);
+ return maxBound <= 0 ? 0 : ThreadLocalRandom.current().nextInt(maxBound);
}
@VisibleForTesting
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java
index 56f6e1ee11c8..7ad3363e4eaf 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java
@@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableSet;
import junitparams.converters.Nullable;
+import org.apache.druid.error.DruidException;
import org.junit.Assert;
import org.junit.Test;
@@ -132,7 +133,7 @@ int generateRandomCursorPosition(final int maxBound)
}
@Test
- public void testEmpty()
+ public void testEmptyIterator()
{
final RoundRobinIterator rrIterator = new RoundRobinIterator();
final Iterator it = rrIterator.getIterator();
@@ -141,6 +142,13 @@ public void testEmpty()
Assert.assertThrows(NoSuchElementException.class, it::next);
}
+ @Test
+ public void testUpdateWithNullCandidate()
+ {
+ final RoundRobinIterator rrIterator = new RoundRobinIterator();
+ Assert.assertThrows(DruidException.class, () -> rrIterator.updateCandidates(null));
+ }
+
/**
* Similar to the above tests but with the built-in randomness.
*/
From fd61ce8a7a2d8c6a69eeb4b6b462517f77364b73 Mon Sep 17 00:00:00 2001
From: Abhishek Balaji Radhakrishnan
Date: Wed, 10 Jul 2024 19:05:42 -0700
Subject: [PATCH 05/11] More commentary.
---
.../server/coordinator/duty/KillUnusedSegments.java | 7 ++++++-
.../server/coordinator/duty/RoundRobinIterator.java | 10 ++++++++--
.../coordinator/duty/KillUnusedSegmentsTest.java | 6 +++++-
3 files changed, 19 insertions(+), 4 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index 5ca691fe1c92..a84ff008a981 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -60,6 +60,10 @@
* as there can be multiple unused segments with different {@code used_status_last_updated} time.
*
*
+ * The datasources to kill during each cycle is picked using {@link #datasourceIterator}, which is refreshed as needed
+ * during each cycle of this duty.
+ *
+ *
* See {@link org.apache.druid.indexing.common.task.KillUnusedSegmentsTask}.
*
*/
@@ -91,7 +95,8 @@ public class KillUnusedSegments implements CoordinatorDuty
private DateTime lastKillTime;
/**
- * Round-robin iterator of the datasources to kill. It's updated in every run by the duty.
+ * Round-robin iterator of the datasources to kill. It's updated in every run by the duty and must be used
+ * from a single thread.
*/
private final RoundRobinIterator datasourceIterator;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java
index cdc9c575e517..f8b37a0f2488 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java
@@ -33,16 +33,19 @@
import java.util.concurrent.ThreadLocalRandom;
/**
- * A round-robin iterator that is always backed by an ordered list of candidates containing no duplicates.
+ * A round-robin iterator that is backed by an ordered list of candidates containing no duplicates. The iterator
+ * iterates endlessly over all the candidates. The caller must explicitly terminate it.
* The iterator has the following properties:
*
* - Starts with an initial random cursor position in an ordered list of candidates.
- * - Invoking {@code next()} on {@link #getIterator()} is guaranteed to be deterministic
+ *
- Invoking {@code next()} on {@link #getIterator()} is guaranteed to result in a deterministic order
* unless the set of candidates change when {@link #updateCandidates(Set)} is called. When the candidates change,
* the cursor is reset to a random position in the new list of ordered candidates.
* - Guarantees that no duplicate candidates are returned in two consecutive {@code next()} iterations.
*
*
+ *
+ * This class is not thread-safe and must be used from a single thread.
*/
@NotThreadSafe
public class RoundRobinIterator
@@ -71,6 +74,9 @@ public void updateCandidates(final Set input)
this.cursorPosition = generateRandomCursorPosition(input.size());
}
+ /**
+ * Returns a round-robin iterator over the list of candidates. The caller must explicitly terminate it.
+ */
public Iterator getIterator()
{
return new Iterator()
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index eb53c43bf91d..b6ef02a650c4 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -225,8 +225,12 @@ public void testKillWithMultipleDatasources()
validateLastKillStateAndReset(DS2, null);
}
+ /**
+ * Set up multiple datasources {@link #DS1}, {@link #DS2} and {@link #DS3} with unused segments with 2 kill task
+ * slots. Running the kill duty each time should pick a different set of datasources in a round-robin manner.
+ */
@Test
- public void testKillWithMultipleDatasourcesRoundRobin()
+ public void testKillMultipleDatasourcesInRoundRobinManner()
{
configBuilder.withIgnoreDurationToRetain(true)
.withMaxSegmentsToKill(2);
From 75a6744085802f9223c689fcc3070a821efc0b62 Mon Sep 17 00:00:00 2001
From: Abhishek Balaji Radhakrishnan
Date: Thu, 11 Jul 2024 07:20:31 -0700
Subject: [PATCH 06/11] Addres review feedback: downgrade some new info logs to
debug; invert condition.
Remove redundant comments.
Remove rendundant variable tracking.
---
.../coordinator/duty/KillUnusedSegments.java | 40 +++++++------------
1 file changed, 15 insertions(+), 25 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index a84ff008a981..7f7233aa7c1a 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -21,6 +21,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
+import com.google.common.collect.Sets;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.DateTimes;
@@ -89,15 +90,7 @@ public class KillUnusedSegments implements CoordinatorDuty
*/
private final Map datasourceToLastKillIntervalEnd;
- /**
- * State that is maintained in the duty to determine if the duty needs to run or not.
- */
private DateTime lastKillTime;
-
- /**
- * Round-robin iterator of the datasources to kill. It's updated in every run by the duty and must be used
- * from a single thread.
- */
private final RoundRobinIterator datasourceIterator;
private final SegmentsMetadataManager segmentsMetadataManager;
@@ -153,7 +146,7 @@ public DruidCoordinatorRuntimeParams run(final DruidCoordinatorRuntimeParams par
if (canDutyRun()) {
return runInternal(params);
} else {
- log.info(
+ log.debug(
"Skipping KillUnusedSegments until period[%s] has elapsed after lastKillTime[%s].",
period, lastKillTime
);
@@ -168,16 +161,15 @@ private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeP
final int availableKillTaskSlots = getAvailableKillTaskSlots(dynamicConfig, stats);
if (availableKillTaskSlots <= 0) {
- log.info("Skipping KillUnusedSegments because there are no available kill task slots.");
+ log.debug("Skipping KillUnusedSegments because there are no available kill task slots.");
return params;
}
final Set dataSourcesToKill;
- if (!CollectionUtils.isNullOrEmpty(dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn())) {
- dataSourcesToKill = dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
- } else {
- // If no datasource has been specified, all are eligible for killing unused segments by default
+ if (CollectionUtils.isNullOrEmpty(dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn())) {
dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames();
+ } else {
+ dataSourcesToKill = dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
}
datasourceIterator.updateCandidates(dataSourcesToKill);
@@ -201,28 +193,23 @@ private void killUnusedSegments(
)
{
if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
- log.info("Skipping KillUnusedSegments because there are no datasources to kill.");
+ log.debug("Skipping KillUnusedSegments because there are no datasources to kill.");
stats.add(Stats.Kill.SUBMITTED_TASKS, 0);
return;
}
final Iterator dataSourcesToKillIterator = this.datasourceIterator.getIterator();
final Set remainingDatasourcesToKill = new HashSet<>(dataSourcesToKill);
- final Set datasourcesKilled = new HashSet<>();
int submittedTasks = 0;
while (dataSourcesToKillIterator.hasNext()) {
if (remainingDatasourcesToKill.size() == 0) {
- log.info(
- "Submitted [%d] kill tasks for [%d] datasources. No more datasource to kill in this cycle.",
- submittedTasks, datasourcesKilled.size()
- );
break;
}
if (submittedTasks >= availableKillTaskSlots) {
log.info(
- "Submitted [%d] kill tasks for [%d] datasources and reached kill task slot limit [%d].",
- submittedTasks, datasourcesKilled.size(), availableKillTaskSlots
+ "Submitted [%d] kill tasks and reached kill task slot limit [%d].",
+ submittedTasks, availableKillTaskSlots
);
break;
}
@@ -249,7 +236,6 @@ private void killUnusedSegments(
true
);
++submittedTasks;
- datasourcesKilled.add(dataSource);
remainingDatasourcesToKill.remove(dataSource);
datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
}
@@ -263,8 +249,12 @@ private void killUnusedSegments(
}
log.info(
- "Submitted [%d] kill tasks for [%d] datasources: [%s]. Remaining datasources to kill: [%s]",
- submittedTasks, datasourcesKilled.size(), datasourcesKilled, remainingDatasourcesToKill
+ "Submitted [%d] kill tasks for [%d] datasources: [%s]. Remaining [%d] datasources to kill: [%s].",
+ submittedTasks,
+ dataSourcesToKill.size() - remainingDatasourcesToKill.size(),
+ Sets.difference(dataSourcesToKill, remainingDatasourcesToKill),
+ remainingDatasourcesToKill.size(),
+ remainingDatasourcesToKill
);
stats.add(Stats.Kill.SUBMITTED_TASKS, submittedTasks);
From d0158b96d6865029a0f74d63cef3f6f203afde06 Mon Sep 17 00:00:00 2001
From: Abhishek Balaji Radhakrishnan
Date: Sun, 21 Jul 2024 17:27:54 -0700
Subject: [PATCH 07/11] CircularList adjustments.
---
.../druid/collections/CircularList.java | 96 +++++++++++++++++++
.../coordinator/duty/KillUnusedSegments.java | 56 ++++++++---
.../duty/KillUnusedSegmentsTest.java | 8 +-
3 files changed, 144 insertions(+), 16 deletions(-)
create mode 100644 processing/src/main/java/org/apache/druid/collections/CircularList.java
diff --git a/processing/src/main/java/org/apache/druid/collections/CircularList.java b/processing/src/main/java/org/apache/druid/collections/CircularList.java
new file mode 100644
index 000000000000..3decc0114186
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/collections/CircularList.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.collections;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ *
+ * @param
+ */
+public class CircularList implements Iterable
+{
+ private final List collection = new ArrayList<>();
+ private final Comparator super T> comparator;
+ private int currentPosition;
+
+ public CircularList(final Set elements, Comparator super T> comparator)
+ {
+ this.collection.addAll(elements);
+ this.comparator = comparator;
+ this.collection.sort(comparator);
+ }
+
+ @Override
+ public Iterator iterator()
+ {
+ return new Iterator()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return collection.size() > 0;
+ }
+
+ @Override
+ public T next()
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ T nextCandidate = peekNext();
+ advanceCursor();
+ return nextCandidate;
+ }
+
+ T peekNext()
+ {
+ int nextPosition = currentPosition < collection.size() ? currentPosition : 0;
+ return collection.get(nextPosition);
+ }
+ };
+ }
+
+ public void advanceCursor()
+ {
+ if (++currentPosition >= collection.size()) {
+ currentPosition = 0;
+ }
+ }
+
+ public void resetCursor(final int position)
+ {
+ this.currentPosition = position;
+ }
+
+ public boolean equalsSet(final Set s)
+ {
+ final List x = new ArrayList<>(s);
+ x.sort(comparator);
+ return collection.equals(x);
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index 7f7233aa7c1a..510da3c4f4d6 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -22,6 +22,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Sets;
+import org.apache.druid.collections.CircularList;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.DateTimes;
@@ -42,12 +43,14 @@
import org.joda.time.Interval;
import javax.annotation.Nullable;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
/**
*
@@ -96,6 +99,9 @@ public class KillUnusedSegments implements CoordinatorDuty
private final SegmentsMetadataManager segmentsMetadataManager;
private final OverlordClient overlordClient;
+ private String prevDatasourceKilled;
+ private CircularList datasourceCircularKillList;
+
public KillUnusedSegments(
SegmentsMetadataManager segmentsMetadataManager,
OverlordClient overlordClient,
@@ -172,7 +178,14 @@ private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeP
dataSourcesToKill = dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
}
- datasourceIterator.updateCandidates(dataSourcesToKill);
+ if (datasourceCircularKillList == null ||
+ !datasourceCircularKillList.equalsSet(dataSourcesToKill)) {
+ datasourceCircularKillList = new CircularList<>(dataSourcesToKill, Comparator.naturalOrder());
+
+ final int randomPosition = generateRandomCursorPosition(dataSourcesToKill.size());
+ datasourceCircularKillList.resetCursor(randomPosition);
+ }
+
lastKillTime = DateTimes.nowUtc();
killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
@@ -183,6 +196,12 @@ private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeP
return params;
}
+ @VisibleForTesting
+ int generateRandomCursorPosition(final int maxBound)
+ {
+ return maxBound <= 0 ? 0 : ThreadLocalRandom.current().nextInt(maxBound);
+ }
+
/**
* Spawn kill tasks for each datasource in {@code dataSourcesToKill} upto {@code availableKillTaskSlots}.
*/
@@ -197,29 +216,23 @@ private void killUnusedSegments(
stats.add(Stats.Kill.SUBMITTED_TASKS, 0);
return;
}
- final Iterator dataSourcesToKillIterator = this.datasourceIterator.getIterator();
+
final Set remainingDatasourcesToKill = new HashSet<>(dataSourcesToKill);
int submittedTasks = 0;
- while (dataSourcesToKillIterator.hasNext()) {
- if (remainingDatasourcesToKill.size() == 0) {
- break;
- }
-
- if (submittedTasks >= availableKillTaskSlots) {
- log.info(
- "Submitted [%d] kill tasks and reached kill task slot limit [%d].",
- submittedTasks, availableKillTaskSlots
- );
- break;
+ for (String dataSource : datasourceCircularKillList) {
+ if (dataSource.equals(prevDatasourceKilled)) {
+ datasourceCircularKillList.advanceCursor();
}
- final String dataSource = dataSourcesToKillIterator.next();
final DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(bufferPeriod);
final Interval intervalToKill = findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime, stats);
if (intervalToKill == null) {
datasourceToLastKillIntervalEnd.remove(dataSource);
remainingDatasourcesToKill.remove(dataSource);
+ if (remainingDatasourcesToKill.size() == 0) {
+ break;
+ }
continue;
}
@@ -235,9 +248,22 @@ private void killUnusedSegments(
),
true
);
+ datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
+ prevDatasourceKilled = dataSource;
++submittedTasks;
remainingDatasourcesToKill.remove(dataSource);
- datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
+
+ if (remainingDatasourcesToKill.size() == 0) {
+ break;
+ }
+
+ if (submittedTasks >= availableKillTaskSlots) {
+ log.info(
+ "Submitted [%d] kill tasks and reached kill task slot limit [%d].",
+ submittedTasks, availableKillTaskSlots
+ );
+ break;
+ }
}
catch (Exception ex) {
log.error(ex, "Failed to submit kill task for dataSource[%s] in interval[%s]", dataSource, intervalToKill);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index b6ef02a650c4..298b76bdebd3 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -887,7 +887,13 @@ int generateRandomCursorPosition(final int maxBound)
return 0;
}
}
- );
+ ) {
+ @Override
+ int generateRandomCursorPosition(final int maxBound)
+ {
+ return 0;
+ }
+ };
}
private CoordinatorRunStats runDutyAndGetStats()
From 5fa342c164699b824c79a68e0839d101e31ac14f Mon Sep 17 00:00:00 2001
From: Abhishek Balaji Radhakrishnan
Date: Thu, 25 Jul 2024 22:15:36 -0700
Subject: [PATCH 08/11] Updates to CircularList and cleanup
RoundRobinInterator.
---
.../druid/collections/CircularList.java | 30 ++-
.../druid/collections/CircularListTest.java | 123 +++++++++
.../coordinator/duty/KillUnusedSegments.java | 30 +--
.../coordinator/duty/RoundRobinIterator.java | 135 ----------
.../duty/KillUnusedSegmentsTest.java | 30 +--
.../duty/RoundRobinIteratorTest.java | 236 ------------------
6 files changed, 147 insertions(+), 437 deletions(-)
create mode 100644 processing/src/test/java/org/apache/druid/collections/CircularListTest.java
delete mode 100644 server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java
delete mode 100644 server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java
diff --git a/processing/src/main/java/org/apache/druid/collections/CircularList.java b/processing/src/main/java/org/apache/druid/collections/CircularList.java
index 3decc0114186..737bb3c060e2 100644
--- a/processing/src/main/java/org/apache/druid/collections/CircularList.java
+++ b/processing/src/main/java/org/apache/druid/collections/CircularList.java
@@ -19,8 +19,8 @@
package org.apache.druid.collections;
+import javax.annotation.concurrent.NotThreadSafe;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@@ -28,9 +28,12 @@
import java.util.Set;
/**
- *
- * @param
+ * A circular list that is backed by an ordered list of elements containing no duplicates. The list is ordered by the
+ * supplied comparator. Callers are responsible for terminating the iterator explicitly.
+ *
+ * This class is not thread-safe and must be used from a single thread.
*/
+@NotThreadSafe
public class CircularList implements Iterable
{
private final List collection = new ArrayList<>();
@@ -67,7 +70,7 @@ public T next()
return nextCandidate;
}
- T peekNext()
+ private T peekNext()
{
int nextPosition = currentPosition < collection.size() ? currentPosition : 0;
return collection.get(nextPosition);
@@ -75,6 +78,9 @@ T peekNext()
};
}
+ /**
+ * Advances the cursor position in the circular list. If the position reaches the end of the list, it wraps around.
+ */
public void advanceCursor()
{
if (++currentPosition >= collection.size()) {
@@ -82,15 +88,13 @@ public void advanceCursor()
}
}
- public void resetCursor(final int position)
- {
- this.currentPosition = position;
- }
-
- public boolean equalsSet(final Set s)
+ /**
+ * @return true if the supplied set is equal to the set used to instantiate this circular list, otherwise false.
+ */
+ public boolean equalsSet(final Set inputSet)
{
- final List x = new ArrayList<>(s);
- x.sort(comparator);
- return collection.equals(x);
+ final List sortedList = new ArrayList<>(inputSet);
+ sortedList.sort(comparator);
+ return collection.equals(sortedList);
}
}
diff --git a/processing/src/test/java/org/apache/druid/collections/CircularListTest.java b/processing/src/test/java/org/apache/druid/collections/CircularListTest.java
new file mode 100644
index 000000000000..40a38be612dd
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/collections/CircularListTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.collections;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+public class CircularListTest
+{
+ @Test
+ public void testIterateInNaturalOrder()
+ {
+ final Set input = ImmutableSet.of("b", "a", "c");
+ final CircularList circularList = new CircularList<>(input, Comparator.naturalOrder());
+ final List observedElements = new ArrayList<>();
+ int cnt = 0;
+ for (String x : circularList) {
+ observedElements.add(x);
+ if (++cnt >= input.size()) {
+ break;
+ }
+ }
+ Assert.assertEquals(ImmutableList.of("a", "b", "c"), observedElements);
+ }
+
+ @Test
+ public void testIterateInReverseOrder()
+ {
+ final Set input = ImmutableSet.of(-1, 100, 0, -4);
+ final CircularList circularList = new CircularList<>(input, Comparator.reverseOrder());
+ final List observedElements = new ArrayList<>();
+ int cnt = 0;
+ for (Integer x : circularList) {
+ observedElements.add(x);
+ if (++cnt >= 2 * input.size()) {
+ break;
+ }
+ }
+
+ Assert.assertEquals(ImmutableList.of(100, 0, -1, -4, 100, 0, -1, -4), observedElements);
+ }
+
+ @Test
+ public void testAdvanceCursor()
+ {
+ final Set input = ImmutableSet.of(1, 10, 35, 50);
+ final CircularList circularList = new CircularList<>(input, Comparator.naturalOrder());
+ final Iterator iterator = circularList.iterator();
+
+ circularList.advanceCursor();
+ Assert.assertTrue(iterator.hasNext());
+ Assert.assertEquals((Integer) 10, iterator.next());
+
+ circularList.advanceCursor();
+ circularList.advanceCursor();
+ Assert.assertTrue(iterator.hasNext());
+ Assert.assertEquals((Integer) 1, iterator.next());
+ }
+
+ @Test
+ public void testEqualsSet()
+ {
+ final Set input = ImmutableSet.of("a", "b", "c");
+ final CircularList circularList = new CircularList<>(input, Comparator.naturalOrder());
+ Assert.assertTrue(circularList.equalsSet(ImmutableSet.of("b", "a", "c")));
+ Assert.assertFalse(circularList.equalsSet(ImmutableSet.of("c")));
+ Assert.assertFalse(circularList.equalsSet(ImmutableSet.of("a", "c")));
+ }
+
+ @Test
+ public void testEmptyIterator()
+ {
+ final Set input = ImmutableSet.of();
+ final CircularList circularList = new CircularList<>(input, Comparator.naturalOrder());
+ final List observedElements = new ArrayList<>();
+
+ int cnt = 0;
+ for (String x : circularList) {
+ observedElements.add(x);
+ if (++cnt >= input.size()) {
+ break;
+ }
+ }
+ Assert.assertEquals(ImmutableList.of(), observedElements);
+ }
+
+ @Test
+ public void testNextOnEmptyIteratorThrowsException()
+ {
+ final Set input = ImmutableSet.of();
+ final CircularList circularList = new CircularList<>(input, Comparator.naturalOrder());
+
+ final Iterator iterator = circularList.iterator();
+ Assert.assertFalse(iterator.hasNext());
+ Assert.assertThrows(NoSuchElementException.class, iterator::next);
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index 510da3c4f4d6..3b2ebd45aec4 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -19,7 +19,6 @@
package org.apache.druid.server.coordinator.duty;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Sets;
import org.apache.druid.collections.CircularList;
@@ -45,12 +44,10 @@
import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadLocalRandom;
/**
*
@@ -64,8 +61,9 @@
* as there can be multiple unused segments with different {@code used_status_last_updated} time.
*
*
- * The datasources to kill during each cycle is picked using {@link #datasourceIterator}, which is refreshed as needed
- * during each cycle of this duty.
+ * The datasources to be killed during each cycle are selected from {@link #datasourceCircularKillList}. This state is
+ * refreshed in a run if the set of datasources to be killed changes. Consecutive duplicate datasources are avoided
+ * across runs, provided there are other datasources to be killed.
*
*
* See {@link org.apache.druid.indexing.common.task.KillUnusedSegmentsTask}.
@@ -94,7 +92,6 @@ public class KillUnusedSegments implements CoordinatorDuty
private final Map datasourceToLastKillIntervalEnd;
private DateTime lastKillTime;
- private final RoundRobinIterator datasourceIterator;
private final SegmentsMetadataManager segmentsMetadataManager;
private final OverlordClient overlordClient;
@@ -107,17 +104,6 @@ public KillUnusedSegments(
OverlordClient overlordClient,
KillUnusedSegmentsConfig killConfig
)
- {
- this(segmentsMetadataManager, overlordClient, killConfig, new RoundRobinIterator());
- }
-
- @VisibleForTesting
- KillUnusedSegments(
- SegmentsMetadataManager segmentsMetadataManager,
- OverlordClient overlordClient,
- KillUnusedSegmentsConfig killConfig,
- RoundRobinIterator robinUniqueIterator
- )
{
this.period = killConfig.getCleanupPeriod();
this.maxSegmentsToKill = killConfig.getMaxSegments();
@@ -143,7 +129,6 @@ public KillUnusedSegments(
this.segmentsMetadataManager = segmentsMetadataManager;
this.overlordClient = overlordClient;
this.datasourceToLastKillIntervalEnd = new ConcurrentHashMap<>();
- this.datasourceIterator = robinUniqueIterator;
}
@Override
@@ -181,9 +166,6 @@ private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeP
if (datasourceCircularKillList == null ||
!datasourceCircularKillList.equalsSet(dataSourcesToKill)) {
datasourceCircularKillList = new CircularList<>(dataSourcesToKill, Comparator.naturalOrder());
-
- final int randomPosition = generateRandomCursorPosition(dataSourcesToKill.size());
- datasourceCircularKillList.resetCursor(randomPosition);
}
lastKillTime = DateTimes.nowUtc();
@@ -196,12 +178,6 @@ private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeP
return params;
}
- @VisibleForTesting
- int generateRandomCursorPosition(final int maxBound)
- {
- return maxBound <= 0 ? 0 : ThreadLocalRandom.current().nextInt(maxBound);
- }
-
/**
* Spawn kill tasks for each datasource in {@code dataSourcesToKill} upto {@code availableKillTaskSlots}.
*/
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java
deleted file mode 100644
index f8b37a0f2488..000000000000
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RoundRobinIterator.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.coordinator.duty;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.druid.error.InvalidInput;
-
-import javax.annotation.concurrent.NotThreadSafe;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * A round-robin iterator that is backed by an ordered list of candidates containing no duplicates. The iterator
- * iterates endlessly over all the candidates. The caller must explicitly terminate it.
- * The iterator has the following properties:
- *
- * - Starts with an initial random cursor position in an ordered list of candidates.
- * - Invoking {@code next()} on {@link #getIterator()} is guaranteed to result in a deterministic order
- * unless the set of candidates change when {@link #updateCandidates(Set)} is called. When the candidates change,
- * the cursor is reset to a random position in the new list of ordered candidates.
- * - Guarantees that no duplicate candidates are returned in two consecutive {@code next()} iterations.
- *
- *
- *
- * This class is not thread-safe and must be used from a single thread.
- */
-@NotThreadSafe
-public class RoundRobinIterator
-{
- private final List candidates = new ArrayList<>();
- private int cursorPosition;
- private String previousCandidate;
-
- /**
- * Update the candidates with the supplied input if the input set is different from the current candidates.
- * If the input set is the same as the existing candidates, the cursor position remains unchanged so the iteration order
- * is determistic and resumed. If the input set is different, the cursor position is reset to a random position in
- * the new list of sorted candidates. The supplied input must be non-null.
- */
- public void updateCandidates(final Set input)
- {
- if (input == null) {
- throw InvalidInput.exception("Supplied input candidates must be non-null.");
- }
- if (new HashSet<>(candidates).equals(input)) {
- return;
- }
- this.candidates.clear();
- this.candidates.addAll(input);
- Collections.sort(this.candidates);
- this.cursorPosition = generateRandomCursorPosition(input.size());
- }
-
- /**
- * Returns a round-robin iterator over the list of candidates. The caller must explicitly terminate it.
- */
- public Iterator getIterator()
- {
- return new Iterator()
- {
- @Override
- public boolean hasNext()
- {
- return candidates.size() > 0;
- }
-
- @Override
- public String next()
- {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
-
- // If the next candidate to be served is the same as the previous candidate served (duplicate),
- // advance the cursor twice. Otherwise, advance the cursor only once.
- String nextCandidate = peakNextCandiate();
- advanceCursor();
- if (nextCandidate.equals(previousCandidate)) {
- nextCandidate = peakNextCandiate();
- advanceCursor();
- }
- previousCandidate = nextCandidate;
- return nextCandidate;
- }
-
- private String peakNextCandiate()
- {
- final int nextPosition = cursorPosition < candidates.size() ? cursorPosition : 0;
- return candidates.get(nextPosition);
- }
-
- private void advanceCursor()
- {
- if (++cursorPosition >= candidates.size()) {
- cursorPosition = 0;
- }
- }
- };
- }
-
- @VisibleForTesting
- int generateRandomCursorPosition(final int maxBound)
- {
- return maxBound <= 0 ? 0 : ThreadLocalRandom.current().nextInt(maxBound);
- }
-
- @VisibleForTesting
- int getCurrentCursorPosition()
- {
- return this.cursorPosition;
- }
-}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index 298b76bdebd3..8bdfc63e213d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -227,7 +227,7 @@ public void testKillWithMultipleDatasources()
/**
* Set up multiple datasources {@link #DS1}, {@link #DS2} and {@link #DS3} with unused segments with 2 kill task
- * slots. Running the kill duty each time should pick a different set of datasources in a round-robin manner.
+ * slots. Running the kill duty each time should pick at least one unique datasource in a round-robin manner.
*/
@Test
public void testKillMultipleDatasourcesInRoundRobinManner()
@@ -567,10 +567,10 @@ public void testKillTaskSlotAtCapacity()
initDuty();
final CoordinatorRunStats stats = runDutyAndGetStats();
-
Assert.assertEquals(2, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(2, stats.get(Stats.Kill.MAX_SLOTS));
+
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(1, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
@@ -825,11 +825,7 @@ private void validateLastKillStateAndReset(final String dataSource, @Nullable fi
overlordClient.deleteLastKillInterval(dataSource);
}
- private void createAndAddUsedSegment(
- final String dataSource,
- final Interval interval,
- final String version
- )
+ private void createAndAddUsedSegment(final String dataSource, final Interval interval, final String version)
{
final DataSegment segment = createSegment(dataSource, interval, version);
try {
@@ -875,25 +871,7 @@ private DataSegment createSegment(final String dataSource, final Interval interv
private void initDuty()
{
- killDuty = new KillUnusedSegments(
- sqlSegmentsMetadataManager,
- overlordClient,
- configBuilder.build(),
- new RoundRobinIterator()
- {
- @Override
- int generateRandomCursorPosition(final int maxBound)
- {
- return 0;
- }
- }
- ) {
- @Override
- int generateRandomCursorPosition(final int maxBound)
- {
- return 0;
- }
- };
+ killDuty = new KillUnusedSegments(sqlSegmentsMetadataManager, overlordClient, configBuilder.build());
}
private CoordinatorRunStats runDutyAndGetStats()
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java
deleted file mode 100644
index 7ad3363e4eaf..000000000000
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/RoundRobinIteratorTest.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.coordinator.duty;
-
-import com.google.common.collect.ImmutableSet;
-import junitparams.converters.Nullable;
-import org.apache.druid.error.DruidException;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-public class RoundRobinIteratorTest
-{
- @Test
- public void testIterationOrder()
- {
- final RoundRobinIterator rrIterator = new RoundRobinIterator()
- {
- @Override
- int generateRandomCursorPosition(final int maxBound)
- {
- return 1;
- }
- };
- rrIterator.updateCandidates(ImmutableSet.of("a", "b", "c"));
-
- final Iterator it = rrIterator.getIterator();
- Assert.assertTrue(it.hasNext());
- Assert.assertEquals("b", it.next());
-
- Assert.assertTrue(it.hasNext());
- Assert.assertEquals("c", it.next());
-
- Assert.assertTrue(it.hasNext());
- Assert.assertEquals("a", it.next());
-
- rrIterator.updateCandidates(ImmutableSet.of("x", "y"));
-
- Assert.assertTrue(it.hasNext());
- Assert.assertEquals("y", it.next());
-
- Assert.assertTrue(it.hasNext());
- Assert.assertEquals("x", it.next());
- }
-
- @Test
- public void testNoConsecutiveDuplicates()
- {
- final RoundRobinIterator rrIterator = new RoundRobinIterator()
- {
- @Override
- int generateRandomCursorPosition(final int maxBound)
- {
- return 0;
- }
- };
- rrIterator.updateCandidates(ImmutableSet.of("a"));
-
- final Iterator it = rrIterator.getIterator();
- Assert.assertTrue(it.hasNext());
- Assert.assertEquals("a", it.next());
-
- rrIterator.updateCandidates(ImmutableSet.of("a", "b"));
-
- // next() should skip the duplicate "a" as it was the last value served
- Assert.assertTrue(it.hasNext());
- Assert.assertEquals("b", it.next());
-
- Assert.assertTrue(it.hasNext());
- Assert.assertEquals("a", it.next());
-
- rrIterator.updateCandidates(ImmutableSet.of("b", "c"));
-
- Assert.assertTrue(it.hasNext());
- Assert.assertEquals("b", it.next());
-
- Assert.assertTrue(it.hasNext());
- Assert.assertEquals("c", it.next());
- }
-
- @Test
- public void testMultipleUpdatesWithSameCandidates()
- {
- final Set input = ImmutableSet.of("1", "2", "3");
- final RoundRobinIterator rrIterator = new RoundRobinIterator()
- {
- @Override
- int generateRandomCursorPosition(final int maxBound)
- {
- return 0;
- }
- };
- rrIterator.updateCandidates(input);
- final Iterator it = rrIterator.getIterator();
-
- Assert.assertTrue(it.hasNext());
- Assert.assertEquals("1", it.next());
-
- Assert.assertTrue(it.hasNext());
- Assert.assertEquals("2", it.next());
-
- rrIterator.updateCandidates(input);
-
- // The cursor should just resume from the last position instead of resetting to 0.
- Assert.assertTrue(it.hasNext());
- Assert.assertEquals("3", it.next());
-
- Assert.assertTrue(it.hasNext());
- Assert.assertEquals("1", it.next());
- }
-
- @Test
- public void testEmptyIterator()
- {
- final RoundRobinIterator rrIterator = new RoundRobinIterator();
- final Iterator it = rrIterator.getIterator();
-
- Assert.assertFalse(it.hasNext());
- Assert.assertThrows(NoSuchElementException.class, it::next);
- }
-
- @Test
- public void testUpdateWithNullCandidate()
- {
- final RoundRobinIterator rrIterator = new RoundRobinIterator();
- Assert.assertThrows(DruidException.class, () -> rrIterator.updateCandidates(null));
- }
-
- /**
- * Similar to the above tests but with the built-in randomness.
- */
- @Test
- public void testMultipleUpdatesWithRandomCursorPositions()
- {
- final RoundRobinIterator rrIterator = new RoundRobinIterator();
-
- Set input = ImmutableSet.of("a", "b", "c");
- rrIterator.updateCandidates(input);
-
- List expectedCandidates = getexpectedCandidates(input, rrIterator.getCurrentCursorPosition(), null);
- List actualCandidates = getObservedCandidates(rrIterator.getIterator(), input.size());
- Assert.assertEquals(expectedCandidates, actualCandidates);
- String lastValue = actualCandidates.get(actualCandidates.size() - 1);
-
- // Second update
- input = ImmutableSet.of("c", "d");
- rrIterator.updateCandidates(input);
-
- expectedCandidates = getexpectedCandidates(input, rrIterator.getCurrentCursorPosition(), lastValue);
- actualCandidates = getObservedCandidates(rrIterator.getIterator(), input.size());
- Assert.assertEquals(expectedCandidates, actualCandidates);
-
- Assert.assertNotEquals(lastValue, actualCandidates.get(0));
- lastValue = actualCandidates.get(actualCandidates.size() - 1);
-
- // Third update
- input = ImmutableSet.of("d", "e");
- rrIterator.updateCandidates(ImmutableSet.of("d", "e"));
-
- expectedCandidates = getexpectedCandidates(input, rrIterator.getCurrentCursorPosition(), lastValue);
- actualCandidates = getObservedCandidates(rrIterator.getIterator(), input.size());
- Assert.assertEquals(expectedCandidates, actualCandidates);
-
- Assert.assertNotEquals(lastValue, actualCandidates.get(0));
- lastValue = actualCandidates.get(actualCandidates.size() - 1);
-
- // Fourth update
- input = ImmutableSet.of("e", "f", "h");
- rrIterator.updateCandidates(input);
-
- expectedCandidates = getexpectedCandidates(input, rrIterator.getCurrentCursorPosition(), lastValue);
- actualCandidates = getObservedCandidates(rrIterator.getIterator(), input.size());
- Assert.assertEquals(expectedCandidates, actualCandidates);
- Assert.assertNotEquals(lastValue, actualCandidates.get(0));
- }
-
- private List getexpectedCandidates(
- final Set input,
- final int cursorPosition,
- @Nullable final String previousValue
- )
- {
- final List sortedCandidates = input.stream().sorted().collect(Collectors.toList());
- final List expectedCandidates = new ArrayList<>();
-
- // Adjust the cursor position if the value at the cursor position from the sorted list
- // is the same as the previous value.
- final int n = sortedCandidates.size();
- int adjustedCursorPosition = cursorPosition;
- if (sortedCandidates.get(adjustedCursorPosition % n).equals(previousValue)) {
- adjustedCursorPosition = (adjustedCursorPosition + 1) % n;
- }
-
- for (int i = 0; i < n; i++) {
- int index = (adjustedCursorPosition + i) % n;
- expectedCandidates.add(sortedCandidates.get(index));
- }
- return expectedCandidates;
- }
-
- private List getObservedCandidates(final Iterator it, final int maxSize)
- {
- final List observedCandidates = new ArrayList<>();
- int cnt = 0;
- while (it.hasNext()) {
- observedCandidates.add(it.next());
- if (++cnt >= maxSize) {
- break;
- }
- }
- return observedCandidates;
- }
-}
From 3c186a2afa32e37f08b0e2c90e4ba55c15dd1de9 Mon Sep 17 00:00:00 2001
From: Abhishek Balaji Radhakrishnan
Date: Fri, 26 Jul 2024 00:12:23 -0700
Subject: [PATCH 09/11] One more case and add more tests.
---
.../coordinator/duty/KillUnusedSegments.java | 27 ++---
.../duty/KillUnusedSegmentsTest.java | 100 +++++++++++++++---
2 files changed, 99 insertions(+), 28 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index 3b2ebd45aec4..3b552f00acf2 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -197,19 +197,23 @@ private void killUnusedSegments(
int submittedTasks = 0;
for (String dataSource : datasourceCircularKillList) {
- if (dataSource.equals(prevDatasourceKilled)) {
- datasourceCircularKillList.advanceCursor();
+ if (dataSource.equals(prevDatasourceKilled) && remainingDatasourcesToKill.size() > 1) {
+ // Skip this dataSource if it's the same as the previous one and there are others left to kill.
+ continue;
+ } else {
+ prevDatasourceKilled = dataSource;
+ remainingDatasourcesToKill.remove(dataSource);
}
final DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(bufferPeriod);
final Interval intervalToKill = findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime, stats);
if (intervalToKill == null) {
datasourceToLastKillIntervalEnd.remove(dataSource);
- remainingDatasourcesToKill.remove(dataSource);
- if (remainingDatasourcesToKill.size() == 0) {
+ if (remainingDatasourcesToKill.isEmpty()) {
break;
}
continue;
+
}
try {
@@ -224,20 +228,11 @@ private void killUnusedSegments(
),
true
);
- datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
- prevDatasourceKilled = dataSource;
++submittedTasks;
- remainingDatasourcesToKill.remove(dataSource);
-
- if (remainingDatasourcesToKill.size() == 0) {
- break;
- }
+ datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
- if (submittedTasks >= availableKillTaskSlots) {
- log.info(
- "Submitted [%d] kill tasks and reached kill task slot limit [%d].",
- submittedTasks, availableKillTaskSlots
- );
+ // Check for termination conditions.
+ if (remainingDatasourcesToKill.isEmpty() || submittedTasks >= availableKillTaskSlots) {
break;
}
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index 8bdfc63e213d..e2cbbdf5e434 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -259,9 +259,6 @@ public void testKillMultipleDatasourcesInRoundRobinManner()
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
- validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
- validateLastKillStateAndReset(DS2, new Interval(YEAR_OLD.getStart(), DAY_OLD.getEnd()));
-
stats = runDutyAndGetStats();
Assert.assertEquals(4, stats.get(Stats.Kill.AVAILABLE_SLOTS));
@@ -270,9 +267,6 @@ public void testKillMultipleDatasourcesInRoundRobinManner()
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS3_STAT_KEY));
Assert.assertEquals(4, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
- validateLastKillStateAndReset(DS3, new Interval(YEAR_OLD.getStart(), DAY_OLD.getEnd()));
- validateLastKillStateAndReset(DS1, new Interval(DAY_OLD.getStart(), NEXT_DAY.getEnd()));
-
stats = runDutyAndGetStats();
Assert.assertEquals(6, stats.get(Stats.Kill.AVAILABLE_SLOTS));
@@ -281,9 +275,6 @@ public void testKillMultipleDatasourcesInRoundRobinManner()
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS3_STAT_KEY));
- validateLastKillStateAndReset(DS2, NEXT_DAY);
- validateLastKillStateAndReset(DS3, NEXT_DAY);
-
stats = runDutyAndGetStats();
Assert.assertEquals(8, stats.get(Stats.Kill.AVAILABLE_SLOTS));
@@ -291,10 +282,95 @@ public void testKillMultipleDatasourcesInRoundRobinManner()
Assert.assertEquals(8, stats.get(Stats.Kill.MAX_SLOTS));
Assert.assertEquals(5, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
+ }
+
+ /**
+ * The set of datasources to kill change in consecutive runs. The kill duty should avoid selecting two
+ * consecutive datasources across runs as long as there are other datasources to kill.
+ */
+ @Test
+ public void testKillInRoundRobinMannerWhenDatasourcesChange()
+ {
+ configBuilder.withIgnoreDurationToRetain(true)
+ .withMaxSegmentsToKill(2);
+ dynamicConfigBuilder.withMaxKillTaskSlots(1);
+
+ createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1));
+ createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(1));
+ createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(1));
+
+ initDuty();
+ CoordinatorRunStats stats = runDutyAndGetStats();
+
+ Assert.assertEquals(1, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+ Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
+ Assert.assertEquals(1, stats.get(Stats.Kill.MAX_SLOTS));
+ Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
+
+ validateLastKillStateAndReset(DS1, new Interval(YEAR_OLD.getStart(), MONTH_OLD.getEnd()));
+
+ createAndAddUnusedSegment(DS2, YEAR_OLD, VERSION, NOW.minusDays(1));
+ createAndAddUnusedSegment(DS2, DAY_OLD, VERSION, NOW.minusDays(1));
+ createAndAddUnusedSegment(DS2, NEXT_DAY, VERSION, NOW.minusDays(1));
+
+ stats = runDutyAndGetStats();
+
+ Assert.assertEquals(2, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+ Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
+ Assert.assertEquals(2, stats.get(Stats.Kill.MAX_SLOTS));
+ Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
+
+ stats = runDutyAndGetStats();
+
+ Assert.assertEquals(3, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+ Assert.assertEquals(3, stats.get(Stats.Kill.SUBMITTED_TASKS));
+ Assert.assertEquals(3, stats.get(Stats.Kill.MAX_SLOTS));
+ Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
+
+ stats = runDutyAndGetStats();
+
+ Assert.assertEquals(4, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+ Assert.assertEquals(4, stats.get(Stats.Kill.SUBMITTED_TASKS));
+ Assert.assertEquals(4, stats.get(Stats.Kill.MAX_SLOTS));
+ Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
+ }
+
+ /**
+ * There is a single datasource to kill across multiple runs. The duty should keep picking the same datasource.
+ */
+ @Test
+ public void testKillSingleDatasourceMultipleRuns()
+ {
+ configBuilder.withIgnoreDurationToRetain(true)
+ .withMaxSegmentsToKill(2);
+ dynamicConfigBuilder.withMaxKillTaskSlots(2);
+
+ createAndAddUnusedSegment(DS1, YEAR_OLD, VERSION, NOW.minusDays(1));
+ createAndAddUnusedSegment(DS1, MONTH_OLD, VERSION, NOW.minusDays(1));
+ createAndAddUnusedSegment(DS1, DAY_OLD, VERSION, NOW.minusDays(1));
+
+ initDuty();
+ CoordinatorRunStats stats = runDutyAndGetStats();
+
+ Assert.assertEquals(2, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+ Assert.assertEquals(1, stats.get(Stats.Kill.SUBMITTED_TASKS));
+ Assert.assertEquals(2, stats.get(Stats.Kill.MAX_SLOTS));
+ Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
+
+ stats = runDutyAndGetStats();
+
+ Assert.assertEquals(4, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+ Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
+ Assert.assertEquals(4, stats.get(Stats.Kill.MAX_SLOTS));
+ Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
+
+ stats = runDutyAndGetStats();
+
+ Assert.assertEquals(6, stats.get(Stats.Kill.AVAILABLE_SLOTS));
+ Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
+ Assert.assertEquals(6, stats.get(Stats.Kill.MAX_SLOTS));
+ Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
- validateLastKillStateAndReset(DS1, NEXT_MONTH);
- validateLastKillStateAndReset(DS2, null);
- validateLastKillStateAndReset(DS3, null);
}
/**
From 52dc479ae3494f8c402cb62ac2b1be8366231a01 Mon Sep 17 00:00:00 2001
From: Abhishek Balaji Radhakrishnan
Date: Fri, 26 Jul 2024 00:45:07 -0700
Subject: [PATCH 10/11] Make advanceCursor private for now.
---
.../apache/druid/collections/CircularList.java | 17 +++++++----------
.../druid/collections/CircularListTest.java | 17 -----------------
.../coordinator/duty/KillUnusedSegments.java | 1 -
.../duty/KillUnusedSegmentsTest.java | 4 ++--
4 files changed, 9 insertions(+), 30 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/collections/CircularList.java b/processing/src/main/java/org/apache/druid/collections/CircularList.java
index 737bb3c060e2..338340062b39 100644
--- a/processing/src/main/java/org/apache/druid/collections/CircularList.java
+++ b/processing/src/main/java/org/apache/druid/collections/CircularList.java
@@ -75,17 +75,14 @@ private T peekNext()
int nextPosition = currentPosition < collection.size() ? currentPosition : 0;
return collection.get(nextPosition);
}
- };
- }
- /**
- * Advances the cursor position in the circular list. If the position reaches the end of the list, it wraps around.
- */
- public void advanceCursor()
- {
- if (++currentPosition >= collection.size()) {
- currentPosition = 0;
- }
+ private void advanceCursor()
+ {
+ if (++currentPosition >= collection.size()) {
+ currentPosition = 0;
+ }
+ }
+ };
}
/**
diff --git a/processing/src/test/java/org/apache/druid/collections/CircularListTest.java b/processing/src/test/java/org/apache/druid/collections/CircularListTest.java
index 40a38be612dd..ab66aeb0bcb3 100644
--- a/processing/src/test/java/org/apache/druid/collections/CircularListTest.java
+++ b/processing/src/test/java/org/apache/druid/collections/CircularListTest.java
@@ -66,23 +66,6 @@ public void testIterateInReverseOrder()
Assert.assertEquals(ImmutableList.of(100, 0, -1, -4, 100, 0, -1, -4), observedElements);
}
- @Test
- public void testAdvanceCursor()
- {
- final Set input = ImmutableSet.of(1, 10, 35, 50);
- final CircularList circularList = new CircularList<>(input, Comparator.naturalOrder());
- final Iterator iterator = circularList.iterator();
-
- circularList.advanceCursor();
- Assert.assertTrue(iterator.hasNext());
- Assert.assertEquals((Integer) 10, iterator.next());
-
- circularList.advanceCursor();
- circularList.advanceCursor();
- Assert.assertTrue(iterator.hasNext());
- Assert.assertEquals((Integer) 1, iterator.next());
- }
-
@Test
public void testEqualsSet()
{
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index 3b552f00acf2..77abee91c980 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -231,7 +231,6 @@ private void killUnusedSegments(
++submittedTasks;
datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
- // Check for termination conditions.
if (remainingDatasourcesToKill.isEmpty() || submittedTasks >= availableKillTaskSlots) {
break;
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index e2cbbdf5e434..bdb7a773b09d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -556,7 +556,7 @@ public void testLowerMaxSegmentsToKill()
}
@Test
- public void testDatasourcesFoundButNoUnusedSegments()
+ public void testKillDatasourceWithNoUnusedSegmentsInInitialRun()
{
configBuilder.withMaxSegmentsToKill(1);
@@ -643,10 +643,10 @@ public void testKillTaskSlotAtCapacity()
initDuty();
final CoordinatorRunStats stats = runDutyAndGetStats();
+
Assert.assertEquals(2, stats.get(Stats.Kill.AVAILABLE_SLOTS));
Assert.assertEquals(2, stats.get(Stats.Kill.SUBMITTED_TASKS));
Assert.assertEquals(2, stats.get(Stats.Kill.MAX_SLOTS));
-
Assert.assertEquals(2, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS1_STAT_KEY));
Assert.assertEquals(1, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
From 9263b9c72c67e84d0e7fc4603fd1c23e9475eea3 Mon Sep 17 00:00:00 2001
From: Abhishek Balaji Radhakrishnan
Date: Fri, 26 Jul 2024 08:45:18 -0700
Subject: [PATCH 11/11] Review comments.
---
.../druid/collections/CircularList.java | 32 +++++++------------
.../druid/collections/CircularListTest.java | 29 +++++++++++++++++
.../coordinator/duty/KillUnusedSegments.java | 5 +--
.../duty/KillUnusedSegmentsTest.java | 18 +++--------
4 files changed, 49 insertions(+), 35 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/collections/CircularList.java b/processing/src/main/java/org/apache/druid/collections/CircularList.java
index 338340062b39..7cf551d6cf88 100644
--- a/processing/src/main/java/org/apache/druid/collections/CircularList.java
+++ b/processing/src/main/java/org/apache/druid/collections/CircularList.java
@@ -22,6 +22,7 @@
import javax.annotation.concurrent.NotThreadSafe;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
@@ -29,22 +30,22 @@
/**
* A circular list that is backed by an ordered list of elements containing no duplicates. The list is ordered by the
- * supplied comparator. Callers are responsible for terminating the iterator explicitly.
+ * supplied comparator. The iterator keeps track of the current position, so iterating the list multiple times will
+ * resume from the last location and continue until a caller explicitly terminates it.
*
* This class is not thread-safe and must be used from a single thread.
*/
@NotThreadSafe
public class CircularList implements Iterable
{
- private final List collection = new ArrayList<>();
- private final Comparator super T> comparator;
+ private final List elements = new ArrayList<>();
private int currentPosition;
- public CircularList(final Set elements, Comparator super T> comparator)
+ public CircularList(final Set elements, final Comparator super T> comparator)
{
- this.collection.addAll(elements);
- this.comparator = comparator;
- this.collection.sort(comparator);
+ this.elements.addAll(elements);
+ this.elements.sort(comparator);
+ this.currentPosition = -1;
}
@Override
@@ -55,7 +56,7 @@ public Iterator iterator()
@Override
public boolean hasNext()
{
- return collection.size() > 0;
+ return elements.size() > 0;
}
@Override
@@ -65,20 +66,13 @@ public T next()
throw new NoSuchElementException();
}
- T nextCandidate = peekNext();
advanceCursor();
- return nextCandidate;
- }
-
- private T peekNext()
- {
- int nextPosition = currentPosition < collection.size() ? currentPosition : 0;
- return collection.get(nextPosition);
+ return elements.get(currentPosition);
}
private void advanceCursor()
{
- if (++currentPosition >= collection.size()) {
+ if (++currentPosition >= elements.size()) {
currentPosition = 0;
}
}
@@ -90,8 +84,6 @@ private void advanceCursor()
*/
public boolean equalsSet(final Set inputSet)
{
- final List sortedList = new ArrayList<>(inputSet);
- sortedList.sort(comparator);
- return collection.equals(sortedList);
+ return new HashSet<>(elements).equals(inputSet);
}
}
diff --git a/processing/src/test/java/org/apache/druid/collections/CircularListTest.java b/processing/src/test/java/org/apache/druid/collections/CircularListTest.java
index ab66aeb0bcb3..51518a254e34 100644
--- a/processing/src/test/java/org/apache/druid/collections/CircularListTest.java
+++ b/processing/src/test/java/org/apache/druid/collections/CircularListTest.java
@@ -66,6 +66,35 @@ public void testIterateInReverseOrder()
Assert.assertEquals(ImmutableList.of(100, 0, -1, -4, 100, 0, -1, -4), observedElements);
}
+ @Test
+ public void testIteratorResumesFromLastPosition()
+ {
+ final Set input = ImmutableSet.of("a", "b", "c", "d", "e", "f");
+ final CircularList circularList = new CircularList<>(input, Comparator.naturalOrder());
+
+ List observedElements = new ArrayList<>();
+ int cnt = 0;
+ for (String element : circularList) {
+ observedElements.add(element);
+ if (++cnt >= input.size() / 2) {
+ break;
+ }
+ }
+
+ Assert.assertEquals(ImmutableList.of("a", "b", "c"), observedElements);
+
+ observedElements = new ArrayList<>();
+ for (String element : circularList) {
+ observedElements.add(element);
+ // Resume and go till the end, and add two more elements looping around
+ if (++cnt == input.size() + 2) {
+ break;
+ }
+ }
+
+ Assert.assertEquals(ImmutableList.of("d", "e", "f", "a", "b"), observedElements);
+ }
+
@Test
public void testEqualsSet()
{
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index 77abee91c980..64b61df5e539 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -198,7 +198,7 @@ private void killUnusedSegments(
int submittedTasks = 0;
for (String dataSource : datasourceCircularKillList) {
if (dataSource.equals(prevDatasourceKilled) && remainingDatasourcesToKill.size() > 1) {
- // Skip this dataSource if it's the same as the previous one and there are others left to kill.
+ // Skip this dataSource if it's the same as the previous one and there are remaining datasources to kill.
continue;
} else {
prevDatasourceKilled = dataSource;
@@ -209,11 +209,11 @@ private void killUnusedSegments(
final Interval intervalToKill = findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime, stats);
if (intervalToKill == null) {
datasourceToLastKillIntervalEnd.remove(dataSource);
+ // If no interval is found for this datasource, either terminate or continue based on remaining datasources to kill.
if (remainingDatasourcesToKill.isEmpty()) {
break;
}
continue;
-
}
try {
@@ -231,6 +231,7 @@ private void killUnusedSegments(
++submittedTasks;
datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd());
+ // Termination conditions.
if (remainingDatasourcesToKill.isEmpty() || submittedTasks >= availableKillTaskSlots) {
break;
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index bdb7a773b09d..f4e8cdb3cd4b 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -230,7 +230,7 @@ public void testKillWithMultipleDatasources()
* slots. Running the kill duty each time should pick at least one unique datasource in a round-robin manner.
*/
@Test
- public void testKillMultipleDatasourcesInRoundRobinManner()
+ public void testRoundRobinKillMultipleDatasources()
{
configBuilder.withIgnoreDurationToRetain(true)
.withMaxSegmentsToKill(2);
@@ -289,7 +289,7 @@ public void testKillMultipleDatasourcesInRoundRobinManner()
* consecutive datasources across runs as long as there are other datasources to kill.
*/
@Test
- public void testKillInRoundRobinMannerWhenDatasourcesChange()
+ public void testRoundRobinKillWhenDatasourcesChange()
{
configBuilder.withIgnoreDurationToRetain(true)
.withMaxSegmentsToKill(2);
@@ -335,9 +335,6 @@ public void testKillInRoundRobinMannerWhenDatasourcesChange()
Assert.assertEquals(3, stats.get(Stats.Kill.ELIGIBLE_UNUSED_SEGMENTS, DS2_STAT_KEY));
}
- /**
- * There is a single datasource to kill across multiple runs. The duty should keep picking the same datasource.
- */
@Test
public void testKillSingleDatasourceMultipleRuns()
{
@@ -901,7 +898,7 @@ private void validateLastKillStateAndReset(final String dataSource, @Nullable fi
overlordClient.deleteLastKillInterval(dataSource);
}
- private void createAndAddUsedSegment(final String dataSource, final Interval interval, final String version)
+ private DataSegment createAndAddUsedSegment(final String dataSource, final Interval interval, final String version)
{
final DataSegment segment = createSegment(dataSource, interval, version);
try {
@@ -910,6 +907,7 @@ private void createAndAddUsedSegment(final String dataSource, final Interval int
catch (IOException e) {
throw new RuntimeException(e);
}
+ return segment;
}
private void createAndAddUnusedSegment(
@@ -919,13 +917,7 @@ private void createAndAddUnusedSegment(
final DateTime lastUpdatedTime
)
{
- final DataSegment segment = createSegment(dataSource, interval, version);
- try {
- SqlSegmentsMetadataManagerTestBase.publishSegment(connector, config, TestHelper.makeJsonMapper(), segment);
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
+ final DataSegment segment = createAndAddUsedSegment(dataSource, interval, version);
sqlSegmentsMetadataManager.markSegmentsAsUnused(ImmutableSet.of(segment.getId()));
derbyConnectorRule.segments().updateUsedStatusLastUpdated(segment.getId().toString(), lastUpdatedTime);
}