diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 38e68e81c76b..9c9bbd22df32 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -332,6 +332,7 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina |`killTask/availableSlot/count`| Number of available task slots that can be used for auto kill tasks in the auto kill run. This is the max number of task slots minus any currently running auto kill tasks. | |Varies| |`killTask/maxSlot/count`| Maximum number of task slots available for auto kill tasks in the auto kill run. | |Varies| |`kill/task/count`| Number of tasks issued in the auto kill run. | |Varies| +|`kill/pendingSegments/count`|Number of stale pending segments deleted from the metadata store.|`dataSource`|Varies| |`segment/waitCompact/bytes`|Total bytes of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|`dataSource`|Varies| |`segment/waitCompact/count`|Total number of segments of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|`dataSource`|Varies| |`interval/waitCompact/count`|Total number of intervals of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|`dataSource`|Varies| diff --git a/processing/src/main/java/org/apache/druid/java/util/common/DateTimes.java b/processing/src/main/java/org/apache/druid/java/util/common/DateTimes.java index b9c8daff0d6b..2faebaec3309 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/DateTimes.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/DateTimes.java @@ -20,7 +20,9 @@ package org.apache.druid.java.util.common; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Ordering; import io.netty.util.SuppressForbidden; +import org.apache.druid.java.util.common.guava.Comparators; import org.joda.time.Chronology; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -30,6 +32,7 @@ import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.ISODateTimeFormat; +import java.util.Objects; import java.util.Set; import java.util.TimeZone; import java.util.regex.Pattern; @@ -194,6 +197,34 @@ public static boolean canCompareAsString(final DateTime dateTime) && ISOChronology.getInstanceUTC().equals(dateTime.getChronology()); } + /** + * Returns the earlier of the two given dates. When passed a null and a non-null + * date, this method simply returns the non-null value. + */ + public static DateTime earlierOf(DateTime a, DateTime b) + { + // Put nulls last to select the smaller non-null value + if (Objects.compare(a, b, Ordering.natural().nullsLast()) < 0) { + return a; + } else { + return b; + } + } + + /** + * Returns the later of the two given dates. When passed a null and a non-null + * date, this method simply returns the non-null value. + */ + public static DateTime laterOf(DateTime a, DateTime b) + { + // Put nulls first to select the bigger non-null value + if (Objects.compare(a, b, Comparators.naturalNullsFirst()) > 0) { + return a; + } else { + return b; + } + } + private DateTimes() { } diff --git a/processing/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java b/processing/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java index 55cc0b52a94d..867cf3facd48 100644 --- a/processing/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java @@ -123,4 +123,34 @@ public void testCanCompareAsString() DateTimes.of("2000").withZone(DateTimes.inferTzFromString("America/Los_Angeles"))) ); } + + @Test + public void testEarlierOf() + { + Assert.assertNull(DateTimes.earlierOf(null, null)); + + final DateTime jan14 = DateTimes.of("2013-01-14"); + Assert.assertEquals(jan14, DateTimes.earlierOf(null, jan14)); + Assert.assertEquals(jan14, DateTimes.earlierOf(jan14, null)); + Assert.assertEquals(jan14, DateTimes.earlierOf(jan14, jan14)); + + final DateTime jan15 = DateTimes.of("2013-01-15"); + Assert.assertEquals(jan14, DateTimes.earlierOf(jan15, jan14)); + Assert.assertEquals(jan14, DateTimes.earlierOf(jan14, jan15)); + } + + @Test + public void testLaterOf() + { + Assert.assertNull(DateTimes.laterOf(null, null)); + + final DateTime jan14 = DateTimes.of("2013-01-14"); + Assert.assertEquals(jan14, DateTimes.laterOf(null, jan14)); + Assert.assertEquals(jan14, DateTimes.laterOf(jan14, null)); + Assert.assertEquals(jan14, DateTimes.laterOf(jan14, jan14)); + + final DateTime jan15 = DateTimes.of("2013-01-15"); + Assert.assertEquals(jan15, DateTimes.laterOf(jan15, jan14)); + Assert.assertEquals(jan15, DateTimes.laterOf(jan14, jan15)); + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index 0b39688dbf55..4f87c311f904 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -27,7 +27,6 @@ import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.error.InvalidInput; import org.apache.druid.server.coordinator.duty.KillUnusedSegments; -import org.apache.druid.server.coordinator.loading.LoadQueuePeon; import org.apache.druid.server.coordinator.stats.Dimension; import org.apache.druid.utils.JvmUtils; @@ -77,19 +76,13 @@ public class CoordinatorDynamicConfig private final Map validDebugDimensions; /** - * Stale pending segments belonging to the data sources in this list are not killed by {@link + * Stale pending segments belonging to the data sources in this list are not killed by {@code * KillStalePendingSegments}. In other words, segments in these data sources are "protected". - *

- * Pending segments are considered "stale" when their created_time is older than {@link - * KillStalePendingSegments#KEEP_PENDING_SEGMENTS_OFFSET} from now. */ private final Set dataSourcesToNotKillStalePendingSegmentsIn; /** * The maximum number of segments that can be queued for loading to any given server. - * - * @see LoadQueuePeon - * @see org.apache.druid.server.coordinator.rules.LoadRule#run */ private final int maxSegmentsInNodeLoadingQueue; private final boolean pauseCoordination; @@ -576,6 +569,12 @@ public Builder withSpecificDataSourcesToKillUnusedSegmentsIn(Set dataSou return this; } + public Builder withDatasourcesToNotKillPendingSegmentsIn(Set datasources) + { + this.dataSourcesToNotKillStalePendingSegmentsIn = datasources; + return this; + } + public Builder withKillTaskSlotRatio(Double killTaskSlotRatio) { this.killTaskSlotRatio = killTaskSlotRatio; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java deleted file mode 100644 index c2cd2c1fe686..000000000000 --- a/server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.server.coordinator; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.inject.Inject; -import org.apache.druid.common.guava.FutureUtils; -import org.apache.druid.indexer.TaskStatusPlus; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.guava.Comparators; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.rpc.indexing.OverlordClient; -import org.apache.druid.server.coordinator.duty.CoordinatorDuty; -import org.joda.time.DateTime; -import org.joda.time.Interval; -import org.joda.time.Period; - -import java.util.ArrayList; -import java.util.List; - -public class KillStalePendingSegments implements CoordinatorDuty -{ - private static final Logger log = new Logger(KillStalePendingSegments.class); - private static final Period KEEP_PENDING_SEGMENTS_OFFSET = new Period("P1D"); - - private final OverlordClient overlordClient; - - @Inject - public KillStalePendingSegments(OverlordClient overlordClient) - { - this.overlordClient = overlordClient; - } - - @Override - public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) - { - final List createdTimes = new ArrayList<>(); - - // Include one complete status so we can get the time of the last-created complete task. (The Overlord API returns - // complete tasks in descending order of created_date.) - final List statuses = - ImmutableList.copyOf(FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 1), true)); - createdTimes.add( - statuses - .stream() - .filter(status -> status.getStatusCode() == null || !status.getStatusCode().isComplete()) - .map(TaskStatusPlus::getCreatedTime) - .min(Comparators.naturalNullsFirst()) - .orElse(DateTimes.nowUtc()) // If there are no active tasks, this returns the current time. - ); - - final TaskStatusPlus completeTaskStatus = - statuses.stream() - .filter(status -> status != null && status.getStatusCode().isComplete()) - .findFirst() - .orElse(null); - if (completeTaskStatus != null) { - createdTimes.add(completeTaskStatus.getCreatedTime()); - } - createdTimes.sort(Comparators.naturalNullsFirst()); - - // There should be at least one createdTime because the current time is added to the 'createdTimes' list if there - // is no running/pending/waiting tasks. - Preconditions.checkState(!createdTimes.isEmpty(), "Failed to gather createdTimes of tasks"); - - // If there is no running/pending/waiting/complete tasks, stalePendingSegmentsCutoffCreationTime is - // (DateTimes.nowUtc() - KEEP_PENDING_SEGMENTS_OFFSET). - final DateTime stalePendingSegmentsCutoffCreationTime = createdTimes.get(0).minus(KEEP_PENDING_SEGMENTS_OFFSET); - for (String dataSource : params.getUsedSegmentsTimelinesPerDataSource().keySet()) { - if (!params.getCoordinatorDynamicConfig().getDataSourcesToNotKillStalePendingSegmentsIn().contains(dataSource)) { - final int pendingSegmentsKilled = FutureUtils.getUnchecked( - overlordClient.killPendingSegments( - dataSource, - new Interval(DateTimes.MIN, stalePendingSegmentsCutoffCreationTime) - ), - true - ); - log.info( - "Killed [%d] pendingSegments created until [%s] for dataSource[%s]", - pendingSegmentsKilled, - stalePendingSegmentsCutoffCreationTime, - dataSource - ); - } - } - return params; - } -} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegments.java new file mode 100644 index 000000000000..da730e20c0ba --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegments.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.duty; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; +import org.apache.druid.server.coordinator.stats.Stats; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.joda.time.Period; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Duty to kill stale pending segments which are not needed anymore. Pending segments + * are created when appending realtime or batch tasks allocate segments to build + * incremental indexes. Under normal operation, these pending segments get committed + * when the task completes and become regular segments. But in case of task failures, + * some pending segments might be left around and cause clutter in the metadata store. + *

+ * While cleaning up, this duty ensures that the following pending segments are + * retained for at least {@link #DURATION_TO_RETAIN}: + *

+ */ +public class KillStalePendingSegments implements CoordinatorDuty +{ + private static final Logger log = new Logger(KillStalePendingSegments.class); + private static final Period DURATION_TO_RETAIN = new Period("P1D"); + + private final OverlordClient overlordClient; + + @Inject + public KillStalePendingSegments(OverlordClient overlordClient) + { + this.overlordClient = overlordClient; + } + + @Override + public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) + { + final Set killDatasources = new HashSet<>( + params.getUsedSegmentsTimelinesPerDataSource().keySet() + ); + killDatasources.removeAll( + params.getCoordinatorDynamicConfig() + .getDataSourcesToNotKillStalePendingSegmentsIn() + ); + + final DateTime minCreatedTime = getMinCreatedTimeToRetain(); + for (String dataSource : killDatasources) { + int pendingSegmentsKilled = FutureUtils.getUnchecked( + overlordClient.killPendingSegments( + dataSource, + new Interval(DateTimes.MIN, minCreatedTime) + ), + true + ); + if (pendingSegmentsKilled > 0) { + log.info( + "Killed [%d] pendingSegments created before [%s] for datasource[%s].", + pendingSegmentsKilled, minCreatedTime, dataSource + ); + params.getCoordinatorStats().add( + Stats.Kill.PENDING_SEGMENTS, + RowKey.of(Dimension.DATASOURCE, dataSource), + pendingSegmentsKilled + ); + } + } + return params; + } + + /** + * Computes the minimum created time of retainable pending segments. Any pending + * segment created before this time is considered stale and can be safely deleted. + * The limit is determined to ensure that pending segments created by any active + * task and the latest completed task (across all datasources) are retained for + * at least {@link #DURATION_TO_RETAIN}. + */ + private DateTime getMinCreatedTimeToRetain() + { + // Fetch the statuses of all active tasks and the latest completed task + // (The Overlord API returns complete tasks in descending order of created_date.) + final List statuses = ImmutableList.copyOf( + FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 1), true) + ); + + DateTime earliestActiveTaskStart = DateTimes.nowUtc(); + DateTime latestCompletedTaskStart = null; + for (TaskStatusPlus status : statuses) { + if (status.getStatusCode() == null) { + // Unknown status + } else if (status.getStatusCode().isComplete()) { + latestCompletedTaskStart = DateTimes.laterOf( + latestCompletedTaskStart, + status.getCreatedTime() + ); + } else { + earliestActiveTaskStart = DateTimes.earlierOf( + earliestActiveTaskStart, + status.getCreatedTime() + ); + } + } + + return DateTimes.earlierOf(latestCompletedTaskStart, earliestActiveTaskStart) + .minus(DURATION_TO_RETAIN); + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java index 84f2d471b079..539d58d5594d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java @@ -147,6 +147,8 @@ public static class Kill = CoordinatorStat.toDebugAndEmit("killMaxSlots", "killTask/maxSlot/count"); public static final CoordinatorStat SUBMITTED_TASKS = CoordinatorStat.toDebugAndEmit("killTasks", "kill/task/count"); + public static final CoordinatorStat PENDING_SEGMENTS + = CoordinatorStat.toDebugAndEmit("killPendingSegs", "kill/pendingSegments/count"); } public static class Balancer diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegmentsTest.java new file mode 100644 index 000000000000..11ea5bd4b57b --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegmentsTest.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.duty; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.indexer.RunnerTaskState; +import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; +import org.apache.druid.server.coordinator.stats.Stats; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class KillStalePendingSegmentsTest +{ + private TestOverlordClient overlordClient; + private KillStalePendingSegments killDuty; + + @Before + public void setup() + { + this.overlordClient = new TestOverlordClient(); + this.killDuty = new KillStalePendingSegments(overlordClient); + } + + @Test + public void testRetentionStarts1DayBeforeNowWhenNoKnownTask() + { + DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI).build(); + killDuty.run(params); + + final Interval observedKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI); + Assert.assertEquals(DateTimes.MIN, observedKillInterval.getStart()); + + // Verify that the cutoff time is no later than 1 day ago from now + DateTime expectedCutoffTime = DateTimes.nowUtc().minusDays(1); + Assert.assertTrue( + expectedCutoffTime.getMillis() - observedKillInterval.getEnd().getMillis() <= 100 + ); + } + + @Test + public void testRetentionStarts1DayBeforeEarliestActiveTask() + { + final DateTime startOfEarliestActiveTask = DateTimes.of("2023-01-01"); + overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask, TaskState.RUNNING); + overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask.plusHours(2), TaskState.RUNNING); + overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask.plusDays(1), TaskState.RUNNING); + overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask.plusHours(3), TaskState.RUNNING); + + DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI).build(); + killDuty.run(params); + + final Interval observedKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI); + Assert.assertEquals(DateTimes.MIN, observedKillInterval.getStart()); + Assert.assertEquals(startOfEarliestActiveTask.minusDays(1), observedKillInterval.getEnd()); + } + + @Test + public void testRetentionStarts1DayBeforeLatestCompletedTask() + { + final DateTime startOfLatestCompletedTask = DateTimes.of("2023-01-01"); + overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask, TaskState.FAILED); + overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusHours(2), TaskState.SUCCESS); + overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(2), TaskState.FAILED); + overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(3), TaskState.SUCCESS); + + DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI).build(); + killDuty.run(params); + + final Interval observedKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI); + Assert.assertEquals(DateTimes.MIN, observedKillInterval.getStart()); + Assert.assertEquals(startOfLatestCompletedTask.minusDays(1), observedKillInterval.getEnd()); + + final CoordinatorRunStats stats = params.getCoordinatorStats(); + Assert.assertEquals(2, stats.get(Stats.Kill.PENDING_SEGMENTS, RowKey.of(Dimension.DATASOURCE, DS.WIKI))); + } + + @Test + public void testRetentionStarts1DayBeforeLatestCompletedOrEarliestActiveTask() + { + final DateTime startOfLatestCompletedTask = DateTimes.of("2023-02-01"); + overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask, TaskState.FAILED); + + final DateTime startOfEarliestActiveTask = DateTimes.of("2023-01-01"); + overlordClient.addTaskAndSegment(DS.KOALA, startOfEarliestActiveTask, TaskState.RUNNING); + + DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI, DS.KOALA).build(); + killDuty.run(params); + + DateTime earliestEligibleTask = DateTimes.earlierOf(startOfEarliestActiveTask, startOfLatestCompletedTask); + final Interval wikiKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI); + Assert.assertEquals(DateTimes.MIN, wikiKillInterval.getStart()); + Assert.assertEquals(earliestEligibleTask.minusDays(1), wikiKillInterval.getEnd()); + + final Interval koalaKillInterval = overlordClient.observedKillIntervals.get(DS.KOALA); + Assert.assertEquals(DateTimes.MIN, koalaKillInterval.getStart()); + Assert.assertEquals(earliestEligibleTask.minusDays(1), wikiKillInterval.getEnd()); + } + + @Test + public void testPendingSegmentOfDisallowedDatasourceIsNotDeleted() + { + DruidCoordinatorRuntimeParams params = + createParamsWithDatasources(DS.WIKI, DS.KOALA).withDynamicConfigs( + CoordinatorDynamicConfig + .builder() + .withDatasourcesToNotKillPendingSegmentsIn( + Collections.singleton(DS.KOALA) + ) + .build() + ).build(); + + DateTime startOfLatestCompletedTask = DateTimes.of("2023-01-01"); + overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask, TaskState.SUCCESS); + overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(3), TaskState.SUCCESS); + overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(5), TaskState.SUCCESS); + overlordClient.addTaskAndSegment(DS.KOALA, startOfLatestCompletedTask, TaskState.SUCCESS); + overlordClient.addTaskAndSegment(DS.KOALA, startOfLatestCompletedTask.minusDays(3), TaskState.SUCCESS); + overlordClient.addTaskAndSegment(DS.KOALA, startOfLatestCompletedTask.minusDays(5), TaskState.SUCCESS); + + killDuty.run(params); + + // Verify that stale pending segments are killed in "wiki" but not in "koala" + final CoordinatorRunStats stats = params.getCoordinatorStats(); + Assert.assertTrue(overlordClient.observedKillIntervals.containsKey(DS.WIKI)); + Assert.assertEquals(2, stats.get(Stats.Kill.PENDING_SEGMENTS, RowKey.of(Dimension.DATASOURCE, DS.WIKI))); + + Assert.assertFalse(overlordClient.observedKillIntervals.containsKey(DS.KOALA)); + Assert.assertEquals(0, stats.get(Stats.Kill.PENDING_SEGMENTS, RowKey.of(Dimension.DATASOURCE, DS.KOALA))); + } + + private DruidCoordinatorRuntimeParams.Builder createParamsWithDatasources(String... datasources) + { + DruidCoordinatorRuntimeParams.Builder builder = DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc()); + + // Create a dummy for each of the datasources so that they get added to the timeline + Set usedSegments = new HashSet<>(); + for (String datasource : datasources) { + usedSegments.add( + DataSegment.builder().dataSource(datasource).interval(Intervals.ETERNITY) + .version("v1").shardSpec(new NumberedShardSpec(0, 1)).size(100).build() + ); + } + + return builder.withUsedSegments(usedSegments); + } + + private static class DS + { + static final String WIKI = "wiki"; + static final String KOALA = "koala"; + } + + /** + * Simulates an Overlord with a configurable list of tasks and pending segments. + */ + private static class TestOverlordClient extends NoopOverlordClient + { + private final List taskStatuses = new ArrayList<>(); + private final Map> datasourceToPendingSegments = new HashMap<>(); + + private final Map observedKillIntervals = new HashMap<>(); + + private int taskIdSuffix = 0; + + void addTaskAndSegment(String datasource, DateTime createdTime, TaskState state) + { + taskStatuses.add( + new TaskStatusPlus( + datasource + "__" + taskIdSuffix++, + null, null, createdTime, createdTime, state, + state.isComplete() ? RunnerTaskState.NONE : RunnerTaskState.RUNNING, + 100L, TaskLocation.unknown(), datasource, null + ) + ); + + // Add a pending segment with created time 5 minutes after the task was created + datasourceToPendingSegments.computeIfAbsent(datasource, ds -> new ArrayList<>()) + .add(createdTime.plusMinutes(5)); + } + + @Override + public ListenableFuture> taskStatuses( + @Nullable String state, + @Nullable String dataSource, + @Nullable Integer maxCompletedTasks + ) + { + return Futures.immediateFuture( + CloseableIterators.wrap(taskStatuses.iterator(), null) + ); + } + + @Override + public ListenableFuture killPendingSegments(String dataSource, Interval interval) + { + observedKillIntervals.put(dataSource, interval); + + List pendingSegments = datasourceToPendingSegments.remove(dataSource); + if (pendingSegments == null || pendingSegments.isEmpty()) { + return Futures.immediateFuture(0); + } + + List remainingPendingSegments = new ArrayList<>(); + int numDeletedPendingSegments = 0; + for (DateTime createdTime : pendingSegments) { + if (createdTime.isBefore(interval.getEnd())) { + ++numDeletedPendingSegments; + } else { + remainingPendingSegments.add(createdTime); + } + } + + if (remainingPendingSegments.size() > 0) { + datasourceToPendingSegments.put(dataSource, remainingPendingSegments); + } + + return Futures.immediateFuture(numDeletedPendingSegments); + } + } +} diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 5327f0b3a764..a7883ceeb5f5 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -78,7 +78,6 @@ import org.apache.druid.server.audit.AuditManagerProvider; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinatorConfig; -import org.apache.druid.server.coordinator.KillStalePendingSegments; import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory; import org.apache.druid.server.coordinator.balancer.CachingCostBalancerStrategyConfig; import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy; @@ -91,6 +90,7 @@ import org.apache.druid.server.coordinator.duty.KillCompactionConfig; import org.apache.druid.server.coordinator.duty.KillDatasourceMetadata; import org.apache.druid.server.coordinator.duty.KillRules; +import org.apache.druid.server.coordinator.duty.KillStalePendingSegments; import org.apache.druid.server.coordinator.duty.KillSupervisors; import org.apache.druid.server.coordinator.duty.KillUnusedSegments; import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;