Temporarily skip compaction for locked intervals #11190
maytasm merged 16 commits into apache:master
Conversation
| (interval, taskLockPosses) -> taskLockPosses.forEach(
|   taskLockPosse -> taskLockPosse.taskIds.forEach(taskId -> {
|     // Do not proceed if the lock is revoked
|     if (taskLockPosse.getTaskLock().isRevoked()) {
what is the rationale behind this?
A lock that is revoked is effectively not locking any interval. So we don't need to consider revoked locks while looking for locked intervals.
Revoked locks are kept in the TaskLockbox so that callers can be notified of the revocation when they try to acquire the same locks again.
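The rule described above (revoked locks do not count as locking anything) can be sketched in isolation. This is not the actual TaskLockbox code; `TaskLock` here is a hypothetical stand-in with illustrative fields.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LockedIntervalsSketch {
    // Hypothetical stand-in for Druid's TaskLock; field names are illustrative.
    record TaskLock(String datasource, String interval, boolean revoked) {}

    // Collect locked intervals per datasource, ignoring revoked locks:
    // a revoked lock is effectively not locking any interval.
    static Map<String, List<String>> getLockedIntervals(List<TaskLock> locks) {
        return locks.stream()
            .filter(lock -> !lock.revoked())
            .collect(Collectors.groupingBy(
                TaskLock::datasource,
                Collectors.mapping(TaskLock::interval, Collectors.toList())));
    }

    public static void main(String[] args) {
        List<TaskLock> locks = List.of(
            new TaskLock("wiki", "2021-01-01/2021-01-02", false),
            new TaskLock("wiki", "2021-01-02/2021-01-03", true)
        );
        // The revoked lock's interval is not reported as locked
        System.out.println(getLockedIntervals(locks));  // prints {wiki=[2021-01-01/2021-01-02]}
    }
}
```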
| );
| final CompactionSegmentIterator iterator =
|     policy.reset(compactionConfigs, dataSources, compactionTaskIntervals);
maybe compactionTaskIntervals needs to be called something else now since it can also include intervals for which there is no lock.
Yeah, should we just call it compactionSkipIntervals?
| * `/druid/indexer/v1/lockedIntervals`
|
| Retrieve the list of Intervals locked by currently running ingestion/compaction tasks. The response contains a Map from
| Task IDs to the list of Intervals locked by the respective Tasks.
why do we need task ids? can we just return a map of datasource --> intervals?
In CompactSegments.run(), some currently running compaction tasks can get cancelled if their spec is out of date. We use the taskId to identify the intervals for these tasks getting cancelled so that we don't consider those intervals as locked.
This is the only case where taskId is useful. Otherwise, datasource would be sufficient.
Do we need to return intervals locked by compaction tasks?
If an existing compaction task is out of date, we already have code to cancel it and submit a new task for that interval (hence the locks of those to-be-cancelled compaction tasks do not matter). If it is not cancelled, then there is already code to skip the interval of the running compaction task.
Given the above, we can just return a Map from datasource to the intervals locked by non-compaction tasks.
Good point, @maytasm . I will make the changes.
maytasm left a comment
- I think what we can do here is use the taskPriority set for the auto compaction of each dataSource. The lockedIntervals API can then return only the locked intervals of tasks with priority greater than the taskPriority set for auto compaction. For example, if the task holding the lock has a lower priority than the compaction task to be scheduled, we should schedule the compaction task anyway, since it will revoke the lock of the current task (and not skip it). The priority of compaction tasks scheduled by auto compaction can also be set by the user (taskPriority field) to revoke locks of batch/stream ingestion tasks.
- Can you add integration tests too? Maybe a simple one for the lockedIntervals API and a simple one to check that auto compaction skips locked intervals.
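The per-datasource priority filtering suggested above can be sketched as follows. This is a simplified stand-alone illustration, not the actual TaskLockbox code; `TaskLock` and `lockedIntervals` are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MinPriorityFilter {
    // Hypothetical stand-in for a task lock carrying the owning task's priority.
    record TaskLock(String datasource, String interval, int priority) {}

    // Report an interval as locked only if the lock's priority is strictly
    // greater than the minimum priority supplied for its datasource
    // (i.e. the priority of the compaction task that would be scheduled).
    static Map<String, List<String>> lockedIntervals(
        List<TaskLock> locks,
        Map<String, Integer> minTaskPriority
    ) {
        return locks.stream()
            .filter(l -> minTaskPriority.containsKey(l.datasource()))
            .filter(l -> l.priority() > minTaskPriority.get(l.datasource()))
            .collect(Collectors.groupingBy(
                TaskLock::datasource,
                Collectors.mapping(TaskLock::interval, Collectors.toList())));
    }

    public static void main(String[] args) {
        List<TaskLock> locks = List.of(
            new TaskLock("wiki", "2021-01-01/2021-01-02", 75),  // higher priority: reported as locked
            new TaskLock("wiki", "2021-01-02/2021-01-03", 25)   // lower priority: lock would be revoked
        );
        System.out.println(lockedIntervals(locks, Map.of("wiki", 50)));  // prints {wiki=[2021-01-01/2021-01-02]}
    }
}
```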
| final Map<String, String> taskToDatasource = new HashMap<>();
|
| // Take a lock and populate the maps
| giant.lock();
why do we need the lock here?
| public Response getTaskLockedIntervals(@Context HttpServletRequest request)
| {
|   // Perform authorization check
|   final ResourceAction resourceAction = new ResourceAction(
Is there any particular reason this API needs finer-grained access control rather than using the StateResourceFilter?
The StateResourceFilter is already used for APIs like /taskStatus, which I think is similar to this API in terms of access control.
| final LockedIntervalsResponse response = new LockedIntervalsResponse(
|     taskStorageQueryAdapter.getLockedIntervals()
| );
| log.warn("Found Intervals: %s", response.getLockedIntervals());
| }
|
| // Build the response
| final LockedIntervalsResponse response = new LockedIntervalsResponse(
Can the API just return Map<String, DatasourceIntervals> (removing the need for another class)?
Response.ok(taskStorageQueryAdapter.getLockedIntervals()).build()
| indexingServiceClient.cancelTask(status.getId());
|
| // Remove this from the locked intervals
| taskToLockedIntervals.remove(status.getId());
As mentioned earlier, this is not really needed. This would be the same as taskToLockedIntervals not containing locks for compaction tasks in the first place.
| *
| * @return Map from Task Id to locked intervals.
| */
| public Map<String, DatasourceIntervals> getLockedIntervals()
getLockedInterval() seems a misnomer because this method returns the intervals of segment locks as well, but they don't lock intervals. I don't have a better suggestion though..
| "Skipping the following intervals for Compaction as they are currently locked: %s",
| taskToLockedIntervals
| );
| taskToLockedIntervals.forEach(
I think it should behave differently depending on what lockGranularity is used. If both the compaction task to run and the task that is already running use the segment lock, the compaction task can safely run. Otherwise, the entire locked interval should be skipped as what this code does.
For simplicity, we are treating Segment Locks the same as Time Chunk Locks i.e. the whole interval would be skipped while submitting compaction tasks even if there is just one Segment in that interval that is locked by a higher priority task.
Added this as a javadoc comment here:
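The simplification described in this reply (a segment lock is treated like a time-chunk lock on its whole interval) can be sketched stand-alone. `SegmentId` here is a hypothetical stand-in, not Druid's actual class.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class SegmentLockSimplification {
    // Hypothetical segment identifier: its interval plus a partition number.
    record SegmentId(String interval, int partitionNum) {}

    // Even if only one segment in an interval is locked by a higher priority
    // task, the whole interval is skipped when submitting compaction tasks.
    static Set<String> intervalsToSkip(List<SegmentId> lockedSegments) {
        return lockedSegments.stream()
            .map(SegmentId::interval)
            .collect(Collectors.toSet());
    }
}
```

Two locked segments in the same interval thus collapse into a single skipped interval.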
| Retrieve a [task completion report](../ingestion/tasks.md#task-reports) for a task. Only works for completed tasks.
|
| * `/druid/indexer/v1/lockedIntervals`
Is this supposed to be called by users? What is the use case?
Actually, instead of exposing this API, maybe what we need is a way to communicate to the user that compaction skipped an interval because some task was holding a lock: which intervals were skipped for which datasource, and which task held the lock.
For now, we are just logging the intervals that were skipped due to locks.
The part about notifying the user about skipped intervals will most likely be done through Task Reports in a follow-up PR.
Is this supposed to be called by users? What is the use case?
Removed changes to api-reference.md as this API is for internal use (between Coordinator and Overlord) only.
Fixed lockedIntervals API to use taskPriority
| @@ -62,9 +62,9 @@ public List<Task> getActiveTasks()
| *
| * @return Map from Task Id to locked intervals.
Is this Map from datasource to locked intervals?
| * priority are returned. Tasks for datasources that
| * are not present in this Map are not returned.
| * @return Map from Datasource to List of Intervals locked by Tasks that have
| * priority strictly greater than the {@code minTaskPriority} for that datasource.
should this be > or >= the {@code minTaskPriority}?
Do we want to submit auto compaction task if there is a task with equal task priority already running?
| @@ -1149,29 +1149,24 @@ public void testGetLockedIntervals()
| );
|
| // Verify the locked intervals
Can you add test where the existing taskLock priority is lower than the minTaskPriority argument (and hence the lock should not be returned)?
| indexingServiceClient.getLockedIntervals());
|
| // Skip all the intervals locked by higher priority tasks for each datasource
| getLockedIntervalsToSkip(compactionConfigList).forEach(
Is this needed? Doesn't getLockedIntervalsToSkip already return dataSource -> list of intervals
Thank you for the PR, @kfaraz. Will there be a way to disable this code path in a running Druid cluster (either dynamically or by restarting services)?
…ion_skip_locked_intervals
@kfaraz: Thanks a lot for the PR. I have a couple of doubts regarding the new config added:

@rohangarg, the locked intervals would be skipped only for that run; they would be retried in the next run.
Normally, the user would not have to specify any value for this config. The default value of true would suffice. The config has been provided to allow disabling the feature of skipping locked intervals in case of a bug.
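The config discussed here, as named in the PR description, would go in the Coordinator's runtime properties; a sketch with the default value shown:

```properties
# Set to false only to disable the skipping of locked intervals,
# e.g. as a workaround in case of a bug; true (the default) enables it.
druid.coordinator.compaction.skipLockedIntervals=true
```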
OK, but given that the feature/change-set is not too big, I'm not sure whether adding a config would help. Is it possible to know some scenarios or unknowns where this could fail and might be hard to cover in tests?
Apologies, but one more question: if this flag is turned on, will it always ensure that there is no locking conflict between a compaction task and other tasks? Or is it still possible that a compaction task might get stuck due to an unavailable lock? My initial guess is that there is still a chance compaction might get stuck, so just confirming. :)
| } else if (taskLockPosse.getTaskLock().getPriority() == null
|     || taskLockPosse.getTaskLock().getPriority() <= minTaskPriority.get(datasource)) {
| // Do not proceed if the lock has a priority less than or equal to the minimum
|     || taskLockPosse.getTaskLock().getPriority() < minTaskPriority.get(datasource)) {
I had missed changing it the first time.
The behaviour after this change is that intervals of tasks with equal priority will be considered locked. That is the required behaviour, right?
Compaction Tasks, both auto and manual, try to acquire locks on the Datasource Intervals which they need to compact. If a higher priority ingestion task is in progress for an overlapping interval, the compaction task waits until it can acquire a lock. This can lead to the following potential issues:
- Depending on `skipOffsetFromLatest`, compaction can get stuck for long periods of time.

Resolution
Rather than waiting on already locked intervals, we can proceed to the compaction of other intervals (these would be older intervals, if the compaction policy is Newest First). When the locked intervals are freed up, subsequent compaction runs can submit compaction tasks for them.
In every compaction run (invocation of `CompactSegments.run()`):
- The Coordinator calls the `/lockedIntervals` API, which reads the in-memory state of the `TaskLockbox`
- If any part of an interval is locked by a higher priority task, the whole interval is skipped while submitting compaction tasks
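A simplified sketch of the skip step on the Coordinator side. This is not the actual CompactSegments code: names are illustrative, and intervals are compared by string equality here, whereas the real check works on overlapping Interval objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SkipLockedIntervals {
    // Return the candidate intervals of a datasource that are safe to compact,
    // i.e. not among the locked intervals reported by the Overlord.
    static List<String> compactibleIntervals(
        String datasource,
        List<String> candidateIntervals,
        Map<String, List<String>> lockedIntervals  // datasource -> locked intervals
    ) {
        List<String> locked = lockedIntervals.getOrDefault(datasource, List.of());
        List<String> result = new ArrayList<>();
        for (String interval : candidateIntervals) {
            if (!locked.contains(interval)) {
                result.add(interval);  // not locked: eligible for a compaction task
            }
        }
        return result;
    }
}
```

Skipped intervals are simply not submitted in this run; they become candidates again in the next run once their locks are released.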
Code Changes:
- Added new API `/druid/indexer/v1/lockedIntervals`
- Modified `CompactSegments.run()` to skip locked intervals
- Added new config `druid.coordinator.compaction.skipLockedIntervals` with default value as true

This PR has: