diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
index fbe703a888e2..79435869e7b1 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
@@ -149,7 +149,7 @@ private boolean deleteKeysForBucket(
       try {
         deleteObjectsRequest.setKeys(chunkOfKeys);
         log.info(
-            "Removing from bucket: [%s] the following index files: [%s] from s3!",
+            "Deleting the following segment files from S3 bucket[%s]: [%s]",
             s3Bucket,
             keysToDeleteStrings
         );
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 5c722530acf8..da30e431e8fe 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -267,6 +267,11 @@
             <artifactId>maven-resolver-api</artifactId>
             <version>1.3.1</version>
         </dependency>
+        <dependency>
+            <groupId>org.jdbi</groupId>
+            <artifactId>jdbi</artifactId>
+            <scope>test</scope>
+        </dependency>
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java
index 84445d3968a5..6c73f664e3fd 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java
@@ -27,16 +27,23 @@
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.CriticalAction;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
 
 import java.util.Set;
 import java.util.stream.Collectors;
 
+/**
+ * Permanently deletes unused segments from the metadata store.
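+ * <p>
+ * Illustrative submission (hypothetical snippet; it mirrors the call made by
+ * {@code KillUnusedSegmentsTask} later in this patch):
+ * <pre>{@code
+ * taskActionClient.submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
+ * }</pre>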
+ */
 public class SegmentNukeAction implements TaskAction<Void>
 {
+  private static final Logger log = new Logger(SegmentNukeAction.class);
+
   private final Set<DataSegment> segments;
 
   @JsonCreator
@@ -65,22 +72,25 @@ public Void perform(Task task, TaskActionToolbox toolbox)
     TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments);
 
     try {
-      toolbox.getTaskLockbox().doInCriticalSection(
+      final Set<Interval> intervals = segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet());
+      int numDeletedSegments = toolbox.getTaskLockbox().doInCriticalSection(
           task,
-          segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
-          CriticalAction.builder()
-                        .onValidLocks(
-                            () -> {
-                              toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments);
-                              return null;
-                            }
-                        )
-                        .onInvalidLocks(
-                            () -> {
-                              throw new ISE("Some locks for task[%s] are already revoked", task.getId());
-                            }
-                        )
-                        .build()
+          intervals,
+          CriticalAction.builder().onValidLocks(
+              () -> toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments)
+          ).onInvalidLocks(
+              () -> {
+                throw new ISE("Some locks for task[%s] are already revoked", task.getId());
+              }
+          ).build()
+      );
+
+      log.info(
+          "Deleted [%d] of the requested [%d] segments from the metadata store,"
+          + " across [%d] intervals[%s], for task[%s] of datasource[%s].",
+          numDeletedSegments, segments.size(),
+          intervals.size(), intervals,
+          task.getId(), task.getDataSource()
       );
     }
     catch (Exception e) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 06082a988d98..fe58c264ca8b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -212,7 +212,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
     int nextBatchSize = computeNextBatchSize(numSegmentsKilled);
     @Nullable Integer numTotalBatches = getNumTotalBatches();
     List<DataSegment> unusedSegments;
-    LOG.info(
+    logInfo(
         "Starting kill for datasource[%s] in interval[%s] and versions[%s] with batchSize[%d], up to limit[%d]"
         + " segments before maxUsedStatusLastUpdatedTime[%s] will be deleted%s",
         getDataSource(), getInterval(), getVersions(), batchSize, limit, maxUsedStatusLastUpdatedTime,
@@ -236,9 +236,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
         break;
       }
 
-      unusedSegments = toolbox.getTaskActionClient().submit(
-          new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), getVersions(), nextBatchSize, maxUsedStatusLastUpdatedTime)
-      );
+      unusedSegments = fetchNextBatchOfUnusedSegments(toolbox, nextBatchSize);
 
       // Fetch locks each time as a revocation could have occurred in between batches
       final NavigableMap<DateTime, List<TaskLock>> taskLockMap
@@ -283,6 +281,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
 
       // Nuke Segments
       taskActionClient.submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
+      emitMetric(toolbox.getEmitter(), TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, unusedSegments.size());
 
       // Determine segments to be killed
       final List<DataSegment> segmentsToBeKilled
@@ -290,22 +289,27 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
       final Set<DataSegment> segmentsNotKilled = new HashSet<>(unusedSegments);
       segmentsToBeKilled.forEach(segmentsNotKilled::remove);
 
-      LOG.infoSegments(
-          segmentsNotKilled,
-          "Skipping segment kill from deep storage as their load specs are referenced by other segments."
-      );
+
+      if (!segmentsNotKilled.isEmpty()) {
+        LOG.warn(
+            "Skipping kill of [%d] segments from deep storage as their load specs are used by other segments.",
+            segmentsNotKilled.size()
+        );
+      }
 
       toolbox.getDataSegmentKiller().kill(segmentsToBeKilled);
+      emitMetric(toolbox.getEmitter(), TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, segmentsToBeKilled.size());
+
       numBatchesProcessed++;
       numSegmentsKilled += segmentsToBeKilled.size();
 
-      LOG.info("Processed [%d] batches for kill task[%s].", numBatchesProcessed, getId());
+      logInfo("Processed [%d] batches for kill task[%s].", numBatchesProcessed, getId());
 
       nextBatchSize = computeNextBatchSize(numSegmentsKilled);
     } while (!unusedSegments.isEmpty() && (null == numTotalBatches || numBatchesProcessed < numTotalBatches));
 
     final String taskId = getId();
-    LOG.info(
+    logInfo(
         "Finished kill task[%s] for dataSource[%s] and interval[%s]."
         + " Deleted total [%d] unused segments in [%d] batches.",
         taskId, getDataSource(), getInterval(), numSegmentsKilled, numBatchesProcessed
@@ -322,9 +326,8 @@ taskId, getDataSource(), getInterval(), numSegmentsKilled, numBatchesProcessed
   }
 
   @JsonIgnore
-  @VisibleForTesting
   @Nullable
-  Integer getNumTotalBatches()
+  protected Integer getNumTotalBatches()
   {
     return null != limit ? (int) Math.ceil((double) limit / batchSize) : null;
   }
@@ -336,6 +339,31 @@ int computeNextBatchSize(int numSegmentsKilled)
     return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) : batchSize;
   }
 
+  /**
+   * Fetches the next batch of unused segments that are eligible for kill.
+   */
+  protected List<DataSegment> fetchNextBatchOfUnusedSegments(TaskToolbox toolbox, int nextBatchSize) throws IOException
+  {
+    return toolbox.getTaskActionClient().submit(
+        new RetrieveUnusedSegmentsAction(
+            getDataSource(),
+            getInterval(),
+            getVersions(),
+            nextBatchSize,
+            maxUsedStatusLastUpdatedTime
+        )
+    );
+  }
+
+  /**
+   * Logs the given info message. Exposed here to allow embedded kill tasks to
+   * suppress info logs.
+   */
+  protected void logInfo(String message, Object... args)
+  {
+    LOG.info(message, args);
+  }
+
   private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActionClient client) throws IOException
   {
     final NavigableMap<DateTime, List<TaskLock>> taskLockMap = new TreeMap<>();
@@ -385,6 +413,10 @@ private List<DataSegment> getKillableSegments(
     response.getUpgradedToSegmentIds().forEach((parent, children) -> {
       if (!CollectionUtils.isNullOrEmpty(children)) {
         // Do not kill segment if its parent or any of its siblings still exist in metadata store
+        LOG.info(
+            "Skipping kill of segments[%s] as their load spec is also used by segment IDs[%s].",
+            parentIdToUnusedSegments.get(parent), children
+        );
         parentIdToUnusedSegments.remove(parent);
       }
     });
@@ -402,10 +434,25 @@ private List<DataSegment> getKillableSegments(
     return parentIdToUnusedSegments.values()
                                    .stream()
                                    .flatMap(Set::stream)
-                                   .filter(segment -> !usedSegmentLoadSpecs.contains(segment.getLoadSpec()))
+                                   .filter(segment -> !isSegmentLoadSpecPresentIn(segment, usedSegmentLoadSpecs))
                                    .collect(Collectors.toList());
   }
 
+  /**
+   * @return true if the load spec of the segment is present in the given set of
+   * used load specs.
+   */
+  private boolean isSegmentLoadSpecPresentIn(
+      DataSegment segment,
+      Set<Map<String, Object>> usedSegmentLoadSpecs
+  )
+  {
+    boolean isPresent = usedSegmentLoadSpecs.contains(segment.getLoadSpec());
+    if (isPresent) {
+      LOG.info("Skipping kill of segment[%s] as its load spec is also used by other segments.", segment);
+    }
+    return isPresent;
+  }
+
   @Override
   public LookupLoadingSpec getLookupLoadingSpec()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskMetrics.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskMetrics.java
new file mode 100644
index 000000000000..86456d12fdc9
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskMetrics.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+/**
+ * Task-related metrics emitted by the Druid cluster.
+ */
+public class TaskMetrics
+{
+  private TaskMetrics()
+  {
+    // no instantiation
+  }
+
+  public static final String RUN_DURATION = "task/run/time";
+
+  public static final String SEGMENTS_DELETED_FROM_METADATA_STORE = "segment/killed/metadataStore/count";
+  public static final String SEGMENTS_DELETED_FROM_DEEPSTORE = "segment/killed/deepStorage/count";
+  public static final String FILES_DELETED_FROM_DEEPSTORE = "segment/killed/deepStorageFile/count";
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
index 341ac0c9326b..8e62113f0c75 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
@@ -32,6 +32,12 @@ public class Tasks
   public static final int DEFAULT_BATCH_INDEX_TASK_PRIORITY = 50;
   public static final int DEFAULT_MERGE_TASK_PRIORITY = 25;
 
+  /**
+   * Priority of embedded kill tasks. Kept lower than that of batch and realtime
+   * tasks so that they can preempt embedded kill tasks.
+ */ + public static final int DEFAULT_EMBEDDED_KILL_TASK_PRIORITY = 25; + static { Verify.verify(DEFAULT_MERGE_TASK_PRIORITY == DataSourceCompactionConfig.DEFAULT_COMPACTION_TASK_PRIORITY); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 9815bb14a715..99168143f84d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -1232,16 +1232,17 @@ public void remove(final Task task) { giant.lock(); try { - try { - log.info("Removing task[%s] from activeTasks", task.getId()); - cleanupUpgradeAndPendingSegments(task); - unlockAll(task); - } - finally { - activeTasks.remove(task.getId()); + if (!activeTasks.contains(task.getId())) { + return; } + log.info("Removing task[%s] from activeTasks", task.getId()); + cleanupUpgradeAndPendingSegments(task); + unlockAll(task); } finally { + if (task != null) { + activeTasks.remove(task.getId()); + } giant.unlock(); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/KillTaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/KillTaskToolbox.java new file mode 100644 index 000000000000..dd2afe6017d8 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/KillTaskToolbox.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.duty; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.error.DruidException; +import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter; +import org.apache.druid.indexer.report.TaskReport; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.IndexMergerV9; +import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.loading.DataSegmentKiller; +import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; + +import java.io.OutputStream; + +/** + * Wrapper over {@link TaskToolbox} used for embedded kill tasks launched by + * {@link UnusedSegmentsKiller}. + */ +public class KillTaskToolbox +{ + /** + * Creates a {@link TaskToolbox} with just enough dependencies to make the + * embedded kill tasks work in {@link UnusedSegmentsKiller}. 
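+   * <p>
+   * Illustrative usage (hypothetical snippet; it mirrors how {@link UnusedSegmentsKiller}
+   * drives an embedded kill task):
+   * <pre>{@code
+   * TaskActionClient client = taskActionClientFactory.create(killTask);
+   * TaskToolbox toolbox = KillTaskToolbox.create(client, dataSegmentKiller, emitter);
+   * TaskStatus status = killTask.runTask(toolbox);
+   * }</pre>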
+ */ + static TaskToolbox create( + TaskActionClient taskActionClient, + DataSegmentKiller dataSegmentKiller, + ServiceEmitter emitter + ) + { + final ObjectMapper mapper = DefaultObjectMapper.INSTANCE; + final IndexIO indexIO = new IndexIO(mapper, ColumnConfig.DEFAULT); + + return new TaskToolbox.Builder() + .taskActionClient(taskActionClient) + .dataSegmentKiller(dataSegmentKiller) + .taskReportFileWriter(NoopReportWriter.INSTANCE) + .indexIO(indexIO) + .indexMergerV9(new IndexMergerV9(mapper, indexIO, TmpFileSegmentWriteOutMediumFactory.instance(), false)) + .emitter(emitter) + .build(); + } + + /** + * Noop report writer. + */ + private static class NoopReportWriter extends SingleFileTaskReportFileWriter + { + private static final NoopReportWriter INSTANCE = new NoopReportWriter(); + + private NoopReportWriter() + { + super(null); + } + + @Override + public void setObjectMapper(ObjectMapper objectMapper) + { + // Do nothing + } + + @Override + public void write(String taskId, TaskReport.ReportMap reports) + { + // Do nothing, metrics are emitted by the KillUnusedSegmentsTask itself + } + + @Override + public OutputStream openReportOutputStream(String taskId) + { + throw DruidException.defensive("Cannot write reports using this reporter"); + } + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java index 9ba4e90f0f66..689d0876894c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java @@ -97,7 +97,12 @@ private void schedule(OverlordDuty duty) initExecutor(); final DutySchedule schedule = duty.getSchedule(); - final String dutyName = duty.getClass().getName(); + final String dutyName = duty.getClass().getSimpleName(); + + if (schedule == null || schedule.getPeriodMillis() <= 0) { + log.info("Not scheduling overlord duty[%s] as it has no period specified.", dutyName); + return; + } ScheduledExecutors.scheduleWithFixedDelay( exec, @@ -108,13 +113,13 @@ private void schedule(OverlordDuty duty) duty.run(); } catch (Exception e) { - log.error(e, "Error while running duty [%s]", dutyName); + log.error(e, "Error while running duty[%s]", dutyName); } } ); log.info( - "Scheduled overlord duty [%s] with initial delay [%d], period [%d].", + "Scheduled overlord duty[%s] with initial delay[%d], period[%d].", dutyName, schedule.getInitialDelayMillis(), schedule.getPeriodMillis() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java new file mode 100644 index 000000000000..bce95ffa2582 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java @@ -0,0 +1,477 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.duty; + +import com.google.common.collect.Ordering; +import com.google.inject.Inject; +import org.apache.druid.client.indexing.IndexingService; +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.discovery.DruidLeaderSelector; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.actions.TaskActionClientFactory; +import org.apache.druid.indexing.common.task.IndexTaskUtils; +import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask; +import org.apache.druid.indexing.common.task.TaskMetrics; +import org.apache.druid.indexing.common.task.Tasks; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.indexing.overlord.TaskLockbox; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Stopwatch; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import org.apache.druid.java.util.common.guava.Comparators; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.metadata.SegmentsMetadataManagerConfig; +import org.apache.druid.metadata.UnusedSegmentKillerConfig; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.segment.loading.DataSegmentKiller; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; + +/** + * {@link OverlordDuty} to delete unused segments from metadata store and the + * deep storage. Launches {@link EmbeddedKillTask}s to clean unused segments + * of a single datasource-interval. + * + * @see SegmentsMetadataManagerConfig to enable the cleanup + * @see org.apache.druid.server.coordinator.duty.KillUnusedSegments for legacy + * mode of killing unused segments via Coordinator duties + */ +public class UnusedSegmentsKiller implements OverlordDuty +{ + private static final EmittingLogger log = new EmittingLogger(UnusedSegmentsKiller.class); + + private static final String TASK_ID_PREFIX = "overlord-issued"; + + private static final int INITIAL_KILL_QUEUE_SIZE = 1000; + private static final int MAX_INTERVALS_TO_KILL_IN_DATASOURCE = 10_000; + private static final int MAX_SEGMENTS_TO_KILL_IN_INTERVAL = 1000; + + /** + * Period after which the queue is reset even if there are existing jobs in queue. 
+   */
+  private static final Duration QUEUE_RESET_PERIOD = Duration.standardDays(1);
+
+  /**
+   * Duration for which a kill task is allowed to run.
+   */
+  private static final Duration MAX_TASK_DURATION = Duration.standardMinutes(10);
+
+  private final ServiceEmitter emitter;
+  private final TaskLockbox taskLockbox;
+  private final DruidLeaderSelector leaderSelector;
+  private final DataSegmentKiller dataSegmentKiller;
+
+  private final UnusedSegmentKillerConfig killConfig;
+  private final TaskActionClientFactory taskActionClientFactory;
+  private final IndexerMetadataStorageCoordinator storageCoordinator;
+
+  /**
+   * Single-threaded executor to process kill jobs.
+   */
+  private final ScheduledExecutorService exec;
+  private int previousLeaderTerm;
+  private final AtomicReference<DateTime> lastResetTime = new AtomicReference<>(null);
+
+  private final AtomicReference<TaskInfo> currentTaskInfo = new AtomicReference<>(null);
+
+  /**
+   * Queue of kill candidates. Use a PriorityBlockingQueue to ensure thread-safety
+   * since this queue is accessed by both {@link #run()} and {@link #startNextJobInKillQueue}.
+   */
+  private final PriorityBlockingQueue<KillCandidate> killQueue;
+
+  @Inject
+  public UnusedSegmentsKiller(
+      SegmentsMetadataManagerConfig config,
+      TaskActionClientFactory taskActionClientFactory,
+      IndexerMetadataStorageCoordinator storageCoordinator,
+      @IndexingService DruidLeaderSelector leaderSelector,
+      ScheduledExecutorFactory executorFactory,
+      DataSegmentKiller dataSegmentKiller,
+      TaskLockbox taskLockbox,
+      ServiceEmitter emitter
+  )
+  {
+    this.emitter = emitter;
+    this.taskLockbox = taskLockbox;
+    this.leaderSelector = leaderSelector;
+    this.dataSegmentKiller = dataSegmentKiller;
+    this.storageCoordinator = storageCoordinator;
+    this.taskActionClientFactory = taskActionClientFactory;
+
+    this.killConfig = config.getKillUnused();
+
+    if (isEnabled()) {
+      this.exec = executorFactory.create(1, "UnusedSegmentsKiller-%s");
+      this.killQueue = new PriorityBlockingQueue<>(
+          INITIAL_KILL_QUEUE_SIZE,
+          Ordering.from(Comparators.intervalsByEndThenStart())
+                  .onResultOf(candidate -> candidate.interval)
+      );
+    } else {
+      this.exec = null;
+      this.killQueue = null;
+    }
+  }
+
+  @Override
+  public boolean isEnabled()
+  {
+    return killConfig.isEnabled();
+  }
+
+  /**
+   * Ensures that things are moving along and the kill queue is not stuck.
+   * Updates the state if leadership changes or if the queue needs to be reset.
+   */
+  @Override
+  public void run()
+  {
+    if (!isEnabled()) {
+      return;
+    }
+
+    updateStateIfNewLeader();
+    if (shouldRebuildKillQueue()) {
+      // Clear the killQueue to stop further processing of already queued jobs
+      killQueue.clear();
+      exec.submit(() -> {
+        rebuildKillQueue();
+        startNextJobInKillQueue();
+      });
+    }
+
+    // Cancel the current task if it has been running for too long
+    final TaskInfo taskInfo = currentTaskInfo.get();
+    if (taskInfo != null && !taskInfo.future.isDone()
+        && taskInfo.sinceTaskStarted.hasElapsed(MAX_TASK_DURATION)) {
+      log.warn(
+          "Cancelling kill task[%s] as it has been running for [%,d] millis.",
+          taskInfo.taskId, taskInfo.sinceTaskStarted.millisElapsed()
+      );
+      taskInfo.future.cancel(true);
+    }
+  }
+
+  @Override
+  public DutySchedule getSchedule()
+  {
+    if (isEnabled()) {
+      // Check every hour that the kill queue is being processed normally
+      log.info("Scheduling is enabled to launch embedded kill tasks.");
+      return new DutySchedule(Duration.standardHours(1).getMillis(), Duration.standardMinutes(1).getMillis());
+    } else {
+      return new DutySchedule(0, 0);
+    }
+  }
+
+  private void updateStateIfNewLeader()
+  {
+    final int currentLeaderTerm = leaderSelector.localTerm();
+    if (currentLeaderTerm != previousLeaderTerm) {
+      previousLeaderTerm = currentLeaderTerm;
+      killQueue.clear();
+      lastResetTime.set(null);
+    }
+  }
+
+  /**
+   * Returns true if the kill queue is empty, if the queue has never been reset,
+   * or if the last reset happened at least {@link #QUEUE_RESET_PERIOD} ago.
+   */
+  private boolean shouldRebuildKillQueue()
+  {
+    final DateTime now = DateTimes.nowUtc().plus(1);
+
+    return killQueue.isEmpty()
+           || lastResetTime.get() == null
+           || lastResetTime.get().plus(QUEUE_RESET_PERIOD).isBefore(now);
+  }
+
+  /**
+   * Clears the kill queue and adds fresh jobs.
+   * This method need not handle race conditions as it is always run on
+   * {@link #exec} which is single-threaded.
+   */
+  private void rebuildKillQueue()
+  {
+    final Stopwatch resetDuration = Stopwatch.createStarted();
+    try {
+      killQueue.clear();
+      if (!leaderSelector.isLeader()) {
+        log.info("Not rebuilding kill queue as we are not leader anymore.");
+        return;
+      }
+
+      final Set<String> dataSources = storageCoordinator.retrieveAllDatasourceNames();
+
+      final Map<String, Integer> dataSourceToIntervalCounts = new HashMap<>();
+      for (String dataSource : dataSources) {
+        storageCoordinator.retrieveUnusedSegmentIntervals(dataSource, MAX_INTERVALS_TO_KILL_IN_DATASOURCE).forEach(
+            interval -> {
+              dataSourceToIntervalCounts.merge(dataSource, 1, Integer::sum);
+              killQueue.offer(new KillCandidate(dataSource, interval));
+            }
+        );
+      }
+
+      lastResetTime.set(DateTimes.nowUtc());
+      log.info(
+          "Queued [%d] kill jobs for [%d] datasources in [%d] millis.",
+          killQueue.size(), dataSources.size(), resetDuration.millisElapsed()
+      );
+      dataSourceToIntervalCounts.forEach(
+          (dataSource, intervalCount) -> emitMetric(
+              Metric.UNUSED_SEGMENT_INTERVALS,
+              intervalCount,
+              Map.of(DruidMetrics.DATASOURCE, dataSource)
+          )
+      );
+      emitMetric(Metric.QUEUE_RESET_TIME, resetDuration.millisElapsed(), null);
+    }
+    catch (Throwable t) {
+      log.makeAlert(t, "Error while resetting kill queue.").emit();
+    }
+  }
+
+  /**
+   * Launches an {@link EmbeddedKillTask} on the {@link #exec} for the next
+   * {@link KillCandidate} in the {@link #killQueue}. This method returns
+   * immediately.
+   */
+  private void startNextJobInKillQueue()
+  {
+    if (!isEnabled() || !leaderSelector.isLeader()) {
+      return;
+    }
+
+    if (killQueue.isEmpty()) {
+      // If the last entry has been processed, emit the total processing time and exit
+      final DateTime lastQueueResetTime = lastResetTime.get();
+      if (lastQueueResetTime != null) {
+        long processTimeMillis = DateTimes.nowUtc().getMillis() - lastQueueResetTime.getMillis();
+        emitMetric(Metric.QUEUE_PROCESS_TIME, processTimeMillis, null);
+      }
+      return;
+    }
+
+    try {
+      final KillCandidate candidate = killQueue.poll();
+      if (candidate == null) {
+        return;
+      }
+
+      final String taskId = IdUtils.newTaskId(
+          TASK_ID_PREFIX,
+          KillUnusedSegmentsTask.TYPE,
+          candidate.dataSource,
+          candidate.interval
+      );
+
+      final Future<?> taskFuture = exec.submit(() -> {
+        runKillTask(candidate, taskId);
+        startNextJobInKillQueue();
+      });
+      currentTaskInfo.set(new TaskInfo(taskId, taskFuture));
+    }
+    catch (Throwable t) {
+      log.makeAlert(t, "Error while processing kill queue.").emit();
+      currentTaskInfo.set(null);
+    }
+  }
+
+  /**
+   * Launches an embedded kill task for the given candidate.
+   */
+  private void runKillTask(KillCandidate candidate, String taskId)
+  {
+    final Stopwatch taskRunTime = Stopwatch.createStarted();
+    final EmbeddedKillTask killTask = new EmbeddedKillTask(
+        taskId,
+        candidate,
+        DateTimes.nowUtc().minus(killConfig.getBufferPeriod())
+    );
+
+    final TaskActionClient taskActionClient = taskActionClientFactory.create(killTask);
+    final TaskToolbox taskToolbox = KillTaskToolbox.create(taskActionClient, dataSegmentKiller, emitter);
+
+    final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
+    IndexTaskUtils.setTaskDimensions(metricBuilder, killTask);
+
+    try {
+      taskLockbox.add(killTask);
+      final boolean isReady = killTask.isReady(taskActionClient);
+      if (!isReady) {
+        emitter.emit(metricBuilder.setMetric(Metric.SKIPPED_INTERVALS, 1L));
+        return;
+      }
+
+      final TaskStatus status = killTask.runTask(taskToolbox);
+
+      IndexTaskUtils.setTaskStatusDimensions(metricBuilder, status);
+      emitter.emit(metricBuilder.setMetric(TaskMetrics.RUN_DURATION, taskRunTime.millisElapsed()));
+    }
+    catch (Throwable t) {
+      log.error(t, "Embedded kill task[%s] failed.", killTask.getId());
+
+      IndexTaskUtils.setTaskStatusDimensions(metricBuilder, TaskStatus.failure(taskId, "Unknown error"));
+      emitter.emit(metricBuilder.setMetric(TaskMetrics.RUN_DURATION, taskRunTime.millisElapsed()));
+    }
+    finally {
+      cleanupLocksSilently(killTask);
+      emitMetric(Metric.PROCESSED_KILL_JOBS, 1L, Map.of(DruidMetrics.DATASOURCE, candidate.dataSource));
+    }
+  }
+
+  private void cleanupLocksSilently(EmbeddedKillTask killTask)
+  {
+    try {
+      taskLockbox.remove(killTask);
+    }
+    catch (Throwable t) {
+      log.error(t, "Error while cleaning up locks for kill task[%s].", killTask.getId());
+    }
+  }
+
+  private void emitMetric(String metricName, long value, Map<String, String> dimensions)
+  {
+    final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
+    if (dimensions != null) {
+      dimensions.forEach(builder::setDimension);
+    }
+    emitter.emit(builder.setMetric(metricName, value));
+  }
+
+  /**
+   * Represents a single candidate interval that contains unused segments.
+   */
+  private static class KillCandidate
+  {
+    private final String dataSource;
+    private final Interval interval;
+
+    private KillCandidate(String dataSource, Interval interval)
+    {
+      this.dataSource = dataSource;
+      this.interval = interval;
+    }
+  }
+
+  /**
+   * Info of the currently running task.
+   */
+  private static class TaskInfo
+  {
+    private final Stopwatch sinceTaskStarted;
+    private final String taskId;
+    private final Future<?> future;
+
+    private TaskInfo(String taskId, Future<?> future)
+    {
+      this.future = future;
+      this.taskId = taskId;
+      this.sinceTaskStarted = Stopwatch.createStarted();
+    }
+  }
+
+  /**
+   * Embedded kill task. Unlike other task types, this task is not persisted and
+   * does not run on a worker or indexer. Hence, it doesn't take up any task slots.
+   * To ensure that locks are held very briefly over short segment intervals,
+   * this kill task processes:
+   * <ul>
+   * <li>only 1 unused segment interval</li>
+   * <li>only 1 batch of up to 1000 unused segments</li>
+   * </ul>
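+   * <p>
+   * Illustrative arithmetic (assumed numbers, not from this patch): with
+   * {@code getNumTotalBatches() == 1} and at most 1000 segments per batch, an
+   * interval holding 2500 unused segments is drained across three successive
+   * passes of the kill queue.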
+ */ + private class EmbeddedKillTask extends KillUnusedSegmentsTask + { + private EmbeddedKillTask( + String taskId, + KillCandidate candidate, + DateTime maxUpdatedTimeOfEligibleSegment + ) + { + super( + taskId, + candidate.dataSource, + candidate.interval, + null, + Map.of(Tasks.PRIORITY_KEY, Tasks.DEFAULT_EMBEDDED_KILL_TASK_PRIORITY), + null, + null, + maxUpdatedTimeOfEligibleSegment + ); + } + + @Nullable + @Override + protected Integer getNumTotalBatches() + { + // Do everything in a single batch so that locks are not held for very long + return 1; + } + + @Override + protected List fetchNextBatchOfUnusedSegments(TaskToolbox toolbox, int nextBatchSize) + { + // Kill only 1000 segments in the batch so that locks are not held for very long + return storageCoordinator.retrieveUnusedSegmentsWithExactInterval( + getDataSource(), + getInterval(), + getMaxUsedStatusLastUpdatedTime(), + MAX_SEGMENTS_TO_KILL_IN_INTERVAL + ); + } + + @Override + protected void logInfo(String message, Object... args) + { + // Reduce the level of embedded task info logs to reduce noise on the Overlord + log.debug(message, args); + } + } + + public static class Metric + { + public static final String QUEUE_RESET_TIME = "segment/kill/queueReset/time"; + public static final String QUEUE_PROCESS_TIME = "segment/kill/queueProcess/time"; + public static final String PROCESSED_KILL_JOBS = "segment/kill/jobsProcessed/count"; + + public static final String SKIPPED_INTERVALS = "segment/kill/skippedIntervals/count"; + public static final String UNUSED_SEGMENT_INTERVALS = "segment/kill/unusedIntervals/count"; + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java index 3a63495928aa..2ddd0632291a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java @@ -30,6 +30,8 @@ import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.metadata.MetadataStorageConnectorConfig; import org.apache.druid.metadata.MetadataStorageTablesConfig; @@ -44,7 +46,6 @@ import org.apache.druid.server.coordinator.simulate.BlockingExecutorService; import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector; import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService; -import org.apache.druid.server.metrics.NoopServiceEmitter; import org.easymock.EasyMock; import org.joda.time.Period; import org.junit.rules.ExternalResource; @@ -55,6 +56,7 @@ public class TaskActionTestKit extends ExternalResource private TaskStorage taskStorage; private TaskLockbox taskLockbox; + private StubServiceEmitter emitter; private TestDerbyConnector testDerbyConnector; private IndexerMetadataStorageCoordinator metadataStorageCoordinator; private TaskActionToolbox taskActionToolbox; @@ -64,6 +66,21 @@ public class TaskActionTestKit extends ExternalResource private boolean useSegmentMetadataCache = false; private boolean skipSegmentPayloadFetchForAllocation = new 
TaskLockConfig().isBatchAllocationReduceMetadataIO(); + public StubServiceEmitter getServiceEmitter() + { + return emitter; + } + + public TestDerbyConnector getTestDerbyConnector() + { + return testDerbyConnector; + } + + public MetadataStorageTablesConfig getMetadataStorageTablesConfig() + { + return metadataStorageTablesConfig; + } + public TaskLockbox getTaskLockbox() { return taskLockbox; @@ -97,6 +114,7 @@ public void syncSegmentMetadataCache() @Override public void before() { + emitter = new StubServiceEmitter(); taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(new Period("PT24H"))); testDerbyConnector = new TestDerbyConnector( Suppliers.ofInstance(new MetadataStorageConnectorConfig()), @@ -109,7 +127,7 @@ public void before() testDerbyConnector ); - final SqlSegmentMetadataTransactionFactory transactionFactory = setupTransactionFactory(objectMapper); + final SqlSegmentMetadataTransactionFactory transactionFactory = setupTransactionFactory(objectMapper, emitter); metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator( transactionFactory, objectMapper, @@ -142,10 +160,10 @@ public boolean isBatchAllocationReduceMetadataIO() taskLockbox, taskLockConfig, metadataStorageCoordinator, - NoopServiceEmitter.instance(), + emitter, ScheduledExecutors::fixed ), - NoopServiceEmitter.instance(), + emitter, EasyMock.createMock(SupervisorManager.class), objectMapper ); @@ -163,7 +181,10 @@ public boolean isBatchAllocationReduceMetadataIO() syncSegmentMetadataCache(); } - private SqlSegmentMetadataTransactionFactory setupTransactionFactory(ObjectMapper objectMapper) + private SqlSegmentMetadataTransactionFactory setupTransactionFactory( + ObjectMapper objectMapper, + ServiceEmitter emitter + ) { metadataCachePollExec = new BlockingExecutorService("test-cache-poll-exec"); SegmentMetadataCache.UsageMode cacheMode @@ -173,12 +194,12 @@ private SqlSegmentMetadataTransactionFactory setupTransactionFactory(ObjectMappe segmentMetadataCache = new HeapMemorySegmentMetadataCache( objectMapper, - Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode)), + Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode, null)), Suppliers.ofInstance(metadataStorageTablesConfig), new NoopSegmentSchemaCache(), testDerbyConnector, (poolSize, name) -> new WrappingScheduledExecutorService(name, metadataCachePollExec, false), - NoopServiceEmitter.instance() + emitter ); final TestDruidLeaderSelector leaderSelector = new TestDruidLeaderSelector(); @@ -190,7 +211,7 @@ private SqlSegmentMetadataTransactionFactory setupTransactionFactory(ObjectMappe testDerbyConnector, leaderSelector, segmentMetadataCache, - NoopServiceEmitter.instance() + emitter ) { @Override diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index 848a2bb91ce5..9972340ae0cb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -181,7 +181,7 @@ public void setUpIngestionTestBase() throws IOException segmentMetadataCache, segmentSchemaCache, derbyConnectorRule.getConnector(), - () -> new SegmentsMetadataManagerConfig(null, null), + () -> new SegmentsMetadataManagerConfig(null, null, null), derbyConnectorRule.metadataTablesConfigSupplier(), CentralizedDatasourceSchemaConfig::create, 
NoopServiceEmitter.instance(), @@ -323,7 +323,7 @@ private SqlSegmentMetadataTransactionFactory createTransactionFactory() : SegmentMetadataCache.UsageMode.NEVER; segmentMetadataCache = new HeapMemorySegmentMetadataCache( objectMapper, - Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.millis(10), cacheMode)), + Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.millis(10), cacheMode, null)), derbyConnectorRule.metadataTablesConfigSupplier(), segmentSchemaCache, derbyConnectorRule.getConnector(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutorTest.java index 00169b0a955d..bc6be505c62d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutorTest.java @@ -36,11 +36,11 @@ public void testStartAndStop() { OverlordDuty testDuty1 = Mockito.mock(OverlordDuty.class); Mockito.when(testDuty1.isEnabled()).thenReturn(true); - Mockito.when(testDuty1.getSchedule()).thenReturn(new DutySchedule(0, 0)); + Mockito.when(testDuty1.getSchedule()).thenReturn(new DutySchedule(1, 0)); OverlordDuty testDuty2 = Mockito.mock(OverlordDuty.class); Mockito.when(testDuty2.isEnabled()).thenReturn(true); - Mockito.when(testDuty2.getSchedule()).thenReturn(new DutySchedule(0, 0)); + Mockito.when(testDuty2.getSchedule()).thenReturn(new DutySchedule(1, 0)); ScheduledExecutorFactory executorFactory = Mockito.mock(ScheduledExecutorFactory.class); ScheduledExecutorService executorService = Mockito.mock(ScheduledExecutorService.class); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java new file mode 100644 index 000000000000..64a551fdee48 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java @@ -0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.overlord.duty; + +import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.actions.LocalTaskActionClient; +import org.apache.druid.indexing.common.actions.TaskActionTestKit; +import org.apache.druid.indexing.common.task.NoopTask; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.TaskMetrics; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.indexing.overlord.TaskLockbox; +import org.apache.druid.indexing.overlord.TimeChunkLockRequest; +import org.apache.druid.indexing.test.TestDataSegmentKiller; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Comparators; +import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.metadata.SegmentsMetadataManagerConfig; +import org.apache.druid.metadata.UnusedSegmentKillerConfig; +import org.apache.druid.metadata.segment.cache.SegmentMetadataCache; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.segment.TestDataSource; +import org.apache.druid.server.coordinator.CreateDataSegments; +import org.apache.druid.server.coordinator.simulate.BlockingExecutorService; +import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector; +import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Duration; +import org.joda.time.Interval; +import org.joda.time.Period; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +public class UnusedSegmentsKillerTest +{ + @Rule + public TaskActionTestKit taskActionTestKit = new TaskActionTestKit(); + + private static final List WIKI_SEGMENTS_1X10D = + CreateDataSegments.ofDatasource(TestDataSource.WIKI) + .forIntervals(10, Granularities.DAY) + .eachOfSize(500); + + private StubServiceEmitter emitter; + private UnusedSegmentsKiller killer; + private BlockingExecutorService killExecutor; + private UnusedSegmentKillerConfig killerConfig; + private TestDruidLeaderSelector leaderSelector; + private TestDataSegmentKiller dataSegmentKiller; + private IndexerMetadataStorageCoordinator storageCoordinator; + + @Before + public void setup() + { + emitter = taskActionTestKit.getServiceEmitter(); + leaderSelector = new TestDruidLeaderSelector(); + dataSegmentKiller = new TestDataSegmentKiller(); + killerConfig = new UnusedSegmentKillerConfig(true, Period.ZERO); + killExecutor = new BlockingExecutorService("UnusedSegmentsKillerTest-%s"); + storageCoordinator = taskActionTestKit.getMetadataStorageCoordinator(); + initKiller(); + } + + private void initKiller() + { + killer = new UnusedSegmentsKiller( + new SegmentsMetadataManagerConfig( + null, + SegmentMetadataCache.UsageMode.ALWAYS, + killerConfig + ), + task -> new LocalTaskActionClient(task, taskActionTestKit.getTaskActionToolbox()), + storageCoordinator, + leaderSelector, + (corePoolSize, nameFormat) -> new WrappingScheduledExecutorService(nameFormat, killExecutor, true), + dataSegmentKiller, + taskActionTestKit.getTaskLockbox(), + taskActionTestKit.getServiceEmitter() + ); + } + + private void finishQueuedKillJobs() + { + killExecutor.finishAllPendingTasks(); + } + + @Test + public void 
test_getSchedule_returnsOneHourPeriod_ifEnabled() + { + final DutySchedule schedule = killer.getSchedule(); + Assert.assertEquals(Duration.standardHours(1).getMillis(), schedule.getPeriodMillis()); + Assert.assertEquals(Duration.standardMinutes(1).getMillis(), schedule.getInitialDelayMillis()); + } + + @Test + public void test_getSchedule_returnsZeroPeriod_ifDisabled() + { + killerConfig = new UnusedSegmentKillerConfig(false, null); + initKiller(); + + final DutySchedule schedule = killer.getSchedule(); + Assert.assertEquals(0, schedule.getPeriodMillis()); + Assert.assertEquals(0, schedule.getInitialDelayMillis()); + } + + @Test + public void test_run_startsProcessing_ifEnabled() + { + Assert.assertFalse(killExecutor.hasPendingTasks()); + Assert.assertTrue(killer.isEnabled()); + + killer.run(); + Assert.assertTrue(killExecutor.hasPendingTasks()); + } + + @Test + public void test_run_isNoop_ifDisabled() + { + killerConfig = new UnusedSegmentKillerConfig(false, null); + initKiller(); + + Assert.assertFalse(killer.isEnabled()); + + killer.run(); + Assert.assertFalse(killExecutor.hasPendingTasks()); + } + + @Test + public void test_run_doesNotProcessSegments_ifNotLeader() + { + storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null); + storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI); + + leaderSelector.becomeLeader(); + killer.run(); + + leaderSelector.stopBeingLeader(); + + Assert.assertTrue(killExecutor.hasPendingTasks()); + + finishQueuedKillJobs(); + emitter.verifyNotEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS); + Assert.assertFalse(killExecutor.hasPendingTasks()); + } + + @Test + public void test_run_launchesEmbeddedKillTasks_ifLeader() + { + leaderSelector.becomeLeader(); + + storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null); + storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI); + + // Reset the queue and verify that kill jobs have been added to the queue + killer.run(); + Assert.assertTrue(killExecutor.hasPendingTasks()); + emitter.verifyNotEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS); + + finishQueuedKillJobs(); + emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10); + + emitter.verifyEmitted(UnusedSegmentsKiller.Metric.QUEUE_PROCESS_TIME, 1); + emitter.verifyEmitted(TaskMetrics.RUN_DURATION, 10); + + emitter.verifyEmitted(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 10); + emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 10L); + + emitter.verifyEmitted(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 10); + emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 10L); + + Assert.assertTrue( + retrieveUnusedSegments(Intervals.ETERNITY).isEmpty() + ); + } + + @Test + public void test_maxSegmentsKilledInAnInterval_is_1k() + { + leaderSelector.becomeLeader(); + + final List segments = + CreateDataSegments.ofDatasource(TestDataSource.WIKI) + .forIntervals(1, Granularities.DAY) + .withNumPartitions(2000) + .eachOfSizeInMb(50); + + storageCoordinator.commitSegments(Set.copyOf(segments), null); + storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI); + + Assert.assertEquals( + 2000, + retrieveUnusedSegments(segments.get(0).getInterval()).size() + ); + + // Reset the kill queue and execute kill tasks + killer.run(); + finishQueuedKillJobs(); + + // Verify that a single kill task has run which killed 1k segments + emitter.verifyEmitted(TaskMetrics.RUN_DURATION, 1); + emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 1000L); + + 
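// The remaining 1000 unused segments stay behind until a later queue rebuild picks up this interval again +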
Assert.assertEquals( + 1000, + retrieveUnusedSegments(segments.get(0).getInterval()).size() + ); + } + + @Test(timeout = 20_000L) + public void test_maxIntervalsKilledInADatasource_is_10k() + { + leaderSelector.becomeLeader(); + + final List segments = + CreateDataSegments.ofDatasource(TestDataSource.WIKI) + .forIntervals(20_000, Granularities.DAY) + .eachOfSizeInMb(50); + + storageCoordinator.commitSegments(Set.copyOf(segments), null); + storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI); + + Assert.assertEquals( + 20_000, + retrieveUnusedSegments(Intervals.ETERNITY).size() + ); + + // Reset the kill queue and execute kill tasks + killer.run(); + finishQueuedKillJobs(); + + // Verify that 10k kill tasks have run, each killing a single segment + emitter.verifyEmitted(TaskMetrics.RUN_DURATION, 10000); + emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 10_000L); + + Assert.assertEquals( + 10_000, + retrieveUnusedSegments(Intervals.ETERNITY).size() + ); + } + + @Test + public void test_run_resetsQueue_ifLeadershipIsReacquired() + { + leaderSelector.becomeLeader(); + + // Verify that the queue has been reset + killer.run(); + finishQueuedKillJobs(); + emitter.verifyEmitted(UnusedSegmentsKiller.Metric.QUEUE_RESET_TIME, 1); + emitter.verifyNotEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS); + + storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null); + storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI); + + // Lose and reacquire leadership + leaderSelector.stopBeingLeader(); + leaderSelector.becomeLeader(); + + // Run again and verify that queue has been reset + emitter.flush(); + killer.run(); + finishQueuedKillJobs(); + emitter.verifyEmitted(UnusedSegmentsKiller.Metric.QUEUE_RESET_TIME, 1); + emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10); + } + + @Test + public void test_run_doesNotResetQueue_ifThereArePendingJobs_andLastRunWasLessThanOneDayAgo() + { + leaderSelector.becomeLeader(); + + storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null); + storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI); + + // Invoke run, reset the queue and process only some of the jobs + killer.run(); + killExecutor.finishNextPendingTasks(6); + emitter.verifyEmitted(UnusedSegmentsKiller.Metric.QUEUE_RESET_TIME, 1); + emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 5); + + Assert.assertTrue(killExecutor.hasPendingTasks()); + + // Invoke run again and verify that queue has not been reset + emitter.flush(); + killer.run(); + finishQueuedKillJobs(); + emitter.verifyNotEmitted(UnusedSegmentsKiller.Metric.QUEUE_RESET_TIME); + emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 5); + + // All jobs have been processed + Assert.assertFalse(killExecutor.hasPendingTasks()); + } + + @Test + public void test_run_prioritizesOlderIntervals() + { + leaderSelector.becomeLeader(); + + storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null); + storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI); + + killer.run(); + finishQueuedKillJobs(); + emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10); + + // Verify that the kill intervals are sorted with the oldest interval first + final List events = + emitter.getMetricEvents().get(TaskMetrics.RUN_DURATION); + final List killIntervals = events.stream().map(event -> { + final String taskId = (String) event.getUserDims().get(DruidMetrics.TASK_ID); + String[] splits = 
taskId.split("_"); + return Intervals.of(splits[4] + "/" + splits[5]); + }).collect(Collectors.toList()); + + Assert.assertEquals(10, killIntervals.size()); + + final List expectedIntervals = + WIKI_SEGMENTS_1X10D.stream() + .map(DataSegment::getInterval) + .sorted(Comparators.intervalsByEndThenStart()) + .collect(Collectors.toList()); + Assert.assertEquals(expectedIntervals, killIntervals); + } + + @Test + public void test_run_doesNotDeleteSegmentFiles_ifLoadSpecIsUsedByAnotherSegment() + { + storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null); + storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI); + + // Add a new segment upgraded from one of the unused segments + final DataSegment upgradedSegment1 = WIKI_SEGMENTS_1X10D.get(0).withVersion("v2"); + final DataSegment upgradedSegment2 = WIKI_SEGMENTS_1X10D.get(1).withVersion("v2"); + storageCoordinator.commitSegments(Set.of(upgradedSegment1, upgradedSegment2), null); + + leaderSelector.becomeLeader(); + killer.run(); + + // Verify that all unused segments are deleted from metadata store but the + // ones with used load specs are not deleted from the deep store + finishQueuedKillJobs(); + emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 10L); + emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 8L); + } + + @Test + public void test_run_doesNotKillSegment_ifUpdatedWithinBufferPeriod() + { + killerConfig = new UnusedSegmentKillerConfig(true, Period.hours(1)); + initKiller(); + + storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null); + storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI); + + leaderSelector.becomeLeader(); + killer.run(); + finishQueuedKillJobs(); + + // Verify that tasks are launched but no segment is killed + emitter.verifyValue(UnusedSegmentsKiller.Metric.UNUSED_SEGMENT_INTERVALS, 10L); + emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10); + emitter.verifyEmitted(TaskMetrics.RUN_DURATION, 10); + + emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 0L); + emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 0L); + } + + @Test + public void test_run_killsFutureSegment() + { + final List futureSegments = CreateDataSegments + .ofDatasource(TestDataSource.WIKI) + .forIntervals(10, Granularities.DAY) + .startingAt("2050-01-01") + .eachOfSize(500); + storageCoordinator.commitSegments(Set.copyOf(futureSegments), null); + storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI); + + leaderSelector.becomeLeader(); + killer.run(); + finishQueuedKillJobs(); + + emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 10L); + emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 10L); + } + + @Test + public void test_run_skipsLockedIntervals() throws InterruptedException + { + storageCoordinator.commitSegments(Set.copyOf(WIKI_SEGMENTS_1X10D), null); + storageCoordinator.markAllSegmentsAsUnused(TestDataSource.WIKI); + + // Lock up some of the segment intervals + final Interval lockedInterval = new Interval( + WIKI_SEGMENTS_1X10D.get(0).getInterval().getStart(), + WIKI_SEGMENTS_1X10D.get(4).getInterval().getEnd() + ); + + final Task ingestionTask = new NoopTask(null, null, TestDataSource.WIKI, 0L, 0L, null); + final TaskLockbox taskLockbox = taskActionTestKit.getTaskLockbox(); + + try { + taskLockbox.add(ingestionTask); + taskLockbox.lock( + ingestionTask, + new TimeChunkLockRequest(TaskLockType.APPEND, ingestionTask, lockedInterval, null) + ); + + 
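// The APPEND lock covers the first five day-intervals, so this pass should kill only the other five +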
leaderSelector.becomeLeader(); + killer.run(); + finishQueuedKillJobs(); + + // Verify that unused segments from locked intervals are not killed + emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 5L); + emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 5L); + emitter.verifySum(UnusedSegmentsKiller.Metric.SKIPPED_INTERVALS, 5L); + } + finally { + taskLockbox.remove(ingestionTask); + } + + // Do another run to clean up the rest of the segments + killer.run(); + finishQueuedKillJobs(); + emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_METADATA_STORE, 10L); + emitter.verifySum(TaskMetrics.SEGMENTS_DELETED_FROM_DEEPSTORE, 10L); + } + + private List retrieveUnusedSegments(Interval interval) + { + return storageCoordinator.retrieveUnusedSegmentsForInterval( + TestDataSource.WIKI, + interval, + null, + null + ); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index bd8d6481ba68..d0e639db2c22 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -56,6 +56,29 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto private int deleteSegmentsCount = 0; + @Override + public Set retrieveAllDatasourceNames() + { + return Set.of(); + } + + @Override + public List retrieveUnusedSegmentIntervals(String dataSource, int limit) + { + return List.of(); + } + + @Override + public List retrieveUnusedSegmentsWithExactInterval( + String dataSource, + Interval interval, + DateTime maxUpdatedTime, + int limit + ) + { + return List.of(); + } + @Override public DataSourceMetadata retrieveDataSourceMetadata(String dataSource) { @@ -80,12 +103,6 @@ public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata da return false; } - @Override - public Set retrieveAllDatasourceNames() - { - return Set.of(); - } - @Override public Set retrieveAllUsedSegments(String dataSource, Segments visibility) { @@ -286,10 +303,11 @@ public int deletePendingSegments(String dataSource) } @Override - public void deleteSegments(Set segments) + public int deleteSegments(Set segments) { deleteSegmentsCount++; nuked.addAll(segments); + return segments.size(); } @Override diff --git a/processing/src/main/java/org/apache/druid/indexer/report/KillTaskReport.java b/processing/src/main/java/org/apache/druid/indexer/report/KillTaskReport.java index a877d2bf7b1a..750f9090c26f 100644 --- a/processing/src/main/java/org/apache/druid/indexer/report/KillTaskReport.java +++ b/processing/src/main/java/org/apache/druid/indexer/report/KillTaskReport.java @@ -56,7 +56,7 @@ public String getReportKey() @Override @JsonProperty - public Object getPayload() + public Stats getPayload() { return stats; } diff --git a/processing/src/main/java/org/apache/druid/segment/loading/SegmentKillResult.java b/processing/src/main/java/org/apache/druid/segment/loading/SegmentKillResult.java new file mode 100644 index 000000000000..ff3c3b8a98b4 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/loading/SegmentKillResult.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.loading; + +import java.util.List; + +/** + * Result of killing data segments using {@link DataSegmentKiller}. + */ +public class SegmentKillResult +{ + private static final SegmentKillResult EMPTY_INSTANCE = new SegmentKillResult(List.of()); + + public static SegmentKillResult empty() + { + return EMPTY_INSTANCE; + } + + private final List deletedPaths; + + public SegmentKillResult( + List deletedPaths + ) + { + this.deletedPaths = deletedPaths; + } + + public List getDeletedPaths() + { + return deletedPaths; + } +}
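A minimal usage sketch of the new result class, not part of the patch: it assumes a DataSegmentKiller implementation collects the deep-storage paths it actually removed, and that the element type is List<String> (inferred from getDeletedPaths). The helper class and its name are illustrative only.

import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.SegmentKillResult;

import java.util.List;

public class SegmentKillResultExample
{
  private static final Logger log = new Logger(SegmentKillResultExample.class);

  // Hypothetical helper: wraps the paths deleted from deep storage in a SegmentKillResult
  public static SegmentKillResult reportDeletedPaths(List<String> deletedPaths)
  {
    if (deletedPaths.isEmpty()) {
      // Reuse the shared immutable instance when nothing was deleted
      return SegmentKillResult.empty();
    }
    final SegmentKillResult result = new SegmentKillResult(deletedPaths);
    result.getDeletedPaths().forEach(path -> log.info("Deleted segment file from path[%s]", path));
    return result;
  }
}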
diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/MetricsVerifier.java b/processing/src/test/java/org/apache/druid/java/util/metrics/MetricsVerifier.java index 98e538e3b068..5ce60dcfaebe 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/MetricsVerifier.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/MetricsVerifier.java @@ -22,6 +22,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.junit.Assert; +import javax.annotation.Nullable; import java.util.List; import java.util.Map; @@ -50,7 +51,7 @@ default void verifyEmitted(String metricName, int times) * Verifies that the metric was emitted for the given dimension filters the * expected number of times. */ - default void verifyEmitted(String metricName, Map dimensionFilters, int times) + default void verifyEmitted(String metricName, @Nullable Map dimensionFilters, int times) { Assert.assertEquals( StringUtils.format("Metric [%s] was emitted unexpected number of times.", metricName), @@ -71,7 +72,7 @@ default void verifyValue(String metricName, Number expectedValue) * Verifies the value of the event corresponding to the specified metric and * dimensionFilters emitted in the previous run. */ - default void verifyValue(String metricName, Map dimensionFilters, Number expectedValue) + default void verifyValue(String metricName, @Nullable Map dimensionFilters, Number expectedValue) { Assert.assertEquals(expectedValue, getValue(metricName, dimensionFilters)); } @@ -80,7 +81,7 @@ default void verifyValue(String metricName, Map dimensionFilters * Gets the value of the event corresponding to the specified metric and * dimensionFilters. */ - default Number getValue(String metricName, Map dimensionFilters) + default Number getValue(String metricName, @Nullable Map dimensionFilters) { List values = getMetricValues(metricName, dimensionFilters); Assert.assertEquals( @@ -91,9 +92,36 @@ default Number getValue(String metricName, Map dimensionFilters) return values.get(0); } + /** + * Verifies the sum of values of all events corresponding to the specified metric. + */ + default void verifySum(String metricName, long expectedSum) + { + verifySum(metricName, null, expectedSum); + } + + /** + * Verifies the sum of values of all events corresponding to the specified metric + * and dimensionFilters. + */ + default void verifySum(String metricName, @Nullable Map dimensionFilters, long expectedSum) + { + long observedSum = getMetricValues(metricName, dimensionFilters) + .stream() + .mapToLong(Number::longValue) + .sum(); + Assert.assertEquals( + StringUtils.format( + "Unexpected sum[%s] of metric[%s] with filters[%s]", + observedSum, metricName, dimensionFilters + ), + expectedSum, observedSum + ); + } + /** * Gets the metric values for the specified dimension filters. */ - List getMetricValues(String metricName, Map dimensionFilters); + List getMetricValues(String metricName, @Nullable Map dimensionFilters); } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index db62686cd594..13da5aa3ac86 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -150,6 +150,26 @@ List retrieveUnusedSegmentsForInterval( @Nullable DateTime maxUsedStatusLastUpdatedTime ); + /** + * Retrieves unused segments from the metadata store that match the given + * interval exactly. There is no guarantee on the order of segments in the list + * or on whether the limited list contains the highest or lowest segment IDs + * in the interval. + * + * @param interval Returned segments must exactly match this interval. + * @param maxUpdatedTime Returned segments must have a {@code used_status_last_updated} + * which is either null or earlier than this value. + * @param limit Maximum number of segments to return. + * + * @return Unsorted list of unused segments that match the given parameters. + */ + List retrieveUnusedSegmentsWithExactInterval( + String dataSource, + Interval interval, + DateTime maxUpdatedTime, + int limit + ); + /** * Retrieves segments for the given IDs, regardless of their visibility * (visible, overshadowed or unused). @@ -456,7 +476,12 @@ SegmentPublishResult commitMetadataOnly( void updateSegmentMetadata(Set segments); - void deleteSegments(Set segments); + /** + * Deletes unused segments from the metadata store. + * + * @return Number of segments actually deleted. + */ + int deleteSegments(Set segments); /** * Retrieve the segment for a given id from the metadata store. Return null if no such segment exists @@ -556,6 +581,15 @@ List getUnusedSegmentIntervals( DateTime maxUsedStatusLastUpdatedTime ); + /** + * Retrieves intervals of the specified datasource that contain any unused segments. + * There is no guarantee on the order of intervals in the list or on whether + * the limited list contains the earliest or latest intervals of the datasource. + * + * @return Unsorted list of unused segment intervals containing up to {@code limit} entries. + */ + List retrieveUnusedSegmentIntervals(String dataSource, int limit); + /** * Returns the number of segment entries in the database whose state was changed as the result of this call (that is, * the segments were marked as used). If the call results in a database error, an exception is relayed to the caller. 
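Taken together, the three new coordinator APIs above outline the flow an embedded kill job can follow. A rough sketch under stated assumptions: the generic types are restored from context, the batch sizes of 100 intervals and 1000 segments are arbitrary, the 30-day cut-off mirrors the default buffer period, and this is not the actual UnusedSegmentsKiller implementation.

import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.util.HashSet;
import java.util.List;

public class UnusedSegmentKillFlowSketch
{
  // Deletes one batch of unused-segment metadata per unused interval and
  // returns the total number of metadata entries actually deleted.
  public static int killOneBatchPerInterval(IndexerMetadataStorageCoordinator coordinator, String dataSource)
  {
    // Segments whose used_status_last_updated is after this cut-off are still
    // within the buffer period and must be retained (30 days is assumed here)
    final DateTime maxUpdatedTime = DateTimes.nowUtc().minusDays(30);

    int totalDeleted = 0;
    for (Interval interval : coordinator.retrieveUnusedSegmentIntervals(dataSource, 100)) {
      final List<DataSegment> unusedSegments = coordinator.retrieveUnusedSegmentsWithExactInterval(
          dataSource, interval, maxUpdatedTime, 1000
      );
      // Only the metadata entries are deleted here; the corresponding files in
      // deep storage would be removed separately via a DataSegmentKiller
      totalDeleted += coordinator.deleteSegments(new HashSet<>(unusedSegments));
    }
    return totalDeleted;
  }
}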
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 8ede92dc9c5f..722b3a8155e9 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -48,7 +48,6 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.metadata.segment.DatasourceSegmentMetadataWriter; import org.apache.druid.metadata.segment.SegmentMetadataReadTransaction; import org.apache.druid.metadata.segment.SegmentMetadataTransaction; import org.apache.druid.metadata.segment.SegmentMetadataTransactionFactory; @@ -158,6 +157,14 @@ public Set retrieveAllDatasourceNames() ); } + @Override + public List retrieveUnusedSegmentIntervals(String dataSource, int limit) + { + return inReadOnlyTransaction( + sql -> sql.retrieveUnusedSegmentIntervals(dataSource, limit) + ); + } + @Override public Set retrieveUsedSegmentsForIntervals( final String dataSource, @@ -225,7 +232,8 @@ public List retrieveUnusedSegmentsForInterval( { final List matchingSegments = inReadOnlyDatasourceTransaction( dataSource, - transaction -> transaction.findUnusedSegments( + transaction -> transaction.noCacheSql().findUnusedSegments( + dataSource, interval, versions, limit, @@ -241,6 +249,24 @@ public List retrieveUnusedSegmentsForInterval( return matchingSegments; } + @Override + public List retrieveUnusedSegmentsWithExactInterval( + String dataSource, + Interval interval, + DateTime maxUpdatedTime, + int limit + ) + { + return inReadOnlyTransaction( + sql -> sql.retrieveUnusedSegmentsWithExactInterval( + dataSource, + interval, + maxUpdatedTime, + limit + ) + ); + } + @Override public Set retrieveSegmentsById(String dataSource, Set segmentIds) { @@ -1574,7 +1600,8 @@ private SegmentIdWithShardSpec getTrueAllocatedId( } // If yes, try to compute allocated partition num using the max unused segment shard spec - SegmentId unusedMaxId = transaction.findHighestUnusedSegmentId( + SegmentId unusedMaxId = transaction.noCacheSql().retrieveHighestUnusedSegmentId( + allocatedId.getDataSource(), allocatedId.getInterval(), allocatedId.getVersion() ); @@ -1617,7 +1644,7 @@ public int deletePendingSegments(String dataSource) { return inReadWriteDatasourceTransaction( dataSource, - DatasourceSegmentMetadataWriter::deleteAllPendingSegments + SegmentMetadataTransaction::deleteAllPendingSegments ); } @@ -2305,11 +2332,11 @@ public void updateSegmentMetadata(final Set segments) } @Override - public void deleteSegments(final Set segments) + public int deleteSegments(final Set segments) { if (segments.isEmpty()) { log.info("No segments to delete."); - return; + return 0; } final String dataSource = verifySegmentsToCommit(segments); @@ -2322,7 +2349,7 @@ public void deleteSegments(final Set segments) ); log.debugSegments(segments, "Delete the metadata of segments"); - log.info("Deleted [%d] segments from metadata storage for dataSource [%s].", numDeletedSegments, dataSource); + return numDeletedSegments; } @Override diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java index f53f31082219..5718a0bdc9ac 100644 --- 
a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.common.config.Configs; +import org.apache.druid.error.DruidException; import org.apache.druid.metadata.segment.cache.SegmentMetadataCache; import org.joda.time.Period; @@ -39,14 +40,29 @@ public class SegmentsMetadataManagerConfig @JsonProperty private final SegmentMetadataCache.UsageMode useIncrementalCache; + @JsonProperty + private final UnusedSegmentKillerConfig killUnused; + @JsonCreator public SegmentsMetadataManagerConfig( @JsonProperty("pollDuration") Period pollDuration, - @JsonProperty("useIncrementalCache") SegmentMetadataCache.UsageMode useIncrementalCache + @JsonProperty("useIncrementalCache") SegmentMetadataCache.UsageMode useIncrementalCache, + @JsonProperty("killUnused") UnusedSegmentKillerConfig killUnused ) { this.pollDuration = Configs.valueOrDefault(pollDuration, Period.minutes(1)); this.useIncrementalCache = Configs.valueOrDefault(useIncrementalCache, SegmentMetadataCache.UsageMode.NEVER); + this.killUnused = Configs.valueOrDefault(killUnused, new UnusedSegmentKillerConfig(null, null)); + if (this.killUnused.isEnabled() && this.useIncrementalCache == SegmentMetadataCache.UsageMode.NEVER) { + throw DruidException + .forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + "Segment metadata cache must be enabled to allow killing of unused segments." + + " Set 'druid.manager.segments.useIncrementalCache=always'" + + " or 'druid.manager.segments.useIncrementalCache=ifSynced' to enable the cache." + ); + } } /** @@ -61,4 +77,9 @@ public Period getPollDuration() { return pollDuration; } + + public UnusedSegmentKillerConfig getKillUnused() + { + return killUnused; + } }
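For operators, the new nested config rides on the same property prefix as the rest of SegmentsMetadataManagerConfig. A hypothetical Overlord runtime.properties sketch; the 'druid.manager.segments' prefix is taken from the error message above, and P30D is the default bufferPeriod from UnusedSegmentKillerConfig further below:

druid.manager.segments.useIncrementalCache=always
druid.manager.segments.killUnused.enabled=true
druid.manager.segments.killUnused.bufferPeriod=P30D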
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index de2e35813c4d..2779c16da175 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -29,6 +29,7 @@ import com.google.common.collect.UnmodifiableIterator; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.error.DruidException; +import org.apache.druid.error.InternalServerError; import org.apache.druid.error.InvalidInput; import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.DateTimes; @@ -187,8 +188,9 @@ public CloseableIterator retrieveUsedSegmentsPlus( } /** - * Determines the highest ID amongst unused segments for the given datasource, - * interval and version. + * Retrieves the ID of the unused segment that has the highest partition + * number amongst all unused segments that exactly match the given interval + * and version. * * @return null if no unused segment exists for the given parameters. */ @@ -286,6 +288,37 @@ public List iterateAllUnusedSegmentsForDatasource( } } + /** + * Retrieves unused segments that are fully contained within the given interval. + * + * @param interval Returned segments must be fully contained within this + * interval. + * @param versions Optional list of segment versions. If passed as null, + * all segment versions are eligible. + * @param limit Maximum number of segments to return. If passed as null, + * all segments are returned. + * @param maxUpdatedTime Returned segments must have a {@code used_status_last_updated} + * which is either null or earlier than this value. + */ + public List findUnusedSegments( + String dataSource, + Interval interval, + @Nullable List versions, + @Nullable Integer limit, + @Nullable DateTime maxUpdatedTime + ) + { + try ( + final CloseableIterator iterator = + retrieveUnusedSegments(dataSource, List.of(interval), versions, limit, null, null, maxUpdatedTime) + ) { + return ImmutableList.copyOf(iterator); + } + catch (IOException e) { + throw InternalServerError.exception(e, "Error while reading unused segments"); + } + } + /** * Retrieves segments for a given datasource that are marked unused and that are fully contained by any interval * in a particular collection of intervals. If the collection of intervals is empty, this method will retrieve all @@ -1027,6 +1060,74 @@ public List retrieveUnusedSegmentIntervals( return unusedIntervals.stream().filter(Objects::nonNull).collect(Collectors.toList()); } + /** + * Gets unused segment intervals for the specified datasource. There is no + * guarantee on the order of intervals in the list or on whether the limited + * list contains the earliest or latest intervals present in the datasource. + * + * @return List of unused segment intervals containing up to {@code limit} entries. + */ + public List retrieveUnusedSegmentIntervals(String dataSource, int limit) + { + final String sql = StringUtils.format( + "SELECT start, %2$send%2$s FROM %1$s" + + " WHERE dataSource = :dataSource AND used = false" + + " GROUP BY start, %2$send%2$s" + + " %3$s", + dbTables.getSegmentsTable(), connector.getQuoteString(), connector.limitClause(limit) + ); + + final List intervals = connector.inReadOnlyTransaction( + (handle, status) -> + handle.createQuery(sql) + .setFetchSize(connector.getStreamingFetchSize()) + .bind("dataSource", dataSource) + .map((index, r, ctx) -> mapToInterval(r, dataSource)) + .list() + ); + + return intervals.stream().filter(Objects::nonNull).collect(Collectors.toList()); + } + + /** + * Retrieves unused segments that exactly match the given interval. + * + * @param interval Returned segments must exactly match this interval. + * @param maxUpdatedTime Returned segments must have a {@code used_status_last_updated} + * which is either null or earlier than this value. 
+ * @param limit Maximum number of segments to return + */ + public List retrieveUnusedSegmentsWithExactInterval( + String dataSource, + Interval interval, + DateTime maxUpdatedTime, + int limit + ) + { + final String sql = StringUtils.format( + "SELECT id, payload FROM %1$s" + + " WHERE dataSource = :dataSource AND used = false" + + " AND %2$send%2$s = :end AND start = :start" + + " AND (used_status_last_updated IS NULL OR used_status_last_updated <= :maxUpdatedTime)" + + " %3$s", + dbTables.getSegmentsTable(), connector.getQuoteString(), connector.limitClause(limit) + ); + + final List segments = connector.inReadOnlyTransaction( + (handle, status) -> + handle.createQuery(sql) + .setFetchSize(connector.getStreamingFetchSize()) + .bind("dataSource", dataSource) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) + .bind("maxUpdatedTime", maxUpdatedTime.toString()) + .map((index, r, ctx) -> mapToSegment(r)) + .list() + ); + + return segments.stream().filter(Objects::nonNull).collect(Collectors.toList()); + } + /** * Retrieve the used segment for a given id if it exists in the metadata store and null otherwise */ @@ -1608,6 +1709,20 @@ private ResultIterator getDataSegmentPlusResultIterator( }).iterator(); } + @Nullable + private DataSegment mapToSegment(ResultSet resultSet) + { + String segmentId = ""; + try { + segmentId = resultSet.getString("id"); + return JacksonUtils.readValue(jsonMapper, resultSet.getBytes("payload"), DataSegment.class); + } + catch (Throwable t) { + log.error(t, "Could not read segment with ID[%s]", segmentId); + return null; + } + } + private UnmodifiableIterator filterDataSegmentIteratorByInterval( ResultIterator resultIterator, final Collection intervals, diff --git a/server/src/main/java/org/apache/druid/metadata/UnusedSegmentKillerConfig.java b/server/src/main/java/org/apache/druid/metadata/UnusedSegmentKillerConfig.java new file mode 100644 index 000000000000..082b08251a8e --- /dev/null +++ b/server/src/main/java/org/apache/druid/metadata/UnusedSegmentKillerConfig.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.common.config.Configs; +import org.joda.time.Period; + +import javax.annotation.Nullable; + +/** + * Config for {@code UnusedSegmentKiller}. This is used only by the Overlord. + * Enabling this config on the Coordinator or other services has no effect. 
+ */ +public class UnusedSegmentKillerConfig +{ + @JsonProperty("enabled") + private final boolean enabled; + + @JsonProperty("bufferPeriod") + private final Period bufferPeriod; + + @JsonCreator + public UnusedSegmentKillerConfig( + @JsonProperty("enabled") @Nullable Boolean enabled, + @JsonProperty("bufferPeriod") @Nullable Period bufferPeriod + ) + { + this.enabled = Configs.valueOrDefault(enabled, false); + this.bufferPeriod = Configs.valueOrDefault(bufferPeriod, Period.days(30)); + } + + /** + * Period for which segments are retained even after being marked as unused. + */ + public Period getBufferPeriod() + { + return bufferPeriod; + } + + public boolean isEnabled() + { + return enabled; + } +} diff --git a/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java b/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java index 3d19d7c7365e..cc949a5dd6e3 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java @@ -22,6 +22,7 @@ import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.error.DruidException; import org.apache.druid.metadata.PendingSegmentRecord; +import org.apache.druid.metadata.SqlSegmentsMetadataQuery; import org.apache.druid.metadata.segment.cache.DatasourceSegmentCache; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.server.http.DataSegmentPlus; @@ -118,6 +119,12 @@ public Handle getHandle() return delegate.getHandle(); } + @Override + public SqlSegmentsMetadataQuery noCacheSql() + { + return delegate.noCacheSql(); + } + @Override public void setRollbackOnly() { diff --git a/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java b/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java index cb02f7c8f38a..924a8c3c54b8 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/SegmentMetadataReadTransaction.java @@ -19,6 +19,7 @@ package org.apache.druid.metadata.segment; +import org.apache.druid.metadata.SqlSegmentsMetadataQuery; import org.skife.jdbi.v2.Handle; import java.io.Closeable; @@ -36,6 +37,11 @@ public interface SegmentMetadataReadTransaction */ Handle getHandle(); + /** + * @return SQL tool to read or update the metadata store directly. + */ + SqlSegmentsMetadataQuery noCacheSql(); + /** * Completes the transaction by either committing it or rolling it back. 
*/ diff --git a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransaction.java b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransaction.java index 116832dfe31a..5c9938f8c5a3 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransaction.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransaction.java @@ -96,6 +96,12 @@ public Handle getHandle() return handle; } + @Override + public SqlSegmentsMetadataQuery noCacheSql() + { + return query; + } + @Override public void setRollbackOnly() { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java index 64d81a066f93..26b34a368458 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java @@ -67,10 +67,10 @@ * The datasources to be killed during each cycle are selected from {@link #datasourceCircularKillList}. This state is * refreshed in a run if the set of datasources to be killed changes. Consecutive duplicate datasources are avoided * across runs, provided there are other datasources to be killed. - *
<p>
- * <p>
- * See {@link org.apache.druid.indexing.common.task.KillUnusedSegmentsTask}.
- * <p>
+ * + * @see org.apache.druid.indexing.common.task.KillUnusedSegmentsTask for details + * of the actual kill task and {@code UnusedSegmentKiller} to run embedded kill + * tasks on the Overlord. */ public class KillUnusedSegments implements CoordinatorDuty { diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java index 48656aa93e96..72ee7a9a1691 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorReadOnlyTest.java @@ -101,7 +101,7 @@ public void setup() cachePollExecutor = new BlockingExecutorService("test-cache-poll-exec"); segmentMetadataCache = new HeapMemorySegmentMetadataCache( mapper, - () -> new SegmentsMetadataManagerConfig(null, cacheMode), + () -> new SegmentsMetadataManagerConfig(null, cacheMode, null), derbyConnectorRule.metadataTablesConfigSupplier(), new NoopSegmentSchemaCache(), derbyConnector, diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 9a1b7ce63edf..db5c23b8d30e 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -151,7 +151,7 @@ public void setUp() segmentMetadataCache = new HeapMemorySegmentMetadataCache( mapper, - () -> new SegmentsMetadataManagerConfig(null, cacheMode), + () -> new SegmentsMetadataManagerConfig(null, cacheMode, null), derbyConnectorRule.metadataTablesConfigSupplier(), new NoopSegmentSchemaCache(), derbyConnector, @@ -1831,6 +1831,99 @@ public void testSimpleUnusedListWithLimit() Assert.assertTrue(SEGMENTS.containsAll(retreivedUnusedSegments)); } + @Test + public void testRetrieveUnusedSegmentsWithExactInterval() + { + final String dataSource = defaultSegment.getDataSource(); + coordinator.commitSegments(Set.of(defaultSegment, defaultSegment2, defaultSegment3), null); + + final DateTime now = DateTimes.nowUtc(); + markAllSegmentsUnused(Set.of(defaultSegment, defaultSegment2, defaultSegment3), now.minusHours(1)); + + // Verify that query for overlapping interval does not return the segments + Assert.assertTrue( + coordinator.retrieveUnusedSegmentsWithExactInterval( + dataSource, + Intervals.ETERNITY, + now, + 10 + ).isEmpty() + ); + + // Verify that query for exact interval returns the segments + Assert.assertEquals( + List.of(defaultSegment3), + coordinator.retrieveUnusedSegmentsWithExactInterval( + dataSource, + defaultSegment3.getInterval(), + now, + 10 + ) + ); + + Assert.assertEquals(defaultSegment.getInterval(), defaultSegment2.getInterval()); + Assert.assertEquals( + Set.of(defaultSegment, defaultSegment2), + Set.copyOf( + coordinator.retrieveUnusedSegmentsWithExactInterval( + dataSource, + defaultSegment.getInterval(), + now, + 10 + ) + ) + ); + + // Verify that query with limit 1 returns only 1 result + Assert.assertEquals( + 1, + coordinator.retrieveUnusedSegmentsWithExactInterval( + dataSource, + defaultSegment.getInterval(), + now, + 1 + ).size() + ); + } + + @Test + public void testRetrieveUnusedSegmentIntervals() + { + final String dataSource = defaultSegment.getDataSource(); + 
coordinator.commitSegments(Set.of(defaultSegment, defaultSegment3), null); + + Assert.assertTrue(coordinator.retrieveUnusedSegmentIntervals(dataSource, 100).isEmpty()); + + markAllSegmentsUnused(Set.of(defaultSegment), DateTimes.nowUtc().minusHours(1)); + Assert.assertEquals( + List.of(defaultSegment.getInterval()), + coordinator.retrieveUnusedSegmentIntervals(dataSource, 100) + ); + + markAllSegmentsUnused(Set.of(defaultSegment3), DateTimes.nowUtc().minusHours(1)); + Assert.assertEquals( + Set.of(defaultSegment.getInterval(), defaultSegment3.getInterval()), + Set.copyOf(coordinator.retrieveUnusedSegmentIntervals(dataSource, 100)) + ); + + // Verify retrieve with limit 1 returns only 1 interval + Assert.assertEquals( + 1, + coordinator.retrieveUnusedSegmentIntervals(dataSource, 1).size() + ); + } + + @Test + public void testRetrieveAllDatasourceNames() + { + coordinator.commitSegments(Set.of(defaultSegment), null); + coordinator.commitSegments(Set.of(hugeTimeRangeSegment1), null); + Assert.assertEquals( + Set.of("fooDataSource", "hugeTimeRangeDataSource"), + coordinator.retrieveAllDatasourceNames() + ); + } + @Test public void testUsedOverlapLow() { @@ -3742,7 +3835,11 @@ public void testRetrieveUnusedSegmentsForExactIntervalAndVersion() SegmentId highestUnusedId = transactionFactory.inReadWriteDatasourceTransaction( TestDataSource.WIKI, - transaction -> transaction.findHighestUnusedSegmentId(Intervals.of("2024/2025"), "v1") + transaction -> transaction.noCacheSql().retrieveHighestUnusedSegmentId( + TestDataSource.WIKI, + Intervals.of("2024/2025"), + "v1" + ) ); Assert.assertEquals( unusedSegmentForExactIntervalAndVersion.getId(), diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java index fb310430ac38..44812c3acbf2 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerProviderTest.java @@ -46,7 +46,7 @@ public class SqlSegmentsMetadataManagerProviderTest public void testLifecycleStartCreatesSegmentTables() throws Exception { final TestDerbyConnector connector = derbyConnectorRule.getConnector(); - final SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(null, null); + final SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(null, null, null); final Lifecycle lifecycle = new Lifecycle(); final SegmentSchemaCache segmentSchemaCache = new SegmentSchemaCache(); SqlSegmentsMetadataManagerProvider provider = new SqlSegmentsMetadataManagerProvider( diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java index b1d8335a2276..8e7e6a7fc5b0 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java @@ -109,7 +109,7 @@ public void testPollSegmentAndSchema() CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig = CentralizedDatasourceSchemaConfig.enabled(true); - config = new SegmentsMetadataManagerConfig(Period.seconds(3), null); + config = new SegmentsMetadataManagerConfig(Period.seconds(3), null, null); sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager( jsonMapper, 
Suppliers.ofInstance(config), @@ -193,7 +193,7 @@ public void testPollOnlyNewSchemaVersion() CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig = CentralizedDatasourceSchemaConfig.enabled(true); - config = new SegmentsMetadataManagerConfig(Period.seconds(3), null); + config = new SegmentsMetadataManagerConfig(Period.seconds(3), null, null); sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager( jsonMapper, Suppliers.ofInstance(config), diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java index 987d93dcd276..9e1ba861e1e6 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java @@ -384,7 +384,8 @@ public void testIterateAllUsedNonOvershadowedSegmentsForDatasourceInterval() final Interval theInterval = Intervals.of("2012-03-15T00:00:00.000/2012-03-20T00:00:00.000"); // Re-create SqlSegmentsMetadataManager with a higher poll duration - final SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(Period.seconds(1), null); + final SegmentsMetadataManagerConfig config = + new SegmentsMetadataManagerConfig(Period.seconds(1), null, null); sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager( jsonMapper, Suppliers.ofInstance(config), diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java index c35488c62ebf..bedc41f11e81 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java @@ -72,7 +72,7 @@ public class SqlSegmentsMetadataManagerTestBase protected void setUp(TestDerbyConnector.DerbyConnectorRule derbyConnectorRule) throws Exception { - config = new SegmentsMetadataManagerConfig(Period.seconds(3), null); + config = new SegmentsMetadataManagerConfig(Period.seconds(3), null, null); connector = derbyConnectorRule.getConnector(); storageConfig = derbyConnectorRule.metadataTablesConfigSupplier().get(); diff --git a/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java b/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java index e5554b931eef..08098decdab1 100644 --- a/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java +++ b/server/src/test/java/org/apache/druid/metadata/segment/SqlSegmentsMetadataManagerV2Test.java @@ -88,7 +88,7 @@ private void initManager( segmentMetadataCacheExec = new BlockingExecutorService("test"); SegmentMetadataCache segmentMetadataCache = new HeapMemorySegmentMetadataCache( jsonMapper, - Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode)), + Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode, null)), Suppliers.ofInstance(storageConfig), useSchemaCache ? 
new SegmentSchemaCache() : new NoopSegmentSchemaCache(), connector, diff --git a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java index 2a5940b04951..fab64152ec8a 100644 --- a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java @@ -117,7 +117,7 @@ private void setupTargetWithCaching(SegmentMetadataCache.UsageMode cacheMode, bo throw new ISE("Test target has already been initialized with caching[%s]", cache.isEnabled()); } final SegmentsMetadataManagerConfig metadataManagerConfig - = new SegmentsMetadataManagerConfig(null, cacheMode); + = new SegmentsMetadataManagerConfig(null, cacheMode, null); schemaCache = useSchemaCache ? new SegmentSchemaCache() : new NoopSegmentSchemaCache(); cache = new HeapMemorySegmentMetadataCache( TestHelper.JSON_MAPPER, diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java index d1b4cdfb7a61..0dea1be5d1a3 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java @@ -132,7 +132,7 @@ public void setUp() throws Exception Mockito.when(segmentsMetadataManager.getRecentDataSourcesSnapshot()) .thenReturn(DataSourcesSnapshot.fromUsedSegments(List.of())); SegmentsMetadataManagerConfig metadataManagerConfig = - new SegmentsMetadataManagerConfig(Period.millis(10), null); + new SegmentsMetadataManagerConfig(Period.millis(10), null, null); segmentsMetadataManagerConfigSupplier = Suppliers.ofInstance(metadataManagerConfig); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java index d84cbcff6efe..d526ccfaf6f9 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java @@ -23,16 +23,21 @@ import javax.annotation.Nullable; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; public class TestDruidLeaderSelector implements DruidLeaderSelector { + private final AtomicInteger localTerm = new AtomicInteger(0); private final AtomicBoolean isLeader = new AtomicBoolean(false); private volatile Listener listener; public void becomeLeader() { - if (isLeader.compareAndSet(false, true) && listener != null) { - listener.becomeLeader(); + if (isLeader.compareAndSet(false, true)) { + if (listener != null) { + listener.becomeLeader(); + } + localTerm.incrementAndGet(); } } @@ -59,7 +64,7 @@ public boolean isLeader() @Override public int localTerm() { - return 0; + return localTerm.get(); } @Override diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 95fa654674d9..ef4901c68c35 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -93,6 +93,7 @@ import 
org.apache.druid.indexing.overlord.duty.OverlordDuty; import org.apache.druid.indexing.overlord.duty.TaskLogAutoCleaner; import org.apache.druid.indexing.overlord.duty.TaskLogAutoCleanerConfig; +import org.apache.druid.indexing.overlord.duty.UnusedSegmentsKiller; import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory; import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerResource; import org.apache.druid.indexing.overlord.http.OverlordCompactionResource; @@ -249,6 +250,8 @@ public void configure(Binder binder) binder.bind(ShuffleClient.class).toProvider(Providers.of(null)); binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(new NoopChatHandlerProvider())); + CliPeon.bindDataSegmentKiller(binder); + PolyBind.createChoice( binder, "druid.indexer.task.rowIngestionMeters.type", @@ -445,9 +448,9 @@ private void configureAutoscale(Binder binder) private void configureOverlordHelpers(Binder binder) { JsonConfigProvider.bind(binder, "druid.indexer.logs.kill", TaskLogAutoCleanerConfig.class); - Multibinder.newSetBinder(binder, OverlordDuty.class) - .addBinding() - .to(TaskLogAutoCleaner.class); + final Multibinder dutyBinder = Multibinder.newSetBinder(binder, OverlordDuty.class); + dutyBinder.addBinding().to(TaskLogAutoCleaner.class); + dutyBinder.addBinding().to(UnusedSegmentsKiller.class).in(LazySingleton.class); } }, new IndexingServiceInputSourceModule(), diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index e9cbab108d39..9cb939f9b899 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -468,14 +468,19 @@ static void bindChatHandler(Binder binder) static void bindPeonDataSegmentHandlers(Binder binder) { // Build it to make it bind even if nothing binds to it. - Binders.dataSegmentKillerBinder(binder); - binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class); + bindDataSegmentKiller(binder); Binders.dataSegmentMoverBinder(binder); binder.bind(DataSegmentMover.class).to(OmniDataSegmentMover.class).in(LazySingleton.class); Binders.dataSegmentArchiverBinder(binder); binder.bind(DataSegmentArchiver.class).to(OmniDataSegmentArchiver.class).in(LazySingleton.class); } + static void bindDataSegmentKiller(Binder binder) + { + Binders.dataSegmentKillerBinder(binder); + binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class); + } + private static void configureTaskActionClient(Binder binder) { binder.bind(TaskActionClientFactory.class)