Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
Expand All @@ -56,6 +57,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -82,14 +84,21 @@
*/
public class TaskQueue
{
private final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60);
private final long MIN_WAIT_TIME_MS = 100;
private static final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60);
private static final long MIN_WAIT_TIME_MS = 100;

// Task ID -> Task, for tasks that are active in some way (submitted, running, or finished and to-be-cleaned-up).
@GuardedBy("giant")
private final List<Task> tasks = new ArrayList<>();
private final LinkedHashMap<String, Task> tasks = new LinkedHashMap<>();

// Task ID -> Future from the TaskRunner
@GuardedBy("giant")
private final Map<String, ListenableFuture<TaskStatus>> taskFutures = new HashMap<>();

// Tasks that are in the process of being cleaned up by notifyStatus. Prevents manageInternal from re-launching them.
@GuardedBy("giant")
private final Set<String> recentlyCompletedTasks = new HashSet<>();

private final TaskLockConfig lockConfig;
private final TaskQueueConfig config;
private final DefaultTaskConfig defaultTaskConfig;
Expand Down Expand Up @@ -349,11 +358,19 @@ private void manageInternalCritical(
{
// Task futures available from the taskRunner
for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
if (!recentlyCompletedTasks.contains(workItem.getTaskId())) {
// Don't do anything with tasks that have recently finished; notifyStatus will handle it.
runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
}
}
// Attain futures for all active tasks (assuming they are ready to run).
// Copy tasks list, as notifyStatus may modify it.
for (final Task task : ImmutableList.copyOf(tasks)) {
for (final Task task : ImmutableList.copyOf(tasks.values())) {
if (recentlyCompletedTasks.contains(task.getId())) {
// Don't do anything with tasks that have recently finished; notifyStatus will handle it.
continue;
}

knownTaskIds.add(task.getId());

if (!taskFutures.containsKey(task.getId())) {
Expand Down Expand Up @@ -478,20 +495,32 @@ public boolean add(final Task task) throws EntryExistsException
}
}

// Should always be called after taking giantLock
@GuardedBy("giant")
private void addTaskInternal(final Task task)
{
tasks.add(task);
taskLockbox.add(task);
final Task existingTask = tasks.putIfAbsent(task.getId(), task);

if (existingTask == null) {
taskLockbox.add(task);
} else if (!existingTask.equals(task)) {
throw new ISE("Cannot add task ID [%s] with same ID as task that has already been added", task.getId());
}
}

// Should always be called after taking giantLock
/**
* Removes a task from {@link #tasks} and {@link #taskLockbox}, if it exists. Returns whether the task was
* removed or not.
*/
@GuardedBy("giant")
private void removeTaskInternal(final Task task)
private boolean removeTaskInternal(final String taskId)
{
taskLockbox.remove(task);
tasks.remove(task);
final Task task = tasks.remove(taskId);
if (task != null) {
taskLockbox.remove(task);
return true;
} else {
return false;
}
}

/**
Expand All @@ -506,12 +535,9 @@ public void shutdown(final String taskId, String reasonFormat, Object... args)
giant.lock();

try {
Preconditions.checkNotNull(taskId, "taskId");
for (final Task task : tasks) {
if (task.getId().equals(taskId)) {
notifyStatus(task, TaskStatus.failure(taskId, StringUtils.format(reasonFormat, args)), reasonFormat, args);
break;
}
final Task task = tasks.get(Preconditions.checkNotNull(taskId, "taskId"));
if (task != null) {
notifyStatus(task, TaskStatus.failure(taskId, StringUtils.format(reasonFormat, args)), reasonFormat, args);
}
}
finally {
Expand All @@ -531,12 +557,9 @@ public void shutdownWithSuccess(final String taskId, String reasonFormat, Object
giant.lock();

try {
Preconditions.checkNotNull(taskId, "taskId");
for (final Task task : tasks) {
if (task.getId().equals(taskId)) {
notifyStatus(task, TaskStatus.success(taskId), reasonFormat, args);
break;
}
final Task task = tasks.get(Preconditions.checkNotNull(taskId, "taskId"));
if (task != null) {
notifyStatus(task, TaskStatus.success(taskId), reasonFormat, args);
}
}
finally {
Expand Down Expand Up @@ -568,62 +591,65 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus, String r
taskStatus.getId()
);

// Inform taskRunner that this task can be shut down
TaskLocation taskLocation = TaskLocation.unknown();
try {
taskLocation = taskRunner.getTaskLocation(task.getId());
taskRunner.shutdown(task.getId(), reasonFormat, args);
if (!taskStatus.isComplete()) {
// Nothing to do for incomplete statuses.
return;
}
catch (Exception e) {
log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId());
}

int removed = 0;

///////// critical section

// Critical section: add this task to recentlyCompletedTasks, so it isn't managed while being cleaned up.
giant.lock();
try {
// Remove from running tasks
for (int i = tasks.size() - 1; i >= 0; i--) {
if (tasks.get(i).getId().equals(task.getId())) {
removed++;
removeTaskInternal(tasks.get(i));
break;
}
}

// Remove from futures list
taskFutures.remove(task.getId());
recentlyCompletedTasks.add(task.getId());
}
finally {
giant.unlock();
}

///////// end critical
final TaskLocation taskLocation = taskRunner.getTaskLocation(task.getId());

if (removed == 0) {
log.warn("Unknown task completed: %s", task.getId());
// Save status to metadata store first, so if we crash while doing the rest of the shutdown, our successor
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doubt: does this order change also solve the race condition problem itself? As per my current understanding, re-launching can happen only if syncFromStorage thinks there is an active task in metadata storage that the in-memory view doesn't know about. That leads to the creation of an in-memory task, which the management thread then launches.

With this change, the syncFromStorage can have three views :

  1. the tasks map contains the task, and task storage has it as an active task
  2. the tasks map contains the task, but task storage doesn't have it as an active task, in which case both syncFromStorage and notifyStatus will clean it up
  3. the tasks map doesn't contain the task, and task storage doesn't have it either

I think all 3 cases should be OK, but I may be missing something here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order switch solves the original race but creates a new one, which you have listed as case (2): it would create a situation where two threads try to clean up the same task at the same time. That may be fine, but I don't think it's prudent to rely on it being fine. It's cleaner to ensure that only one thread tries to clean up the task.

// remembers that this task has completed.
try {
final Optional<TaskStatus> previousStatus = taskStorage.getStatus(task.getId());
if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) {
log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit();
} else {
taskStorage.setStatus(taskStatus.withLocation(taskLocation));
}
}
catch (Throwable e) {
// If persist fails, even after the retries performed in taskStorage, then metadata store and actual cluster
// state have diverged. Send out an alert and continue with the task shutdown routine.
log.makeAlert(e, "Failed to persist status for task")
.addData("task", task.getId())
.addData("statusCode", taskStatus.getStatusCode())
.emit();
}

if (removed > 0) {
// If we thought this task should be running, save status to DB
try {
final Optional<TaskStatus> previousStatus = taskStorage.getStatus(task.getId());
if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) {
log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit();
} else {
taskStorage.setStatus(taskStatus.withLocation(taskLocation));
log.info("Task done: %s", task);
requestManagement();
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to persist status for task")
.addData("task", task.getId())
.addData("statusCode", taskStatus.getStatusCode())
.emit();
// Inform taskRunner that this task can be shut down.
try {
taskRunner.shutdown(task.getId(), reasonFormat, args);
}
catch (Throwable e) {
// If task runner shutdown fails, continue with the task shutdown routine. We'll come back and try to
// shut it down again later in manageInternalPostCritical, once it's removed from the "tasks" map.
log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId());
}

// Critical section: remove this task from all of our tracking data structures.
giant.lock();
try {
if (removeTaskInternal(task.getId())) {
taskFutures.remove(task.getId());
} else {
log.warn("Unknown task completed: %s", task.getId());
}

recentlyCompletedTasks.remove(task.getId());
requestManagement();
}
finally {
giant.unlock();
}
}

Expand Down Expand Up @@ -684,7 +710,7 @@ private void handleStatus(final TaskStatus status)
log.info(
"Task %s: %s (%d run duration)",
status.getStatusCode(),
task,
task.getId(),
status.getDuration()
);

Expand Down Expand Up @@ -719,7 +745,7 @@ private void syncFromStorage()
if (active) {
final Map<String, Task> newTasks = toTaskIDMap(taskStorage.getActiveTasks());
final int tasksSynced = newTasks.size();
final Map<String, Task> oldTasks = toTaskIDMap(tasks);
final Map<String, Task> oldTasks = new HashMap<>(tasks);

// Calculate differences on IDs instead of Task Objects.
Set<String> commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
Expand All @@ -732,7 +758,7 @@ private void syncFromStorage()

// Clean up removed Tasks
for (Task task : removedTasks) {
removeTaskInternal(task);
removeTaskInternal(task.getId());
}

// Add newly Added tasks to the queue
Expand Down Expand Up @@ -800,7 +826,7 @@ Map<String, String> getCurrentTaskDatasources()
{
giant.lock();
try {
return tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));
return tasks.values().stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));
}
finally {
giant.unlock();
Expand Down Expand Up @@ -840,7 +866,7 @@ public Map<String, Long> getWaitingTaskCount()

giant.lock();
try {
return tasks.stream().filter(task -> !runnerKnownTaskIds.contains(task.getId()))
return tasks.values().stream().filter(task -> !runnerKnownTaskIds.contains(task.getId()))
.collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum));
}
finally {
Expand All @@ -853,7 +879,7 @@ List<Task> getTasks()
{
giant.lock();
try {
return new ArrayList<Task>(tasks);
return new ArrayList<>(tasks.values());
}
finally {
giant.unlock();
Expand Down