diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 50d5d8b4fc7a..d3196f350671 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -42,6 +42,7 @@ import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.indexing.overlord.config.TaskQueueConfig; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; @@ -56,6 +57,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -82,14 +84,21 @@ */ public class TaskQueue { - private final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60); - private final long MIN_WAIT_TIME_MS = 100; + private static final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60); + private static final long MIN_WAIT_TIME_MS = 100; + // Task ID -> Task, for tasks that are active in some way (submitted, running, or finished and to-be-cleaned-up). @GuardedBy("giant") - private final List tasks = new ArrayList<>(); + private final LinkedHashMap tasks = new LinkedHashMap<>(); + + // Task ID -> Future from the TaskRunner @GuardedBy("giant") private final Map> taskFutures = new HashMap<>(); + // Tasks that are in the process of being cleaned up by notifyStatus. Prevents manageInternal from re-launching them. + @GuardedBy("giant") + private final Set recentlyCompletedTasks = new HashSet<>(); + private final TaskLockConfig lockConfig; private final TaskQueueConfig config; private final DefaultTaskConfig defaultTaskConfig; @@ -349,11 +358,19 @@ private void manageInternalCritical( { // Task futures available from the taskRunner for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) { - runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult()); + if (!recentlyCompletedTasks.contains(workItem.getTaskId())) { + // Don't do anything with tasks that have recently finished; notifyStatus will handle it. + runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult()); + } } // Attain futures for all active tasks (assuming they are ready to run). // Copy tasks list, as notifyStatus may modify it. - for (final Task task : ImmutableList.copyOf(tasks)) { + for (final Task task : ImmutableList.copyOf(tasks.values())) { + if (recentlyCompletedTasks.contains(task.getId())) { + // Don't do anything with tasks that have recently finished; notifyStatus will handle it. + continue; + } + knownTaskIds.add(task.getId()); if (!taskFutures.containsKey(task.getId())) { @@ -478,20 +495,32 @@ public boolean add(final Task task) throws EntryExistsException } } - // Should always be called after taking giantLock @GuardedBy("giant") private void addTaskInternal(final Task task) { - tasks.add(task); - taskLockbox.add(task); + final Task existingTask = tasks.putIfAbsent(task.getId(), task); + + if (existingTask == null) { + taskLockbox.add(task); + } else if (!existingTask.equals(task)) { + throw new ISE("Cannot add task ID [%s] with same ID as task that has already been added", task.getId()); + } } - // Should always be called after taking giantLock + /** + * Removes a task from {@link #tasks} and {@link #taskLockbox}, if it exists. Returns whether the task was + * removed or not. + */ @GuardedBy("giant") - private void removeTaskInternal(final Task task) + private boolean removeTaskInternal(final String taskId) { - taskLockbox.remove(task); - tasks.remove(task); + final Task task = tasks.remove(taskId); + if (task != null) { + taskLockbox.remove(task); + return true; + } else { + return false; + } } /** @@ -506,12 +535,9 @@ public void shutdown(final String taskId, String reasonFormat, Object... args) giant.lock(); try { - Preconditions.checkNotNull(taskId, "taskId"); - for (final Task task : tasks) { - if (task.getId().equals(taskId)) { - notifyStatus(task, TaskStatus.failure(taskId, StringUtils.format(reasonFormat, args)), reasonFormat, args); - break; - } + final Task task = tasks.get(Preconditions.checkNotNull(taskId, "taskId")); + if (task != null) { + notifyStatus(task, TaskStatus.failure(taskId, StringUtils.format(reasonFormat, args)), reasonFormat, args); } } finally { @@ -531,12 +557,9 @@ public void shutdownWithSuccess(final String taskId, String reasonFormat, Object giant.lock(); try { - Preconditions.checkNotNull(taskId, "taskId"); - for (final Task task : tasks) { - if (task.getId().equals(taskId)) { - notifyStatus(task, TaskStatus.success(taskId), reasonFormat, args); - break; - } + final Task task = tasks.get(Preconditions.checkNotNull(taskId, "taskId")); + if (task != null) { + notifyStatus(task, TaskStatus.success(taskId), reasonFormat, args); } } finally { @@ -568,62 +591,65 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus, String r taskStatus.getId() ); - // Inform taskRunner that this task can be shut down - TaskLocation taskLocation = TaskLocation.unknown(); - try { - taskLocation = taskRunner.getTaskLocation(task.getId()); - taskRunner.shutdown(task.getId(), reasonFormat, args); + if (!taskStatus.isComplete()) { + // Nothing to do for incomplete statuses. + return; } - catch (Exception e) { - log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId()); - } - - int removed = 0; - - ///////// critical section + // Critical section: add this task to recentlyCompletedTasks, so it isn't managed while being cleaned up. giant.lock(); try { - // Remove from running tasks - for (int i = tasks.size() - 1; i >= 0; i--) { - if (tasks.get(i).getId().equals(task.getId())) { - removed++; - removeTaskInternal(tasks.get(i)); - break; - } - } - - // Remove from futures list - taskFutures.remove(task.getId()); + recentlyCompletedTasks.add(task.getId()); } finally { giant.unlock(); } - ///////// end critical + final TaskLocation taskLocation = taskRunner.getTaskLocation(task.getId()); - if (removed == 0) { - log.warn("Unknown task completed: %s", task.getId()); + // Save status to metadata store first, so if we crash while doing the rest of the shutdown, our successor + // remembers that this task has completed. + try { + final Optional previousStatus = taskStorage.getStatus(task.getId()); + if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) { + log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit(); + } else { + taskStorage.setStatus(taskStatus.withLocation(taskLocation)); + } + } + catch (Throwable e) { + // If persist fails, even after the retries performed in taskStorage, then metadata store and actual cluster + // state have diverged. Send out an alert and continue with the task shutdown routine. + log.makeAlert(e, "Failed to persist status for task") + .addData("task", task.getId()) + .addData("statusCode", taskStatus.getStatusCode()) + .emit(); } - if (removed > 0) { - // If we thought this task should be running, save status to DB - try { - final Optional previousStatus = taskStorage.getStatus(task.getId()); - if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) { - log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit(); - } else { - taskStorage.setStatus(taskStatus.withLocation(taskLocation)); - log.info("Task done: %s", task); - requestManagement(); - } - } - catch (Exception e) { - log.makeAlert(e, "Failed to persist status for task") - .addData("task", task.getId()) - .addData("statusCode", taskStatus.getStatusCode()) - .emit(); + // Inform taskRunner that this task can be shut down. + try { + taskRunner.shutdown(task.getId(), reasonFormat, args); + } + catch (Throwable e) { + // If task runner shutdown fails, continue with the task shutdown routine. We'll come back and try to + // shut it down again later in manageInternalPostCritical, once it's removed from the "tasks" map. + log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId()); + } + + // Critical section: remove this task from all of our tracking data structures. + giant.lock(); + try { + if (removeTaskInternal(task.getId())) { + taskFutures.remove(task.getId()); + } else { + log.warn("Unknown task completed: %s", task.getId()); } + + recentlyCompletedTasks.remove(task.getId()); + requestManagement(); + } + finally { + giant.unlock(); } } @@ -684,7 +710,7 @@ private void handleStatus(final TaskStatus status) log.info( "Task %s: %s (%d run duration)", status.getStatusCode(), - task, + task.getId(), status.getDuration() ); @@ -719,7 +745,7 @@ private void syncFromStorage() if (active) { final Map newTasks = toTaskIDMap(taskStorage.getActiveTasks()); final int tasksSynced = newTasks.size(); - final Map oldTasks = toTaskIDMap(tasks); + final Map oldTasks = new HashMap<>(tasks); // Calculate differences on IDs instead of Task Objects. Set commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet())); @@ -732,7 +758,7 @@ private void syncFromStorage() // Clean up removed Tasks for (Task task : removedTasks) { - removeTaskInternal(task); + removeTaskInternal(task.getId()); } // Add newly Added tasks to the queue @@ -800,7 +826,7 @@ Map getCurrentTaskDatasources() { giant.lock(); try { - return tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource)); + return tasks.values().stream().collect(Collectors.toMap(Task::getId, Task::getDataSource)); } finally { giant.unlock(); @@ -840,7 +866,7 @@ public Map getWaitingTaskCount() giant.lock(); try { - return tasks.stream().filter(task -> !runnerKnownTaskIds.contains(task.getId())) + return tasks.values().stream().filter(task -> !runnerKnownTaskIds.contains(task.getId())) .collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum)); } finally { @@ -853,7 +879,7 @@ List getTasks() { giant.lock(); try { - return new ArrayList(tasks); + return new ArrayList<>(tasks.values()); } finally { giant.unlock();