Add embedded kill tasks that run on the Overlord #18028
Conversation
Pull Request Overview
Adds embedded segment kill functionality to the Overlord to improve performance and reduce task-slot usage by running kill operations in-process rather than spawning separate tasks.
- Extended metadata queries (`SqlSegmentsMetadataQuery`, `IndexerSQLMetadataStorageCoordinator`, `IndexerMetadataStorageCoordinator`) to fetch unused segments and intervals with new filter and paging methods.
- Enhanced configuration (`SegmentsMetadataManagerConfig`) to include a new `killUnused` section with validation, and updated core interfaces to return deletion counts.
- Introduced in-process kill tasks (`UnusedSegmentsKiller`, `KillTaskToolbox`, `EmbeddedKillTask`) and updated scheduling (`OverlordDutyExecutor`), locking (`TaskLockbox`), and reporting (`KillTaskReport`, `TaskMetrics`).
Reviewed Changes
Copilot reviewed 37 out of 37 changed files in this pull request and generated 1 comment.
Summary per file:
| File | Description |
|---|---|
| server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java | Added methods to query unused segments and intervals; improved docs |
| server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java | Added killUnused config field and validation in constructor |
| server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java | Proxy methods for new SQL queries; updated deleteSegments to return count |
| server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java | Updated interface to support new kill and interval methods |
| processing/src/main/java/org/apache/druid/indexing/overlord/report/KillTaskReport.java | Changed getPayload() return type from Object to Stats |
| indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java | Made remove(...) idempotent and short-circuit when appropriate |
| indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/KillTaskToolbox.java | New toolbox class for embedded kill tasks |
| indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java | Use simple class name for logs and skip zero-period duties |
Comments suppressed due to low confidence (2)
processing/src/main/java/org/apache/druid/indexing/overlord/report/KillTaskReport.java:59
- Changed `getPayload()` return type from `Object` to `Stats`, which may break existing consumers expecting an `Object`. Consider adding a deprecated `public Object getPayload()` forwarder or otherwise preserving backward compatibility.
public Stats getPayload()
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:1235
- Returning early in `remove(...)` skips `cleanupUpgradeAndPendingSegments` and `unlockAll`, which can leave segment locks or pending state uncleared. Ensure the cleanup and unlock logic always executes before returning.
if (!activeTasks.contains(task.getId())) {
| * @param maxUpdatedTime Returned segments must have a {@code used_status_last_updated} | ||
| * which is either null or earlier than this value. | ||
| */ | ||
| public List<DataSegment> findUnusedSegments( |
The Javadoc for findUnusedSegments does not include a @param dataSource description. Please add a @param dataSource entry to accurately document the parameter.
|
I have not reviewed the changes carefully, but I'll leave one question first: some kill tasks might take a very long time (for example, up to dozens of minutes) to delete segments for a large datasource. If several such kill tasks run together, will these kill tasks block the Overlord's other duties?

Thanks for the comment, @FrankChen021 ! A couple of clarifications on the above:
Thanks for bringing this up, I will also add the above points to the PR description (and perhaps some points

@kfaraz thanks for the detailed PR description and answer to Frank's question! I plan to review in full today (05/28)
capistrant
left a comment
I ran out of time before a hard stop at the end of my day so I wasn't able to complete a full review. Publishing my pending comments for now and will finish reviewing tomorrow. I'm overall supportive of the design and think this will be cool.
| updateStateIfNewLeader(); | ||
| if (shouldResetKillQueue()) { | ||
| // Clear the killQueue to stop further processing of already queued jobs | ||
| killQueue.clear(); |
Is this duplicative, since `resetKillQueue()` also calls `clear()`?
This has two benefits:
- We clear the queue proactively, thus stopping further processing right here.
- There is no interleaving between submissions of `startNextJobInKillQueue` and `resetKillQueue`, i.e. at any given point, the `exec` has either a single `startNextJobInKillQueue` or a single `resetKillQueue` in its executor queue. This keeps the semantics simple and easy to debug and reason about.
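The non-interleaving guarantee described above comes from funneling both operations through a single-threaded executor, which runs submitted jobs one at a time in submission order. A minimal standalone sketch (class and method names are illustrative, not the PR's actual code):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadDutyDemo {
    // A single-threaded executor runs submitted jobs one at a time, in submission
    // order, so a "reset" and a "process next" can never interleave.
    public static List<String> runDuties() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService exec = Executors.newSingleThreadExecutor();
        exec.submit(() -> log.add("startNextJobInKillQueue"));
        exec.submit(() -> log.add("resetKillQueue"));
        exec.submit(() -> log.add("startNextJobInKillQueue"));
        exec.shutdown();
        try {
            exec.awaitTermination(10, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return log;
    }
}
```

Because there is exactly one worker thread, the observed order is always the submission order, with no overlap between jobs.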
| "Failed while processing kill jobs. There are [%d] jobs in queue.", | ||
| killQueue.size() | ||
| ); | ||
| startNextJobInKillQueue(); |
Could you explain the reasoning behind this calling itself on a caught exception? I'm trying to understand what kind of risks there are in doing this, if any. Could poll continuously throw an exception, leaving us in an infinite loop, since the queue will never be drawn down to hit the isEmpty conditional? Even if it is legit, a short comment could help folks who are confused by it like I am.
Thanks for pointing this out!
I wanted to continue with the next job in case a specific kill task throws an exception.
But we are already handling that in runKillTask.
I have updated this.
| { | ||
| if (isEnabled()) { | ||
| // Check every hour that the kill queue is being processed normally | ||
| return new DutySchedule(Duration.standardHours(1).getMillis(), Duration.standardMinutes(1).getMillis()); |
Should this be configurable? I ask only because, as I interpret this duty, if it runs through the 1k kill queue in less than an hour, won't it be idle until the next run? If the metric for queue-processing duration is constantly under an hour for a cluster, the operator may want to increase the frequency.
My reasoning was as follows:
Case 1: We are able to clear the queue within an hour
This implies that there are not too many unused segments in the cluster anyway and we are in a good place.
Chances are next reset will not have a lot of new unused segments to kill off anyway.
Also note that resetting the queue every hour is still much more frequent than the current Coordinator duty
setup, since even though that duty would typically run every 30 mins or so, it would queue up only a
handful of kill tasks (limited by task slots).
Case 2: Clearing the queue takes longer than an hour
In this case, we already have our hands full and checking every hour is frequent enough.
Note: 1k is only the initial size of the queue, it can have more elements in practice.
I have added a UT that launches a large number of kill tasks to clarify this.
Thanks for the info, makes sense.
| if (isEnabled()) { | ||
| this.exec = executorFactory.create(1, "UnusedSegmentsKiller-%s"); | ||
| this.killQueue = new PriorityBlockingQueue<>( | ||
| 1000, |
Is this kept as a non-configurable for operational simplicity?
Yes.
This is only the initial capacity of the queue; it can grow to accommodate more entries as necessary.
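To illustrate that point: `PriorityBlockingQueue` is unbounded, and the capacity argument only sizes the backing array before the first resize. A small standalone sketch:

```java
import java.util.concurrent.PriorityBlockingQueue;

public class QueueCapacityDemo {
    public static int fillBeyondInitialCapacity() {
        // 1000 is only the initial capacity; PriorityBlockingQueue is unbounded
        // and grows its backing array as more elements are offered.
        PriorityBlockingQueue<Integer> killQueue = new PriorityBlockingQueue<>(1000);
        for (int i = 0; i < 5000; i++) {
            killQueue.offer(i);
        }
        return killQueue.size();
    }
}
```

`offer` never blocks or fails for capacity reasons here; only available memory bounds the queue.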
:doh: I shoulda known this is just initial capacity
abhishekrb19
left a comment
Thanks for this feature, @kfaraz! +1 to the idea as it's a clear step up from the coordinator-based kill duty for the reasons you mentioned. I’ve left a few comments on the implementation.
Do you think it makes sense to eventually deprecate the coordinator-based duty in favor of the Overlord one? On a similar note, are there other duties that the Overlord can start taking ownership of when the incremental segment cache is enabled?
| this.useIncrementalCache = Configs.valueOrDefault(useIncrementalCache, SegmentMetadataCache.UsageMode.NEVER); | ||
| this.killUnused = Configs.valueOrDefault(killUnused, new UnusedSegmentKillerConfig(null, null)); | ||
| if (this.killUnused.isEnabled() && this.useIncrementalCache == SegmentMetadataCache.UsageMode.NEVER) { | ||
| throw InvalidInput.exception( |
- Please include the runtime properties in the error message to make corrective action easier; something like `Please set "druid.manager.segments.useIncrementalCache=true" when ...`.
- Should the target persona in this case be an operator rather than an end user?
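A sketch of the suggested operator-friendly validation, spelling out the runtime property in the message (the method shape and message wording are illustrative, not the PR's actual code):

```java
public class KillConfigValidationDemo {
    // Returns an error message if the config combination is invalid, else null.
    public static String validate(boolean killUnusedEnabled, String useIncrementalCache) {
        if (killUnusedEnabled && "NEVER".equals(useIncrementalCache)) {
            // Name the runtime property explicitly so an operator knows what to change
            return "Set 'druid.manager.segments.useIncrementalCache' to a mode other than NEVER"
                   + " when 'druid.manager.segments.killUnused.enabled' is true.";
        }
        return null;
    }
}
```

Failing fast at startup with a message like this points the operator at the exact property to fix, rather than surfacing a generic error later.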
| catch (IOException e) { | ||
| throw DruidException.defensive(e, "Error while reading unused segments"); | ||
| } |
Hmm, I don't think an IOException should be a developer-facing defensive exception. I think throwing a RuntimeException or an equivalent DruidException would work better.
| * If this returns null, segments are never killed by the {@code UnusedSegmentKiller} | ||
| * but they might still be killed by the Coordinator. |
Can this actually be null, since the default in the constructor is set to 90 days?
| this.pollDuration = Configs.valueOrDefault(pollDuration, Period.minutes(1)); | ||
| this.useIncrementalCache = Configs.valueOrDefault(useIncrementalCache, SegmentMetadataCache.UsageMode.NEVER); | ||
| this.killUnused = Configs.valueOrDefault(killUnused, new UnusedSegmentKillerConfig(null, null)); | ||
| if (this.killUnused.isEnabled() && this.useIncrementalCache == SegmentMetadataCache.UsageMode.NEVER) { |
I was wondering about how this overlord embedded kill tasks feature would interact with the coordinator kill duty. @kfaraz I see your comment on this:
When embedded kill tasks are running on the Overlord, it is recommended to NOT launch kill tasks
manually or from the coordinator duty. The current implementation does not do any validation around
this. But we can perhaps give a warning message when submitting a normal kill task.
I think having a validation makes sense and we can:
- Fail fast if both the kill features on the Overlord and Coordinator are enabled
- Log a warning in the kill task as you mention
For the validation, is it possible to bind the coordinator kill duty config so that the Overlord knows about it (or vice-versa)?
For the validation, is it possible to bind the coordinator kill duty config so that the Overlord knows about it (or vice-versa)?
If coordinator and overlord are different processes who may or may not be on the same server and may or may not use the same config files, wouldn't this be tough to reliably do?
Yes, config-based validation would not be possible as the two can be separate processes.
The only validation we can do is not accept kill tasks submitted by the coordinator to the Overlord
(they would have the prefix coordinator-issued-kill). It might feel a bit hacky but it's the cleanest
option right now (short of exposing an API to read the config, which is overkill).
Also, as @capistrant points out, there is no inherent harm in running the two kill modes together
since each kill task (both embedded and normal) acquires an EXCLUSIVE lock on the interval.
Embedded kill tasks don't take up a task slot, so that should not be a concern either.
Also, users are always allowed to submit kill tasks manually. So several kill tasks for the same datasource
can be running concurrently anyway.
@capistrant , @abhishekrb19 , please let me know if we should add validation to not accept kill tasks submitted by the Coordinator if embedded kill is enabled.
I don't love the idea of rejecting tasks based on prefix. I think that since it is not a risk to the correctness/health of the underlying cluster, we should leave it be with good documentation that we suggest updating coordinator configs if using the new embedded kill.
Fair enough, I will add some docs to this PR.
Yeah, documenting it seems enough in that case.
| this.killConfig = config.getKillUnused(); | ||
| if (isEnabled()) { | ||
| this.exec = executorFactory.create(1, "UnusedSegmentsKiller-%s"); |
Given that this is a new feature, I think having an info log here is useful when this duty is enabled on the Overlord
Added, thanks for the suggestion!
| public static final String RUN_DURATION = "task/run/time"; | ||
| public static final String NUKED_SEGMENTS = "segment/killed/metadataStore/count"; |
To avoid ambiguity and for consistency, consider renaming this to SEGMENTS_DELETED_FROM_METADATA_STORE or similar
| LOG.warn( | ||
| "Skipping kill of segments[%s] as its load spec is also used by segment IDs[%s].", | ||
| parentIdToUnusedSegments.get(parent), children | ||
| ); | ||
| parentIdToUnusedSegments.remove(parent); |
If this is expected during normal operations when concurrent append/replace is enabled, should this be logged at the info level instead of warn? We could also consider emitting a "skipped" metric with the reason as a dimension, if you think this is useful to track.
Is it okay to postpone the skip metric for now?
I intend to touch this part of the code anyway in a follow up PR for log improvements in the DataSegmentKiller interface (details in the PR description above).
| { | ||
| boolean isPresent = usedSegmentLoadSpecs.contains(segment.getLoadSpec()); | ||
| if (isPresent) { | ||
| LOG.warn("Skipping kill of segment[%s] as its load spec is also used by other segments.", segment); |
Same comment re warn vs info
| @Rule | ||
| public TaskActionTestKit taskActionTestKit = new TaskActionTestKit(); | ||
| private static final List<DataSegment> WIKI_SEGMENTS_1X10D = |
The 1X10D in the variable got me thinking this was a roman numeral of sorts :) I see we use this convention in a few other places too
Yeah, it is meant to indicate 1 partition x 10 days.
Any other nomenclature seemed too verbose, so I went ahead with this.
| * {@link OverlordDuty} to delete unused segments from metadata store and the | ||
| * deep storage. Launches {@link EmbeddedKillTask}s to clean unused segments | ||
| * of a single datasource-interval. | ||
| * |
I think it would be good to cross-link the coordinator-based KillUnusedSegments duty in this class and vice-versa.
capistrant
left a comment
Looking good to me. I left just a couple more small comments of my own. @abhishekrb19 left a review with good comments that I won't individually +1, but in general I support them.
| } | ||
| @Test | ||
| public void test_run_killsSegmentUpdatedInFuture_ifBufferPeriodIsNegative() |
Is there a legitimate use case for a negative buffer period? Glad you added a test for it, but it feels weird.
Yeah, I don't recall why I had added it myself 😅, segment update times cannot be in the future and it doesn't make sense to have a negative buffer period anyway. Removing it.
| ) | ||
| { | ||
| this.enabled = Configs.valueOrDefault(enabled, false); | ||
| this.bufferPeriod = Configs.valueOrDefault(bufferPeriod, Period.days(90)); |
Any reason for picking 90 days as default? I see that the coordinator config for the same is 30 days per https://druid.apache.org/docs/latest/configuration/ ... As long as it is documented, I don't have any preference between the two but was curious.
Seems like a typo, fixing it.
What are the worst-case side effects if a user has both Coordinator and Overlord kill running, or is submitting kills to the Coordinator while Overlord kill is enabled? It feels like it will be hard to ensure Druid users aren't doing this. We can heavily document the fact that it shouldn't be done. But short of having some centralized gate for who can do kills (metastore, ZK, or something), I don't see how we can prevent it as long as the ability to enable killing for both services exists at the same time.

@capistrant , @abhishekrb19 , thanks for the thorough review!
@kfaraz There is a follow-up item listed in the description to limit logging to deletion failures/skips only:
There should be some message that is logged at INFO level for each segment that is deleted. Deleting a segment is a destructive operation that in some situations is not undoable. Operators need some logs when that happens, for situations where they wonder what happened to their segment files. Ideally, there is exactly one message logged at INFO level for each segment (not zero, not more than one). Ideally that one message has both the segment ID and the storage location (S3/GCS/Azure/etc).

That's a fair point, @gianm . In that case, the current logging done by the respective `DataSegmentKiller`s can be kept as is. With embedded kill tasks, the only drawback would be that these segment delete messages would flood the Overlord logs. Another option could be to direct the task logs to a different log file, same as regular task logs. Let me know what your thoughts are.
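A tiny sketch of the log shape being discussed: exactly one INFO-level line per deleted segment, carrying both the segment ID and the deep-storage location (the method name and message format are illustrative, not the PR's actual code):

```java
public class SegmentKillLogDemo {
    // One INFO-level line per deleted segment: segment ID plus deep-storage path,
    // so operators can later trace what happened to a given segment file.
    public static String deletionLogLine(String segmentId, String storagePath) {
        return String.format("Deleted segment[%s] from deep storage at [%s].", segmentId, storagePath);
    }
}
```

Keeping the message to a single line per segment makes the deletion auditable without multiplying log volume.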
| final Stopwatch resetDuration = Stopwatch.createStarted(); | ||
| try { | ||
| killQueue.clear(); | ||
| if (!leaderSelector.isLeader()) { |
Did you mean to add an early return in this new conditional block to prevent the queue re-build?
Ah, thanks for catching it!
| ).build() | ||
| ); | ||
| final Set<Interval> sampleIntervals = intervals.stream().limit(5).collect(Collectors.toSet()); |
Will this change to using a sample of 5 have to potentially change back to the verbose interval set based on what we decide about Gian's note about all deletes being auditable in logs?
The segment IDs would already be logged by the tasks, but I guess we can fix this up too.
Especially since the number of intervals is typically always low.
Coordinator and Overlord always launch kill tasks for a single interval.
Tasks submitted manually are likely to have few intervals too.
@capistrant , I have addressed the latest comments. Is it okay to keep the docs changes for a follow-up PR?
| final Set<String> dataSources = storageCoordinator.retrieveAllDatasourceNames(); | ||
| final Map<String, Integer> dataSourceToIntervalCounts = new HashMap<>(); | ||
| for (String dataSource : dataSources) { | ||
| storageCoordinator.retrieveUnusedSegmentIntervals(dataSource, MAX_INTERVALS_TO_KILL_IN_DATASOURCE).forEach( | ||
| interval -> { | ||
| dataSourceToIntervalCounts.merge(dataSource, 1, Integer::sum); | ||
| killQueue.offer(new KillCandidate(dataSource, interval)); | ||
| } |
For fairness, the Coordinator duty uses a CircularList to round-robin through the kill datasources. But for the Overlord duty, since there's no constraint on task slots, it would potentially process all of them in a deterministic order. If there are many datasources with large numbers of unused segment intervals hitting the ~10K threshold, could the ones later in the list end up consistently deprioritized?
We can revisit this logic if needed, just trying to understand the queuing characteristics of this current logic.
The kill queue prioritizes older intervals. So, once the intervals of a datasource become old enough, they will become top priority.
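A minimal sketch of that ordering behavior, assuming the queue is built with a comparator that puts older interval starts first; the `Candidate` class here is a hypothetical stand-in for the PR's `KillCandidate`:

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class KillQueueOrderingDemo {
    static final class Candidate {
        final String dataSource;
        final long intervalStartMillis;

        Candidate(String dataSource, long intervalStartMillis) {
            this.dataSource = dataSource;
            this.intervalStartMillis = intervalStartMillis;
        }
    }

    public static String firstToBeKilled() {
        // Older intervals (smaller start millis) are polled first, regardless of
        // which datasource queued them or in what order they were offered.
        PriorityBlockingQueue<Candidate> killQueue = new PriorityBlockingQueue<>(
            1000,
            Comparator.comparingLong((Candidate c) -> c.intervalStartMillis)
        );
        killQueue.offer(new Candidate("wiki", 2_000L));
        killQueue.offer(new Candidate("koala", 1_000L));
        killQueue.offer(new Candidate("wiki", 3_000L));
        return killQueue.poll().dataSource;
    }
}
```

So a datasource later in the enumeration order is not starved: once its intervals are the oldest in the queue, they are polled first.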
Ah, I see, thanks for the clarification.
I'm not really worried about there being too many logs. I think of this log message as part of the segment lifecycle: first a segment is allocated (if in append mode), then published (always), then marked unused (if no longer needed), then deleted (if killed). So these kill logs shouldn't be "flooding", in terms of volume, any more than logs related to allocation and publish are flooding. I think we already have at least one log message for each of those actions.

Thanks for the clarification, @gianm !

Thanks for the reviews, @abhishekrb19 , @capistrant ! I have merged the PR since the IT failures are unrelated and are already being fixed in another PR #18067 .

@kfaraz with a release window coming up for Druid, let's make sure we get this documented soon so folks know how to use it in Druid 34, should they choose.

Absolutely, @capistrant , thanks for the reminder!

@capistrant , I have created #18124 for docs changes.
Docs changes for #18028 - Document metrics and configs for embedded kill tasks - Remove duplicate configs for Coordinator auto-kill from `data-management/delete.md` - Fix up references
Description
Kill tasks currently suffer from several drawbacks:
- Overhead of spawning the peon and inter-process communication.
- Several configs to tune: `maxKillTaskSlotRatio`, `maxKillTaskSlots`, `kill.bufferPeriod`, `kill.durationToRetain`, `kill.period`.

This patch adds an embedded mode of running kill tasks on the Overlord itself.
Solution: Embedded kill tasks
These embedded tasks
Items for follow-up PR
Design
Most of the heavy lifting of kill tasks is already done by the Overlord via task actions.
Moving the kill of segments to the Overlord helps avoid unnecessary launching of tasks,
thus keeping more task slots available, and also reduces inter-process communication.
The Overlord can also leverage the segment metadata cache for several of its operations,
thus improving performance further.
The only responsibility added to the Overlord would be deleting segment files from the deep store.
Implementation
This helps ensure that locks are not held over any interval for too long.
- `druid.manager.segments.killUnused.enabled`
- `druid.manager.segments.killUnused.bufferPeriod`

Changes
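For reference, enabling the feature would look something like the following `runtime.properties` fragment. The buffer-period value is just an example, and the exact accepted values for `useIncrementalCache` should be checked against the Druid docs (the validation above only requires a mode other than `NEVER`):

```properties
# Embedded kill requires the segment metadata incremental cache to be in use
druid.manager.segments.useIncrementalCache=always
# Turn on embedded kill tasks on the Overlord
druid.manager.segments.killUnused.enabled=true
# Retain unused segments for this period before killing them (example value)
druid.manager.segments.killUnused.bufferPeriod=P30D
```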
Main classes to review
- `UnusedSegmentKiller`: `OverlordDuty` that launches embedded kill tasks
- `UnusedSegmentKillerConfig` with the following fields:
  - `enabled`: Turns on the segment killer on the Overlord
  - `bufferPeriod`: Period for which segments are retained even after being marked as unused
- `EmbeddedKillTask`: extends `KillUnusedSegmentsTask` to modify some behaviour
- `KillTaskToolbox`: simplified version of `TaskToolbox` to run embedded kill tasks

Other changes
- Extend `KillUnusedSegmentsTask` to modify some behaviour for embedded tasks
- Add new methods to `IndexerMetadataStorageCoordinator` and `SqlSegmentsMetadataQuery`
- Short-circuit `TaskLockbox.remove` and make it idempotent

Follow-up changes not included in this PR
Currently, a `kill` task logs an info message for every file removed from the deep storage. For example, on S3:
druid/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
Lines 151 to 155 in 2b1f1fc
While this is okay for a normal `kill` task, it becomes too verbose when running embedded kill tasks on the Overlord. For embedded kill tasks, we should log only warnings or errors for paths that could not be deleted or were skipped for some reason.
This would require the following changes:
- Update the `DataSegmentKiller` interface to return details of deleted or skipped paths
- Log the result of `DataSegmentKiller.kill` in `KillUnusedSegmentsTask`

Release note
Add an embedded mode for running kill tasks for unused segments on the Overlord itself.
Advantages of embedded kill tasks
- No task slots are used, unlike regular `kill` tasks

New metrics
| Metric | Dimensions | Emitted when |
|---|---|---|
| `segment/killed/metadataStore/count` | `dataSource`, `taskId`, `taskType`, `groupId`, `tags` | |
| `segment/killed/deepStorage/count` | `dataSource`, `taskId`, `taskType`, `groupId`, `tags` | |
| `segment/kill/queueReset/time` | | `druid.manager.segments.killUnused.enabled` is true. |
| `segment/kill/queueProcess/time` | | `druid.manager.segments.killUnused.enabled` is true. |
| `segment/kill/jobsProcessed/count` | | `druid.manager.segments.killUnused.enabled` is true. |
| `segment/kill/skippedIntervals/count` | `dataSource`, `taskId` | `druid.manager.segments.killUnused.enabled` is true. |

This PR has: