From fa4125a1757a33b9b1f8afd2920b6f64d6a71c8a Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 7 Sep 2023 16:57:38 +0530 Subject: [PATCH 1/4] Add test and metrics for KillStalePendingSegments duty --- .../druid/java/util/common/DateTimes.java | 31 +++ .../druid/java/util/common/DateTimesTest.java | 30 ++ .../coordinator/CoordinatorDynamicConfig.java | 6 + .../coordinator/KillStalePendingSegments.java | 112 +++++--- .../druid/server/coordinator/stats/Stats.java | 2 + .../KillStalePendingSegmentsTest.java | 262 ++++++++++++++++++ 6 files changed, 398 insertions(+), 45 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/KillStalePendingSegmentsTest.java diff --git a/processing/src/main/java/org/apache/druid/java/util/common/DateTimes.java b/processing/src/main/java/org/apache/druid/java/util/common/DateTimes.java index b9c8daff0d6b..2faebaec3309 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/DateTimes.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/DateTimes.java @@ -20,7 +20,9 @@ package org.apache.druid.java.util.common; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Ordering; import io.netty.util.SuppressForbidden; +import org.apache.druid.java.util.common.guava.Comparators; import org.joda.time.Chronology; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -30,6 +32,7 @@ import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.ISODateTimeFormat; +import java.util.Objects; import java.util.Set; import java.util.TimeZone; import java.util.regex.Pattern; @@ -194,6 +197,34 @@ public static boolean canCompareAsString(final DateTime dateTime) && ISOChronology.getInstanceUTC().equals(dateTime.getChronology()); } + /** + * Returns the earlier of the two given dates. When passed a null and a non-null + * date, this method simply returns the non-null value. 
+ */ + public static DateTime earlierOf(DateTime a, DateTime b) + { + // Put nulls last to select the smaller non-null value + if (Objects.compare(a, b, Ordering.natural().nullsLast()) < 0) { + return a; + } else { + return b; + } + } + + /** + * Returns the later of the two given dates. When passed a null and a non-null + * date, this method simply returns the non-null value. + */ + public static DateTime laterOf(DateTime a, DateTime b) + { + // Put nulls first to select the bigger non-null value + if (Objects.compare(a, b, Comparators.naturalNullsFirst()) > 0) { + return a; + } else { + return b; + } + } + private DateTimes() { } diff --git a/processing/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java b/processing/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java index 55cc0b52a94d..867cf3facd48 100644 --- a/processing/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/common/DateTimesTest.java @@ -123,4 +123,34 @@ public void testCanCompareAsString() DateTimes.of("2000").withZone(DateTimes.inferTzFromString("America/Los_Angeles"))) ); } + + @Test + public void testEarlierOf() + { + Assert.assertNull(DateTimes.earlierOf(null, null)); + + final DateTime jan14 = DateTimes.of("2013-01-14"); + Assert.assertEquals(jan14, DateTimes.earlierOf(null, jan14)); + Assert.assertEquals(jan14, DateTimes.earlierOf(jan14, null)); + Assert.assertEquals(jan14, DateTimes.earlierOf(jan14, jan14)); + + final DateTime jan15 = DateTimes.of("2013-01-15"); + Assert.assertEquals(jan14, DateTimes.earlierOf(jan15, jan14)); + Assert.assertEquals(jan14, DateTimes.earlierOf(jan14, jan15)); + } + + @Test + public void testLaterOf() + { + Assert.assertNull(DateTimes.laterOf(null, null)); + + final DateTime jan14 = DateTimes.of("2013-01-14"); + Assert.assertEquals(jan14, DateTimes.laterOf(null, jan14)); + Assert.assertEquals(jan14, DateTimes.laterOf(jan14, null)); + 
Assert.assertEquals(jan14, DateTimes.laterOf(jan14, jan14)); + + final DateTime jan15 = DateTimes.of("2013-01-15"); + Assert.assertEquals(jan15, DateTimes.laterOf(jan15, jan14)); + Assert.assertEquals(jan15, DateTimes.laterOf(jan14, jan15)); + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index 0b39688dbf55..0467c239d93a 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -576,6 +576,12 @@ public Builder withSpecificDataSourcesToKillUnusedSegmentsIn(Set dataSou return this; } + public Builder withDatasourcesToNotKillPendingSegmentsIn(Set datasources) + { + this.dataSourcesToNotKillStalePendingSegmentsIn = datasources; + return this; + } + public Builder withKillTaskSlotRatio(Double killTaskSlotRatio) { this.killTaskSlotRatio = killTaskSlotRatio; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java index c2cd2c1fe686..a5d7d48dc78b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java @@ -19,27 +19,29 @@ package org.apache.druid.server.coordinator; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.rpc.indexing.OverlordClient; import 
org.apache.druid.server.coordinator.duty.CoordinatorDuty; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; +import org.apache.druid.server.coordinator.stats.Stats; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; -import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; public class KillStalePendingSegments implements CoordinatorDuty { private static final Logger log = new Logger(KillStalePendingSegments.class); - private static final Period KEEP_PENDING_SEGMENTS_OFFSET = new Period("P1D"); + private static final Period DURATION_TO_RETAIN = new Period("P1D"); private final OverlordClient overlordClient; @@ -52,55 +54,75 @@ public KillStalePendingSegments(OverlordClient overlordClient) @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { - final List createdTimes = new ArrayList<>(); - - // Include one complete status so we can get the time of the last-created complete task. (The Overlord API returns - // complete tasks in descending order of created_date.) - final List statuses = - ImmutableList.copyOf(FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 1), true)); - createdTimes.add( - statuses - .stream() - .filter(status -> status.getStatusCode() == null || !status.getStatusCode().isComplete()) - .map(TaskStatusPlus::getCreatedTime) - .min(Comparators.naturalNullsFirst()) - .orElse(DateTimes.nowUtc()) // If there are no active tasks, this returns the current time. 
+ final Set killDatasources = new HashSet<>( + params.getUsedSegmentsTimelinesPerDataSource().keySet() + ); + killDatasources.removeAll( + params.getCoordinatorDynamicConfig() + .getDataSourcesToNotKillStalePendingSegmentsIn() ); - final TaskStatusPlus completeTaskStatus = - statuses.stream() - .filter(status -> status != null && status.getStatusCode().isComplete()) - .findFirst() - .orElse(null); - if (completeTaskStatus != null) { - createdTimes.add(completeTaskStatus.getCreatedTime()); + final DateTime maxCreatedTime = getMaxCreatedTimeOfStalePendingSegments(); + for (String dataSource : killDatasources) { + int pendingSegmentsKilled = FutureUtils.getUnchecked( + overlordClient.killPendingSegments( + dataSource, + new Interval(DateTimes.MIN, maxCreatedTime) + ), + true + ); + if (pendingSegmentsKilled > 0) { + log.info( + "Killed [%d] pendingSegments created before [%s] for datasource[%s].", + pendingSegmentsKilled, maxCreatedTime, dataSource + ); + params.getCoordinatorStats().add( + Stats.Kill.PENDING_SEGMENTS, + RowKey.of(Dimension.DATASOURCE, dataSource), + pendingSegmentsKilled + ); + } } - createdTimes.sort(Comparators.naturalNullsFirst()); + return params; + } - // There should be at least one createdTime because the current time is added to the 'createdTimes' list if there - // is no running/pending/waiting tasks. - Preconditions.checkState(!createdTimes.isEmpty(), "Failed to gather createdTimes of tasks"); + /** + * Computes the upper limit on created time of pending segments below which + * they are considered to be stale and can be safely deleted. The limit is + * determined to ensure that the following pending segments are retained for + * at least {@link #DURATION_TO_RETAIN}: + *
<ul>
+ * <li>Pending segments created by any active task (across all datasources)</li>
+ * <li>Pending segments created by the latest completed task (across all datasources)</li>
+ * </ul>
+ */ + private DateTime getMaxCreatedTimeOfStalePendingSegments() + { + // Fetch the statuses of all active tasks and the latest completed task + // (The Overlord API returns complete tasks in descending order of created_date.) + final List statuses = ImmutableList.copyOf( + FutureUtils.getUnchecked(overlordClient.taskStatuses(null, null, 1), true) + ); - // If there is no running/pending/waiting/complete tasks, stalePendingSegmentsCutoffCreationTime is - // (DateTimes.nowUtc() - KEEP_PENDING_SEGMENTS_OFFSET). - final DateTime stalePendingSegmentsCutoffCreationTime = createdTimes.get(0).minus(KEEP_PENDING_SEGMENTS_OFFSET); - for (String dataSource : params.getUsedSegmentsTimelinesPerDataSource().keySet()) { - if (!params.getCoordinatorDynamicConfig().getDataSourcesToNotKillStalePendingSegmentsIn().contains(dataSource)) { - final int pendingSegmentsKilled = FutureUtils.getUnchecked( - overlordClient.killPendingSegments( - dataSource, - new Interval(DateTimes.MIN, stalePendingSegmentsCutoffCreationTime) - ), - true + DateTime earliestActiveTaskStart = DateTimes.nowUtc(); + DateTime latestCompletedTaskStart = null; + for (TaskStatusPlus status : statuses) { + if (status.getStatusCode() == null) { + // Unknown status + } else if (status.getStatusCode().isComplete()) { + latestCompletedTaskStart = DateTimes.laterOf( + latestCompletedTaskStart, + status.getCreatedTime() ); - log.info( - "Killed [%d] pendingSegments created until [%s] for dataSource[%s]", - pendingSegmentsKilled, - stalePendingSegmentsCutoffCreationTime, - dataSource + } else { + earliestActiveTaskStart = DateTimes.earlierOf( + earliestActiveTaskStart, + status.getCreatedTime() ); } } - return params; + + return DateTimes.earlierOf(latestCompletedTaskStart, earliestActiveTaskStart) + .minus(DURATION_TO_RETAIN); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java index 
84f2d471b079..539d58d5594d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java @@ -147,6 +147,8 @@ public static class Kill = CoordinatorStat.toDebugAndEmit("killMaxSlots", "killTask/maxSlot/count"); public static final CoordinatorStat SUBMITTED_TASKS = CoordinatorStat.toDebugAndEmit("killTasks", "kill/task/count"); + public static final CoordinatorStat PENDING_SEGMENTS + = CoordinatorStat.toDebugAndEmit("killPendingSegs", "kill/pendingSegments/count"); } public static class Balancer diff --git a/server/src/test/java/org/apache/druid/server/coordinator/KillStalePendingSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/KillStalePendingSegmentsTest.java new file mode 100644 index 000000000000..ebbb4c0a2016 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/KillStalePendingSegmentsTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordinator; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.indexer.RunnerTaskState; +import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; +import org.apache.druid.server.coordinator.stats.Stats; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class KillStalePendingSegmentsTest +{ + private TestOverlordClient overlordClient; + private KillStalePendingSegments killDuty; + + @Before + public void setup() + { + this.overlordClient = new TestOverlordClient(); + this.killDuty = new KillStalePendingSegments(overlordClient); + } + + @Test + public void testRetentionStarts1DayBeforeNowWhenNoKnownTask() + { + DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI).build(); + killDuty.run(params); + + final Interval observedKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI); + Assert.assertEquals(DateTimes.MIN, 
observedKillInterval.getStart()); + + // Verify that the cutoff time is no later than 1 day ago from now + DateTime expectedCutoffTime = DateTimes.nowUtc().minusDays(1); + Assert.assertTrue( + expectedCutoffTime.getMillis() - observedKillInterval.getEnd().getMillis() <= 100 + ); + } + + @Test + public void testRetentionStarts1DayBeforeEarliestActiveTask() + { + final DateTime startOfEarliestActiveTask = DateTimes.of("2023-01-01"); + overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask, TaskState.RUNNING); + overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask.plusHours(2), TaskState.RUNNING); + overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask.plusDays(1), TaskState.RUNNING); + overlordClient.addTaskAndSegment(DS.WIKI, startOfEarliestActiveTask.plusHours(3), TaskState.RUNNING); + + DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI).build(); + killDuty.run(params); + + final Interval observedKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI); + Assert.assertEquals(DateTimes.MIN, observedKillInterval.getStart()); + Assert.assertEquals(startOfEarliestActiveTask.minusDays(1), observedKillInterval.getEnd()); + } + + @Test + public void testRetentionStarts1DayBeforeLatestCompletedTask() + { + final DateTime startOfLatestCompletedTask = DateTimes.of("2023-01-01"); + overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask, TaskState.FAILED); + overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusHours(2), TaskState.SUCCESS); + overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(2), TaskState.FAILED); + overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(3), TaskState.SUCCESS); + + DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI).build(); + killDuty.run(params); + + final Interval observedKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI); + 
Assert.assertEquals(DateTimes.MIN, observedKillInterval.getStart()); + Assert.assertEquals(startOfLatestCompletedTask.minusDays(1), observedKillInterval.getEnd()); + + final CoordinatorRunStats stats = params.getCoordinatorStats(); + Assert.assertEquals(2, stats.get(Stats.Kill.PENDING_SEGMENTS, RowKey.of(Dimension.DATASOURCE, DS.WIKI))); + } + + @Test + public void testRetentionStarts1DayBeforeLatestCompletedOrEarliestActiveTask() + { + final DateTime startOfLatestCompletedTask = DateTimes.of("2023-02-01"); + overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask, TaskState.FAILED); + + final DateTime startOfEarliestActiveTask = DateTimes.of("2023-01-01"); + overlordClient.addTaskAndSegment(DS.KOALA, startOfEarliestActiveTask, TaskState.RUNNING); + + DruidCoordinatorRuntimeParams params = createParamsWithDatasources(DS.WIKI, DS.KOALA).build(); + killDuty.run(params); + + DateTime earliestEligibleTask = DateTimes.earlierOf(startOfEarliestActiveTask, startOfLatestCompletedTask); + final Interval wikiKillInterval = overlordClient.observedKillIntervals.get(DS.WIKI); + Assert.assertEquals(DateTimes.MIN, wikiKillInterval.getStart()); + Assert.assertEquals(earliestEligibleTask.minusDays(1), wikiKillInterval.getEnd()); + + final Interval koalaKillInterval = overlordClient.observedKillIntervals.get(DS.KOALA); + Assert.assertEquals(DateTimes.MIN, koalaKillInterval.getStart()); + Assert.assertEquals(earliestEligibleTask.minusDays(1), wikiKillInterval.getEnd()); + } + + @Test + public void testPendingSegmentOfDisallowedDatasourceIsNotDeleted() + { + DruidCoordinatorRuntimeParams params = + createParamsWithDatasources(DS.WIKI, DS.KOALA).withDynamicConfigs( + CoordinatorDynamicConfig + .builder() + .withDatasourcesToNotKillPendingSegmentsIn( + Collections.singleton(DS.KOALA) + ) + .build() + ).build(); + + DateTime startOfLatestCompletedTask = DateTimes.of("2023-01-01"); + overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask, TaskState.SUCCESS); + 
overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(3), TaskState.SUCCESS); + overlordClient.addTaskAndSegment(DS.WIKI, startOfLatestCompletedTask.minusDays(5), TaskState.SUCCESS); + overlordClient.addTaskAndSegment(DS.KOALA, startOfLatestCompletedTask, TaskState.SUCCESS); + overlordClient.addTaskAndSegment(DS.KOALA, startOfLatestCompletedTask.minusDays(3), TaskState.SUCCESS); + overlordClient.addTaskAndSegment(DS.KOALA, startOfLatestCompletedTask.minusDays(5), TaskState.SUCCESS); + + killDuty.run(params); + + // Verify that stale pending segments are killed in "wiki" but not in "koala" + final CoordinatorRunStats stats = params.getCoordinatorStats(); + Assert.assertTrue(overlordClient.observedKillIntervals.containsKey(DS.WIKI)); + Assert.assertEquals(2, stats.get(Stats.Kill.PENDING_SEGMENTS, RowKey.of(Dimension.DATASOURCE, DS.WIKI))); + + Assert.assertFalse(overlordClient.observedKillIntervals.containsKey(DS.KOALA)); + Assert.assertEquals(0, stats.get(Stats.Kill.PENDING_SEGMENTS, RowKey.of(Dimension.DATASOURCE, DS.KOALA))); + } + + private DruidCoordinatorRuntimeParams.Builder createParamsWithDatasources(String... datasources) + { + DruidCoordinatorRuntimeParams.Builder builder = DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc()); + + // Create a dummy for each of the datasources so that they get added to the timeline + Set usedSegments = new HashSet<>(); + for (String datasource : datasources) { + usedSegments.add( + DataSegment.builder().dataSource(datasource).interval(Intervals.ETERNITY) + .version("v1").shardSpec(new NumberedShardSpec(0, 1)).size(100).build() + ); + } + + return builder.withUsedSegments(usedSegments); + } + + private static class DS + { + static final String WIKI = "wiki"; + static final String KOALA = "koala"; + } + + /** + * Simulates an Overlord with a configurable list of tasks and pending segments. 
+ */ + private static class TestOverlordClient extends NoopOverlordClient + { + private final List taskStatuses = new ArrayList<>(); + private final Map> datasourceToPendingSegments = new HashMap<>(); + + private final Map observedKillIntervals = new HashMap<>(); + + private int taskIdSuffix = 0; + + void addTaskAndSegment(String datasource, DateTime createdTime, TaskState state) + { + taskStatuses.add( + new TaskStatusPlus( + datasource + "__" + taskIdSuffix++, + null, null, createdTime, createdTime, state, + state.isComplete() ? RunnerTaskState.NONE : RunnerTaskState.RUNNING, + 100L, TaskLocation.unknown(), datasource, null + ) + ); + + // Add a pending segment with created time 5 minutes after the task was created + datasourceToPendingSegments.computeIfAbsent(datasource, ds -> new ArrayList<>()) + .add(createdTime.plusMinutes(5)); + } + + @Override + public ListenableFuture> taskStatuses( + @Nullable String state, + @Nullable String dataSource, + @Nullable Integer maxCompletedTasks + ) + { + return Futures.immediateFuture( + CloseableIterators.wrap(taskStatuses.iterator(), null) + ); + } + + @Override + public ListenableFuture killPendingSegments(String dataSource, Interval interval) + { + observedKillIntervals.put(dataSource, interval); + + List pendingSegments = datasourceToPendingSegments.remove(dataSource); + if (pendingSegments == null || pendingSegments.isEmpty()) { + return Futures.immediateFuture(0); + } + + List remainingPendingSegments = new ArrayList<>(); + int numDeletedPendingSegments = 0; + for (DateTime createdTime : pendingSegments) { + if (createdTime.isBefore(interval.getEnd())) { + ++numDeletedPendingSegments; + } else { + remainingPendingSegments.add(createdTime); + } + } + + if (remainingPendingSegments.size() > 0) { + datasourceToPendingSegments.put(dataSource, remainingPendingSegments); + } + + return Futures.immediateFuture(numDeletedPendingSegments); + } + } +} From de993a1e90538407b65b94379d772af8a8eb28ec Mon Sep 17 00:00:00 2001 From: 
Kashif Faraz Date: Thu, 7 Sep 2023 17:06:54 +0530 Subject: [PATCH 2/4] Minor fixes --- docs/operations/metrics.md | 1 + .../coordinator/KillStalePendingSegments.java | 16 ++++++++-------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 38e68e81c76b..9c9bbd22df32 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -332,6 +332,7 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina |`killTask/availableSlot/count`| Number of available task slots that can be used for auto kill tasks in the auto kill run. This is the max number of task slots minus any currently running auto kill tasks. | |Varies| |`killTask/maxSlot/count`| Maximum number of task slots available for auto kill tasks in the auto kill run. | |Varies| |`kill/task/count`| Number of tasks issued in the auto kill run. | |Varies| +|`kill/pendingSegments/count`|Number of stale pending segments deleted from the metadata store.|`dataSource`|Varies| |`segment/waitCompact/bytes`|Total bytes of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|`dataSource`|Varies| |`segment/waitCompact/count`|Total number of segments of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|`dataSource`|Varies| |`interval/waitCompact/count`|Total number of intervals of this datasource waiting to be compacted by the auto compaction (only consider intervals/segments that are eligible for auto compaction).|`dataSource`|Varies| diff --git a/server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java index a5d7d48dc78b..842283a8cceb 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java 
+++ b/server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java @@ -62,19 +62,19 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) .getDataSourcesToNotKillStalePendingSegmentsIn() ); - final DateTime maxCreatedTime = getMaxCreatedTimeOfStalePendingSegments(); + final DateTime minCreatedTime = getMinCreatedTimeToRetain(); for (String dataSource : killDatasources) { int pendingSegmentsKilled = FutureUtils.getUnchecked( overlordClient.killPendingSegments( dataSource, - new Interval(DateTimes.MIN, maxCreatedTime) + new Interval(DateTimes.MIN, minCreatedTime) ), true ); if (pendingSegmentsKilled > 0) { log.info( "Killed [%d] pendingSegments created before [%s] for datasource[%s].", - pendingSegmentsKilled, maxCreatedTime, dataSource + pendingSegmentsKilled, minCreatedTime, dataSource ); params.getCoordinatorStats().add( Stats.Kill.PENDING_SEGMENTS, @@ -87,16 +87,16 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) } /** - * Computes the upper limit on created time of pending segments below which - * they are considered to be stale and can be safely deleted. The limit is - * determined to ensure that the following pending segments are retained for - * at least {@link #DURATION_TO_RETAIN}: + * Computes the minimum created time of retainable pending segments. Any pending + * segment created before this time is considered stale and can be safely deleted. + * The limit is determined to ensure that the following pending segments are + * retained for at least {@link #DURATION_TO_RETAIN}: *
<ul>
 * <li>Pending segments created by any active task (across all datasources)</li>
 * <li>Pending segments created by the latest completed task (across all datasources)</li>
 * </ul>
*/ - private DateTime getMaxCreatedTimeOfStalePendingSegments() + private DateTime getMinCreatedTimeToRetain() { // Fetch the statuses of all active tasks and the latest completed task // (The Overlord API returns complete tasks in descending order of created_date.) From 2ce3c46ebd9fb8350b6ad3b1dd4327c2c094df3a Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 7 Sep 2023 19:02:47 +0530 Subject: [PATCH 3/4] Fix inspection errors --- .../coordinator/CoordinatorDynamicConfig.java | 8 +----- .../{ => duty}/KillStalePendingSegments.java | 27 +++++++++++++------ .../KillStalePendingSegmentsTest.java | 4 ++- .../org/apache/druid/cli/CliCoordinator.java | 2 +- 4 files changed, 24 insertions(+), 17 deletions(-) rename server/src/main/java/org/apache/druid/server/coordinator/{ => duty}/KillStalePendingSegments.java (79%) rename server/src/test/java/org/apache/druid/server/coordinator/{ => duty}/KillStalePendingSegmentsTest.java (98%) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index 0467c239d93a..e3ab70cf4492 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -77,19 +77,13 @@ public class CoordinatorDynamicConfig private final Map validDebugDimensions; /** - * Stale pending segments belonging to the data sources in this list are not killed by {@link + * Stale pending segments belonging to the data sources in this list are not killed by {@code * KillStalePendingSegments}. In other words, segments in these data sources are "protected". - *
<p>
- * Pending segments are considered "stale" when their created_time is older than {@link - * KillStalePendingSegments#KEEP_PENDING_SEGMENTS_OFFSET} from now. */ private final Set dataSourcesToNotKillStalePendingSegmentsIn; /** * The maximum number of segments that can be queued for loading to any given server. - * - * @see LoadQueuePeon - * @see org.apache.druid.server.coordinator.rules.LoadRule#run */ private final int maxSegmentsInNodeLoadingQueue; private final boolean pauseCoordination; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegments.java similarity index 79% rename from server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java rename to server/src/main/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegments.java index 842283a8cceb..da730e20c0ba 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/KillStalePendingSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegments.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.server.coordinator; +package org.apache.druid.server.coordinator.duty; import com.google.common.collect.ImmutableList; import com.google.inject.Inject; @@ -26,7 +26,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.rpc.indexing.OverlordClient; -import org.apache.druid.server.coordinator.duty.CoordinatorDuty; +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.stats.Dimension; import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.coordinator.stats.Stats; @@ -38,6 +38,20 @@ import java.util.List; import java.util.Set; +/** + * Duty to kill stale pending segments which are not needed anymore. 
Pending segments + * are created when appending realtime or batch tasks allocate segments to build + * incremental indexes. Under normal operation, these pending segments get committed + * when the task completes and become regular segments. But in case of task failures, + * some pending segments might be left around and cause clutter in the metadata store. + *

<p> + * While cleaning up, this duty ensures that the following pending segments are + * retained for at least {@link #DURATION_TO_RETAIN}: + * <ul>
+ * <li>Pending segments created by any active task (across all datasources)</li>
+ * <li>Pending segments created by the latest completed task (across all datasources)</li>
+ * </ul>
+ */ public class KillStalePendingSegments implements CoordinatorDuty { private static final Logger log = new Logger(KillStalePendingSegments.class); @@ -89,12 +103,9 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) /** * Computes the minimum created time of retainable pending segments. Any pending * segment created before this time is considered stale and can be safely deleted. - * The limit is determined to ensure that the following pending segments are - * retained for at least {@link #DURATION_TO_RETAIN}: - *
<ul>
- * <li>Pending segments created by any active task (across all datasources)</li>
- * <li>Pending segments created by the latest completed task (across all datasources)</li>
- * </ul>
+ * The limit is determined to ensure that pending segments created by any active + * task and the latest completed task (across all datasources) are retained for + * at least {@link #DURATION_TO_RETAIN}. */ private DateTime getMinCreatedTimeToRetain() { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/KillStalePendingSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegmentsTest.java similarity index 98% rename from server/src/test/java/org/apache/druid/server/coordinator/KillStalePendingSegmentsTest.java rename to server/src/test/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegmentsTest.java index ebbb4c0a2016..11ea5bd4b57b 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/KillStalePendingSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegmentsTest.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.server.coordinator; +package org.apache.druid.server.coordinator.duty; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -30,6 +30,8 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.coordinator.stats.Dimension; import org.apache.druid.server.coordinator.stats.RowKey; diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 5327f0b3a764..a7883ceeb5f5 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ 
b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -78,7 +78,6 @@ import org.apache.druid.server.audit.AuditManagerProvider; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinatorConfig; -import org.apache.druid.server.coordinator.KillStalePendingSegments; import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory; import org.apache.druid.server.coordinator.balancer.CachingCostBalancerStrategyConfig; import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy; @@ -91,6 +90,7 @@ import org.apache.druid.server.coordinator.duty.KillCompactionConfig; import org.apache.druid.server.coordinator.duty.KillDatasourceMetadata; import org.apache.druid.server.coordinator.duty.KillRules; +import org.apache.druid.server.coordinator.duty.KillStalePendingSegments; import org.apache.druid.server.coordinator.duty.KillSupervisors; import org.apache.druid.server.coordinator.duty.KillUnusedSegments; import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster; From 25ac1a32f7daa15bc2ca13abef79417460d953db Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 7 Sep 2023 19:23:38 +0530 Subject: [PATCH 4/4] Remove unused import --- .../druid/server/coordinator/CoordinatorDynamicConfig.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index e3ab70cf4492..4f87c311f904 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -27,7 +27,6 @@ import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.error.InvalidInput; import org.apache.druid.server.coordinator.duty.KillUnusedSegments; -import 
org.apache.druid.server.coordinator.loading.LoadQueuePeon; import org.apache.druid.server.coordinator.stats.Dimension; import org.apache.druid.utils.JvmUtils;