Remove giant lock from Overlord TaskQueue #14293

Closed
kfaraz wants to merge 10 commits into apache:master from kfaraz:cleanup_task_queue

Conversation

Contributor

@kfaraz kfaraz commented May 16, 2023

Description

The TaskQueue in the Overlord implements concurrency control using a giant lock. A similar technique is used in other classes such as TaskMaster and TaskLockbox. While this giant lock does guarantee thread-safe access to critical sections of code, it can be too restrictive at times and can even leave the Overlord completely stuck.

A typical scenario is described below.

  • Insertion of a sub-task of an index_parallel task fails with a SQLTransientException (say, due to an oversized payload)
  • The index_parallel task repeatedly requests the Overlord to insert this sub-task
  • Each time, the Overlord tries to insert this task up to 10 times (using RetryUtils)
  • While the Overlord is trying to insert, the calling thread holds the TaskQueue.giant lock
  • This causes the Overlord to essentially hang as no other TaskQueue operation can proceed without the lock. This includes operations like adding a new task, killing a task, submitting tasks to runner for execution, syncing from metadata, etc.

Note: The indefinite retry issue in the above scenario is also being addressed separately in #14271

Current implementation

The giant lock is a reentrant lock, which is effectively the same as the object monitor associated with any Java object. In principle, this lock could be replaced by simply making all the methods of TaskQueue synchronized.

There are several fields which can be protected with more lenient locking to improve performance.

Proposed implementation

The operations/fields currently protected by the giant lock are discussed below.

(a) Methods: start(), stop(), syncFromStorage(), manage()

Change: Make methods synchronized
Rationale: This effectively remains the same as the current implementation

(b) Field LinkedHashMap<String, Task> tasks: putIfAbsent(), get(), values(), remove()

Change: Use a ConcurrentHashMap in conjunction with a BlockingDeque<String>
Rationale:

  • The only concurrency control needed here is at a task level, which can be easily ensured by a ConcurrentHashMap.
  • The BlockingDeque is used to maintain the order in which task IDs were submitted to the TaskQueue.
  • The updates to these data structures are made atomically using compute() and computeIfAbsent().
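The structure described above can be sketched as follows. This is a minimal, hypothetical illustration under my own assumptions: the class and field names (OrderedTaskPool, activeTaskIdQueue) are mine and the payload is simplified to a String; this is not the actual TaskQueue code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical sketch, not the actual Druid code: a ConcurrentHashMap gives
// per-task concurrency control, while a LinkedBlockingDeque preserves the
// order in which task IDs were submitted.
class OrderedTaskPool
{
  private final ConcurrentHashMap<String, String> tasks = new ConcurrentHashMap<>();
  private final LinkedBlockingDeque<String> activeTaskIdQueue = new LinkedBlockingDeque<>();

  /**
   * Adds a task only if its ID is not already present. The deque update runs
   * inside the mapping function, so both structures are updated atomically
   * with respect to other operations on the same task ID.
   */
  boolean add(String taskId, String payload)
  {
    final boolean[] added = {false};
    tasks.computeIfAbsent(taskId, id -> {
      activeTaskIdQueue.offer(id);
      added[0] = true;
      return payload;
    });
    return added[0];
  }

  /** Removes a task and its ordering entry atomically per task ID. */
  boolean remove(String taskId)
  {
    final boolean[] removed = {false};
    tasks.computeIfPresent(taskId, (id, payload) -> {
      activeTaskIdQueue.remove(id);
      removed[0] = true;
      return null; // returning null removes the mapping
    });
    return removed[0];
  }
}
```

The key property is that ConcurrentHashMap guarantees the mapping function in compute()/computeIfAbsent() runs at most once and atomically per key, which is what lets the deque stay consistent with the map without a global lock.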

(c) Field HashMap<String, Future> taskFutures: put(), remove()

Change: Replace with a Sets.newConcurrentHashSet()
Rationale: This field is just used to track task IDs already submitted to the task runner.
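A minimal sketch of the concurrent-set idea. Guava's Sets.newConcurrentHashSet() is backed by a ConcurrentHashMap; ConcurrentHashMap.newKeySet() is the JDK equivalent used here. The class and method names are illustrative, not the actual field's API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: a concurrent set is sufficient to track which task IDs
// have already been handed to the task runner; add() returning false signals
// a duplicate submission without any external locking.
class SubmittedTaskTracker
{
  private final Set<String> submittedTaskIds = ConcurrentHashMap.newKeySet();

  /** Returns true only the first time a given task ID is marked. */
  boolean markSubmitted(String taskId)
  {
    return submittedTaskIds.add(taskId);
  }

  void forget(String taskId)
  {
    submittedTaskIds.remove(taskId);
  }
}
```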

(d) Field taskStorage: insert()

Change: Move outside critical section
Rationale: TaskStorage implementations do not maintain any state and thus don't require any concurrency control

(e) Field taskLockbox: add(), remove()

Change: Move outside critical section
Rationale: TaskLockbox has its own giant lock and can thus be safely accessed here.

Other changes

Pending items (WIP)

Unit tests for

  • cleanup of dangling locks left over by tasks that couldn't acquire their locks on leader re-election
  • proper operation under different race conditions in TaskQueue

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

return taskRunner.getRunningTasks().stream().collect(
    Collectors.toMap(TaskRunnerWorkItem::getDataSource, task -> 1L, Long::sum)
);

Check notice

Code scanning / CodeQL

Useless parameter

The parameter 'task' is never used.

return taskRunner.getPendingTasks().stream().collect(
    Collectors.toMap(TaskRunnerWorkItem::getDataSource, task -> 1L, Long::sum)
);

Check notice

Code scanning / CodeQL

Useless parameter

The parameter 'task' is never used.
synchronized (managementRequested) {
  managementRequested.set(true);
  managementRequested.notify();

Check warning

Code scanning / CodeQL

notify instead of notifyAll

Using notify rather than notifyAll.
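For context on this warning, the guarded-wait pattern with notifyAll() looks roughly like the sketch below. With a single management thread waiting, notify() happens to be sufficient, but notifyAll() is the safer default if a second waiter is ever added. All names here are illustrative, not the Druid code.

```java
// Hedged sketch of the wake-up pattern CodeQL flags above.
class ManagementSignal
{
  private final Object lock = new Object();
  private boolean requested = false;

  void request()
  {
    synchronized (lock) {
      requested = true;
      lock.notifyAll(); // wakes every waiter, not just one
    }
  }

  boolean isRequested()
  {
    synchronized (lock) {
      return requested;
    }
  }

  /** Blocks until a request arrives, then consumes it. */
  void awaitRequest() throws InterruptedException
  {
    synchronized (lock) {
      while (!requested) {  // guard against spurious wake-ups
        lock.wait();
      }
      requested = false;
    }
  }
}
```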
Contributor

@jasonk000 jasonk000 left a comment

@kfaraz , good to see -- performance of this code has been key to stability of the cluster for us.

I think there's some prior work here that's worth reviewing too, to make sure we don't reintroduce these bugs; I've tagged it at the appropriate points.

return delta;
}
Map<String, Long> total = new HashMap<>(datasourceToSuccessfulTaskCount);
datasourceToSuccessfulTaskCount.clear();
Contributor

This and the following functions have a small race if the CHM is changed between the copy and the call to clear(). There could be some alternatives, like iterating entries and using replaceAll(), or remove() or merge() in a loop, that would allow a more atomic unload of the CHM. Or, maybe make the value in the map an AtomicLong, and CAS-modify it to zero during unload?

Contributor Author

Yeah, I didn't like this either. Updated to iterate over keys and atomically remove them one by one.
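The fix discussed here (iterate over keys and remove them atomically, instead of copy-then-clear) can be sketched roughly as follows. Class and method names are mine, not the actual patch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of a race-free "unload": copying the map and then
// calling clear() can drop increments that land in between, whereas removing
// each key atomically guarantees every count is observed exactly once.
class TaskCountDrainer
{
  private final ConcurrentHashMap<String, Long> datasourceToTaskCount = new ConcurrentHashMap<>();

  void increment(String datasource)
  {
    datasourceToTaskCount.merge(datasource, 1L, Long::sum);
  }

  /** Atomically removes and returns the current counts, key by key. */
  Map<String, Long> drain()
  {
    final Map<String, Long> snapshot = new HashMap<>();
    for (String datasource : datasourceToTaskCount.keySet()) {
      final Long count = datasourceToTaskCount.remove(datasource);
      if (count != null) {
        snapshot.put(datasource, count);
      }
    }
    return snapshot;
  }
}
```

An increment that arrives after its key has been removed simply creates a fresh entry and is reported by the next drain, so no count is ever lost.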

}
requestManagement();
// Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
// This is called after requesting management as locks need to be cleared after notifyStatus is processed
Contributor

Observation: this is an interesting comment, given that there's no guarantee the management has run by now. I wonder if it's stale, or whether there is an actual issue here, since it has the same behaviour as the previous code.

Contributor Author

Yeah, my thoughts exactly. Calling requestManagement() does not guarantee that management has actually happened. I am not 100% sure of what was going on here, so I left it as is. I will double check this code and maybe add some tests.

Contributor Author

I checked the code. This comment is outdated/irrelevant now.

Firstly, calling requestManagement() just adds a request to the queue and does not ensure that task management has actually taken place. In fact, on start-up, it typically wouldn't take place until after 1 minute (default value of start delay).

Secondly, even if, for some other reason, requestManagement() needed to be called before clearing the dangling locks, it would have already been called in the preceding for loop (shutdown calls notifyStatus, which calls requestManagement).

I am removing this comment and moving the contents of the next for loop into the first one. I am also adding some tests to ensure cleanup of such dangling locks.

Comment thread on indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java (Outdated)
catch (Exception e) {
log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
}
final Set<String> knownTaskIds = tasks.keySet();
Contributor

I'm a little wary of reintroducing this bug: #12901, in the case that tasks has changed. Can you check over it? It seems like if tasks has changed we might have an issue. But you've covered most of them with synchronized. Can you check add() and removeTasksInternal()? I think these operate on tasks without being synchronized, so it might lead to some race.

Contributor

NB: earlier PR #12099 may also be relevant here for background/history.

Contributor Author

Thanks for calling this out, @jasonk000 !

The issue described in #12901 is basically a race condition between notifyStatus and manageInternalCritical (renamed in this patch to runReadyTasks), where a task that is being shut down might get relaunched. In #12901, this was solved by using this order of events:

  • Mark the task as recentlyCompleted
  • Update metadata store
  • Finish cleanup of in-memory data structures

All recentlyCompleted tasks were ignored in manageInternalCritical.

In the new set of changes, I have retained this behaviour completely. As soon as a task is marked as recentlyCompleted, it will not be touched by runReadyTasks or killUnknownTasks. The in-memory data structures (including recentlyCompleted itself) are finally cleaned up atomically only when the task shutdown is finished.
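The fencing behaviour described above can be sketched as follows, under my own assumptions: the class name and the reduction of metadata/runner cleanup to comments are illustrative, not the actual notifyStatus implementation.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch of the shutdown ordering: mark the task as recently completed
// first, so the management loop cannot relaunch it, then perform cleanup, and
// only at the end remove the fence itself.
class TaskShutdownSketch
{
  private final Set<String> recentlyCompletedTaskIds = ConcurrentHashMap.newKeySet();
  private final Set<String> activeTaskIds = ConcurrentHashMap.newKeySet();

  void addTask(String taskId)
  {
    activeTaskIds.add(taskId);
  }

  void notifyStatus(String taskId)
  {
    // Step 1: fence the task off from the management loop.
    recentlyCompletedTaskIds.add(taskId);
    // Step 2: the metadata store update would happen here (omitted in sketch).
    // Step 3: clean up in-memory structures, including the fence itself.
    activeTaskIds.remove(taskId);
    recentlyCompletedTaskIds.remove(taskId);
  }

  /** The management loop only considers tasks that are not mid-shutdown. */
  boolean isRunnable(String taskId)
  {
    return activeTaskIds.contains(taskId) && !recentlyCompletedTaskIds.contains(taskId);
  }
}
```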

I will try to add some tests for this scenario. Please let me know if you think of any other race conditions; it would be nice to have tests for all of those.

Contributor Author

I have tried to ensure that all the changes made in the patch #12901 are retained.

This patch does the following:

  1. Fixes the race by adding a recentlyCompletedTasks set that prevents
     the main management loop from doing anything with tasks that are
     currently being cleaned up.

The set of recentlyCompletedTaskIds is still being used for this purpose.

  2. Switches the order of the critical sections in notifyStatus, so
     metadata store updates happen first. This is useful in case of
     server failures: it ensures that if the Overlord fails in the midst
     of notifyStatus, then completed-task statuses are still available in
     ZK or on MMs for the next Overlord. (Those are cleaned up by
     taskRunner.shutdown, which formerly ran first.) This isn't related
     to the race described above, but is fixed opportunistically as part
     of the same patch.

Order of calls is still the same, i.e. metadata store update happens first.

  3. Changes the tasks list to a map. Many operations require retrieval
     or removal of individual tasks; those are now O(1) instead of O(N)
     in the number of running tasks.

  • The tasks map is now a ConcurrentHashMap, which still performs O(1) get and remove.
  • But now we also have an activeTaskIdQueue on which we do an O(n) remove. This is fine because it does not block other operations of the TaskQueue. We need a thread-safe queue(-like) structure here, so I ended up using a LinkedBlockingDeque. I am not sure of an alternative that would offer better time complexity.

  4. Changes various log messages to use task ID instead of full task
     payload, to make the logs more readable.

Retained as is.

}
tasks.computeIfAbsent(
task.getId(),
taskId -> {
Contributor

It seems the implementation here relies on an implicit lock being taken by the CHM. It might be worth noting this, given that it is implementation-specific to CHM, in case the implementation is switched in the future.

Contributor Author

Yeah, it is intentional. I will add a comment calling out why it must be a ConcurrentHashMap.

// Critical section: remove this task from all of our tracking data structures.
giant.lock();
try {
if (removeTaskInternal(task.getId())) {
Contributor

re: earlier note about possibly re-introducing #12901.

// Add new tasks and clean up removed tasks
addedTasks.forEach(this::addTaskInternal);
removedTasks.forEach(this::removeTaskInternal);
log.info(
Contributor

This existed before, but it might be wise to remove as much as possible, including logging, from the synchronized block, and tighten it down a bit.

Contributor Author

I don't think the footprint of this call would make much of a difference compared to the earlier operations, i.e. fetching from the DB and map manipulation.

But yeah, you never really know with logging. I suppose I could just return the counts from this method and log the values in the caller itself.
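The idea of returning counts from the critical section and logging in the caller could look roughly like this. A hedged sketch: the class and method names are mine and the bookkeeping is elided, so this is not the actual syncFromStorage code.

```java
import java.util.List;

// Illustrative sketch: the synchronized method does pure in-memory
// bookkeeping and returns counts; the caller logs outside the lock, keeping
// I/O out of the critical section.
class StorageSyncSketch
{
  /** Critical section: no logging, just map manipulation (elided here). */
  synchronized int[] applySync(List<String> addedTasks, List<String> removedTasks)
  {
    // ... apply adds and removes to the internal task structures ...
    return new int[]{addedTasks.size(), removedTasks.size()};
  }

  /** Caller: logging happens after the lock is released. */
  void syncFromStorage(List<String> addedTasks, List<String> removedTasks)
  {
    final int[] counts = applySync(addedTasks, removedTasks);
    System.out.printf("Synced with storage: added [%d] tasks, removed [%d] tasks.%n", counts[0], counts[1]);
  }
}
```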

Contributor

Could use synchronized (this) { ... }?

Contributor

But yes, it does raise a good question about the remainder of the workflow.

Contributor Author

kfaraz commented May 17, 2023

Thanks for the feedback, @jasonk000 ! I have responded to your comments and plan to make changes/add more tests wherever necessary.

// do not care if the item fits into the queue:
// if the queue is already full, request has been triggered anyway
managementRequestQueue.offer(new Object());

Check notice

Code scanning / CodeQL

Ignored error status of call

Method requestManagement ignores exceptional return value of BlockingQueue<Object>.offer.

// do not care if the item fits into the queue:
// if the queue is already full, request has been triggered anyway
managementRequestQueue.offer(reason);

Check notice

Code scanning / CodeQL

Ignored error status of call

Method requestManagement ignores exceptional return value of BlockingQueue<String>.offer.
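The pattern these notices refer to, where offer()'s return value is deliberately ignored, can be sketched as a coalescing flag built on a capacity-1 queue. The class and method names below are illustrative, not the actual TaskQueue fields.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative sketch: a bounded queue of capacity 1 coalesces management
// requests. Ignoring offer()'s return value is intentional here: a full queue
// means a request is already pending, so nothing is lost.
class ManagementRequestFlag
{
  private final BlockingQueue<Object> managementRequestQueue = new ArrayBlockingQueue<>(1);

  void requestManagement()
  {
    // Do not care whether the item fits into the queue:
    // if the queue is already full, a request has been triggered anyway.
    managementRequestQueue.offer(new Object());
  }

  /** Returns true if a request was pending, and consumes it. */
  boolean pollRequest()
  {
    return managementRequestQueue.poll() != null;
  }
}
```

Any number of calls to requestManagement() between polls collapses into a single pending request, which is exactly the coalescing behaviour the comment describes.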
@AmatyaAvadhanula AmatyaAvadhanula self-requested a review May 31, 2023 14:02
@kfaraz kfaraz added the WIP label Jul 17, 2023
@github-actions

This pull request has been marked as stale due to 60 days of inactivity.
It will be closed in 4 weeks if no further activity occurs. If you think
that's incorrect or this pull request should instead be reviewed, please simply
write any comment. Even if closed, you can still revive the PR at any time or
discuss it on the dev@druid.apache.org list.
Thank you for your contributions.

@github-actions github-actions Bot added the stale label Feb 12, 2024
@github-actions

This pull request/issue has been closed due to lack of activity. If you think that
is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions Bot closed this Mar 12, 2024
@kfaraz kfaraz deleted the cleanup_task_queue branch May 2, 2025 07:12