-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Fix Overlord leader election when task lock re-acquisition fails #13172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9d309c4
ba876b7
704d92d
6665d21
4c0a6c0
e9b4c51
c853856
d17114a
212a786
87bff8a
7f6918b
63af27e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -108,17 +108,19 @@ public TaskLockbox( | |
|
|
||
| /** | ||
| * Wipe out our current in-memory state and resync it from our bundled {@link TaskStorage}. | ||
| * | ||
| * @return SyncResult which needs to be processed by the caller | ||
| */ | ||
| public void syncFromStorage() | ||
| public TaskLockboxSyncResult syncFromStorage() | ||
| { | ||
| giant.lock(); | ||
|
|
||
| try { | ||
| // Load stuff from taskStorage first. If this fails, we don't want to lose all our locks. | ||
| final Set<String> storedActiveTasks = new HashSet<>(); | ||
| final Set<Task> storedActiveTasks = new HashSet<>(); | ||
| final List<Pair<Task, TaskLock>> storedLocks = new ArrayList<>(); | ||
| for (final Task task : taskStorage.getActiveTasks()) { | ||
| storedActiveTasks.add(task.getId()); | ||
| storedActiveTasks.add(task); | ||
| for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) { | ||
| storedLocks.add(Pair.of(task, taskLock)); | ||
| } | ||
|
|
@@ -138,7 +140,12 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right) | |
| }; | ||
| running.clear(); | ||
| activeTasks.clear(); | ||
| activeTasks.addAll(storedActiveTasks); | ||
| activeTasks.addAll(storedActiveTasks.stream() | ||
| .map(Task::getId) | ||
| .collect(Collectors.toSet()) | ||
| ); | ||
| // Set of task groups in which at least one task failed to re-acquire a lock | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the comments! |
||
| final Set<String> failedToReacquireLockTaskGroups = new HashSet<>(); | ||
| // Bookkeeping for a log message at the end | ||
| int taskLockCount = 0; | ||
| for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) { | ||
|
|
@@ -183,20 +190,39 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right) | |
| ); | ||
| } | ||
| } else { | ||
| throw new ISE( | ||
| "Could not reacquire lock on interval[%s] version[%s] for task: %s", | ||
| failedToReacquireLockTaskGroups.add(task.getGroupId()); | ||
| log.error( | ||
| "Could not reacquire lock on interval[%s] version[%s] for task: %s from group %s.", | ||
| savedTaskLockWithPriority.getInterval(), | ||
| savedTaskLockWithPriority.getVersion(), | ||
| task.getId() | ||
| task.getId(), | ||
| task.getGroupId() | ||
| ); | ||
| continue; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: probably not needed as we are already at the end of the loop. |
||
| } | ||
| } | ||
|
|
||
| Set<Task> tasksToFail = new HashSet<>(); | ||
| for (Task task : storedActiveTasks) { | ||
| if (failedToReacquireLockTaskGroups.contains(task.getGroupId())) { | ||
| tasksToFail.add(task); | ||
| activeTasks.remove(task.getId()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Style: You could choose to remove all of them in one go, thus retaining the sense of atomic update to |
||
| } | ||
| } | ||
|
|
||
| log.info( | ||
| "Synced %,d locks for %,d activeTasks from storage (%,d locks ignored).", | ||
| taskLockCount, | ||
| activeTasks.size(), | ||
| storedLocks.size() - taskLockCount | ||
| ); | ||
|
|
||
| if (!failedToReacquireLockTaskGroups.isEmpty()) { | ||
| log.warn("Marking all tasks from task groups[%s] to be failed " | ||
| + "as they failed to reacquire at least one lock.", failedToReacquireLockTaskGroups); | ||
| } | ||
|
|
||
| return new TaskLockboxSyncResult(tasksToFail); | ||
| } | ||
| finally { | ||
| giant.unlock(); | ||
|
|
@@ -207,7 +233,8 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right) | |
| * This method is called only in {@link #syncFromStorage()} and verifies the given task and the taskLock have the same | ||
| * groupId, dataSource, and priority. | ||
| */ | ||
| private TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock) | ||
| @VisibleForTesting | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: is there a way to avoid this and still be able to test it? (without too much hassle) |
||
| protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock) | ||
| { | ||
| giant.lock(); | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.druid.indexing.overlord; | ||
|
|
||
| import org.apache.druid.indexing.common.task.Task; | ||
|
|
||
| import java.util.Set; | ||
|
|
||
| /** | ||
| * Result of TaskLockbox#syncFromStorage() | ||
| * Contains tasks which need to be forcefully failed to let the overlord become the leader | ||
| */ | ||
| class TaskLockboxSyncResult | ||
| { | ||
| private final Set<Task> tasksToFail; | ||
|
|
||
| TaskLockboxSyncResult(Set<Task> tasksToFail) | ||
| { | ||
| this.tasksToFail = tasksToFail; | ||
| } | ||
|
|
||
| /** | ||
| * Return set of tasks which need to be forcefully failed due to lock re-acquisition failure | ||
| */ | ||
| Set<Task> getTasksToFail() | ||
| { | ||
| return tasksToFail; | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.