From ac59440909b41cb9a219f1fd09caef79c7dff955 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 16 May 2023 20:43:29 +0530 Subject: [PATCH 1/9] Remove giant lock from TaskQueue --- .../druid/indexing/overlord/TaskMaster.java | 4 +- .../druid/indexing/overlord/TaskQueue.java | 788 +++++++----------- .../indexing/overlord/TaskLifecycleTest.java | 8 +- .../indexing/overlord/TaskQueueTest.java | 381 ++++----- .../apache/druid/common/guava/DSuppliers.java | 9 +- 5 files changed, 488 insertions(+), 702 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index 1eab403585fd..84c37956f9f1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -312,7 +312,7 @@ public Map getSuccessfulTaskCount() { Optional taskQueue = getTaskQueue(); if (taskQueue.isPresent()) { - return taskQueue.get().getSuccessfulTaskCount(); + return taskQueue.get().getAndResetSuccessfulTaskCounts(); } else { return null; } @@ -323,7 +323,7 @@ public Map getFailedTaskCount() { Optional taskQueue = getTaskQueue(); if (taskQueue.isPresent()) { - return taskQueue.get().getFailedTaskCount(); + return taskQueue.get().getAndResetFailedTaskCounts(); } else { return null; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index c508876f0ce4..7732b9b2d266 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -22,18 +22,16 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import 
com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.errorprone.annotations.concurrent.GuardedBy; -import org.apache.druid.annotations.SuppressFBWarnings; +import net.thisptr.jackson.jq.internal.misc.Lists; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.Counters; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.task.IndexTaskUtils; @@ -44,7 +42,6 @@ import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.indexing.overlord.config.TaskQueueConfig; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; @@ -53,26 +50,23 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.EntryExistsException; -import org.apache.druid.utils.CollectionUtils; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; +import java.util.concurrent.BlockingDeque; 
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -86,20 +80,31 @@ */ public class TaskQueue { - private static final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60); - private static final long MIN_WAIT_TIME_MS = 100; + private static final long MANAGEMENT_WAIT_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60); + private static final long MIN_WAIT_TIME_MILLIS = 100; - // Task ID -> Task, for tasks that are active in some way (submitted, running, or finished and to-be-cleaned-up). - @GuardedBy("giant") - private final LinkedHashMap tasks = new LinkedHashMap<>(); + private static final EmittingLogger log = new EmittingLogger(TaskQueue.class); - // Task ID -> Future from the TaskRunner - @GuardedBy("giant") - private final Map> taskFutures = new HashMap<>(); + /** + * Map from Task ID to active task (submitted, running, recently finished). + */ + private final ConcurrentHashMap tasks = new ConcurrentHashMap<>(); - // Tasks that are in the process of being cleaned up by notifyStatus. Prevents manageInternal from re-launching them. - @GuardedBy("giant") - private final Set recentlyCompletedTasks = new HashSet<>(); + /** + * Queue of all active Task IDs. + */ + private final BlockingDeque activeTaskIdQueue = new LinkedBlockingDeque<>(); + + /** + * Tasks that have already been submitted to the TaskRunner. + */ + private final Set submittedTaskIds = Sets.newConcurrentHashSet(); + + /** + * Tasks that have recently completed and are being cleaned up. These tasks + * should not be relaunched by task management. 
+ */ + private final Set recentlyCompletedTaskIds = Sets.newConcurrentHashSet(); private final TaskLockConfig lockConfig; private final TaskQueueConfig config; @@ -110,9 +115,8 @@ public class TaskQueue private final TaskLockbox taskLockbox; private final ServiceEmitter emitter; - private final ReentrantLock giant = new ReentrantLock(true); - @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") - private final BlockingQueue managementMayBeNecessary = new ArrayBlockingQueue<>(8); + private final AtomicBoolean managementRequested = new AtomicBoolean(false); + private final ExecutorService managerExec = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder() .setDaemon(false) @@ -126,14 +130,8 @@ public class TaskQueue private volatile boolean active = false; - private static final EmittingLogger log = new EmittingLogger(TaskQueue.class); - - private final ConcurrentHashMap totalSuccessfulTaskCount = new ConcurrentHashMap<>(); - private final ConcurrentHashMap totalFailedTaskCount = new ConcurrentHashMap<>(); - @GuardedBy("totalSuccessfulTaskCount") - private Map prevTotalSuccessfulTaskCount = new HashMap<>(); - @GuardedBy("totalFailedTaskCount") - private Map prevTotalFailedTaskCount = new HashMap<>(); + private final ConcurrentHashMap datasourceToSuccessfulTaskCount = new ConcurrentHashMap<>(); + private final ConcurrentHashMap datasourceToFailedTaskCount = new ConcurrentHashMap<>(); public TaskQueue( TaskLockConfig lockConfig, @@ -156,314 +154,248 @@ public TaskQueue( this.emitter = Preconditions.checkNotNull(emitter, "emitter"); } - @VisibleForTesting - void setActive(boolean active) - { - this.active = active; - } - /** * Starts this task queue. Allows {@link #add(Task)} to accept new tasks. 
*/ @LifecycleStart - public void start() + public synchronized void start() { - giant.lock(); - - try { - Preconditions.checkState(!active, "queue must be stopped"); - active = true; - syncFromStorage(); - // Mark these tasks as failed as they could not reacuire the lock - // Clean up needs to happen after tasks have been synced from storage - Set tasksToFail = taskLockbox.syncFromStorage().getTasksToFail(); - for (Task task : tasksToFail) { - shutdown(task.getId(), - "Shutting down forcefully as task failed to reacquire lock while becoming leader"); + Preconditions.checkState(!active, "queue must be stopped"); + + // Mark queue as active only after first sync is complete + syncFromStorage(); + active = true; + + // Mark these tasks as failed as they could not reacquire locks + Set tasksToFail = taskLockbox.syncFromStorage().getTasksToFail(); + for (Task task : tasksToFail) { + shutdown( + task.getId(), + "Shutting down forcefully as task failed to reacquire lock while becoming leader" + ); + } + requestManagement(); + // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired) + // This is called after requesting management as locks need to be cleared after notifyStatus is processed + for (Task task : tasksToFail) { + for (TaskLock lock : taskStorage.getLocks(task.getId())) { + taskStorage.removeLock(task.getId(), lock); } - managerExec.submit( - new Runnable() - { - @Override - public void run() - { - while (true) { - try { - manage(); - break; - } - catch (InterruptedException e) { - log.info("Interrupted, exiting!"); - break; - } - catch (Exception e) { - final long restartDelay = config.getRestartDelay().getMillis(); - log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit(); - try { - Thread.sleep(restartDelay); - } - catch (InterruptedException e2) { - log.info("Interrupted, exiting!"); - break; - } - } - } + } + log.info("Cleaned up [%d] tasks which failed to reacquire locks.", 
tasksToFail.size()); + + // Submit task management job + managerExec.submit( + () -> { + log.info("Beginning task management in [%s].", config.getStartDelay()); + long startDelayMillis = config.getStartDelay().getMillis(); + while (active) { + try { + Thread.sleep(startDelayMillis); + runTaskManagement(); + } + catch (InterruptedException e) { + log.info("Interrupted, stopping task management."); + break; + } + catch (Exception e) { + startDelayMillis = config.getRestartDelay().getMillis(); + log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit(); } } - ); - ScheduledExecutors.scheduleAtFixedRate( - storageSyncExec, - config.getStorageSyncRate(), - new Callable() - { - @Override - public ScheduledExecutors.Signal call() - { - try { - syncFromStorage(); - } - catch (Exception e) { - if (active) { - log.makeAlert(e, "Failed to sync with storage").emit(); - } - } - if (active) { - return ScheduledExecutors.Signal.REPEAT; - } else { - return ScheduledExecutors.Signal.STOP; - } + } + ); + + // Schedule storage sync job + ScheduledExecutors.scheduleAtFixedRate( + storageSyncExec, + config.getStorageSyncRate(), + () -> { + if (!active) { + log.info("Stopping storage sync as TaskQueue has been stopped"); + return ScheduledExecutors.Signal.STOP; + } + + try { + syncFromStorage(); + } + catch (Exception e) { + if (active) { + log.makeAlert(e, "Failed to sync with storage").emit(); } } - ); - requestManagement(); - // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired) - // This is called after requesting management as locks need to be cleared after notifyStatus is processed - for (Task task : tasksToFail) { - for (TaskLock lock : taskStorage.getLocks(task.getId())) { - taskStorage.removeLock(task.getId(), lock); + + return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP; } - } - } - finally { - giant.unlock(); - } + ); } /** * Shuts down the queue. 
*/ @LifecycleStop - public void stop() + public synchronized void stop() { - giant.lock(); - - try { - tasks.clear(); - taskFutures.clear(); - active = false; - managerExec.shutdownNow(); - storageSyncExec.shutdownNow(); - requestManagement(); - } - finally { - giant.unlock(); - } - } - - public boolean isActive() - { - return active; + active = false; + tasks.clear(); + submittedTaskIds.clear(); + recentlyCompletedTaskIds.clear(); + managerExec.shutdownNow(); + storageSyncExec.shutdownNow(); + requestManagement(); } /** * Request management from the management thread. Non-blocking. - * - * Other callers (such as notifyStatus) should trigger activity on the - * TaskQueue thread by requesting management here. + *

+ * Callers (such as notifyStatus) can trigger task management by calling + * this method. */ - void requestManagement() + private void requestManagement() { - // use a BlockingQueue since the offer/poll/wait behaviour is simple - // and very easy to reason about - - // the request has to be offer (non blocking), since someone might request - // while already holding giant lock - - // do not care if the item fits into the queue: - // if the queue is already full, request has been triggered anyway - managementMayBeNecessary.offer(this); + synchronized (managementRequested) { + managementRequested.set(true); + managementRequested.notify(); + } } /** - * Await for an event to manage. - * - * This should only be called from the management thread to wait for activity. + * Waits for a management request to be triggered by another thread. * - * @param nanos - * @throws InterruptedException + * @throws InterruptedException if the thread is interrupted while waiting. */ - @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using queue as notification mechanism, result has no value") - void awaitManagementNanos(long nanos) throws InterruptedException + private void awaitManagementRequest() throws InterruptedException { // mitigate a busy loop, it can get pretty busy when there are a lot of start/stops try { - Thread.sleep(MIN_WAIT_TIME_MS); + Thread.sleep(MIN_WAIT_TIME_MILLIS); } catch (InterruptedException e) { throw new RuntimeException(e); } - // wait for an item, if an item arrives (or is already available), complete immediately - // (does not actually matter what the item is) - managementMayBeNecessary.poll(nanos - (TimeUnit.MILLISECONDS.toNanos(MIN_WAIT_TIME_MS)), TimeUnit.NANOSECONDS); - - // there may have been multiple requests, clear them all - managementMayBeNecessary.clear(); + // Wait for management to be requested + synchronized (managementRequested) { + while (!managementRequested.get()) { + 
managementRequested.wait(MANAGEMENT_WAIT_TIMEOUT_MILLIS - MIN_WAIT_TIME_MILLIS); + } + managementRequested.compareAndSet(true, false); + } } /** * Main task runner management loop. Meant to run forever, or, at least until we're stopped. */ - private void manage() throws InterruptedException + private void runTaskManagement() throws InterruptedException { - log.info("Beginning management in %s.", config.getStartDelay()); - Thread.sleep(config.getStartDelay().getMillis()); - // Ignore return value- we'll get the IDs and futures from getKnownTasks later. taskRunner.restore(); while (active) { - manageInternal(); - - // awaitNanos because management may become necessary without this condition signalling, - // due to e.g. tasks becoming ready when other folks mess with the TaskLockbox. - awaitManagementNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS); + manageTasks(); + awaitManagementRequest(); } } @VisibleForTesting - void manageInternal() + void manageTasks() { - Set knownTaskIds = new HashSet<>(); - Map> runnerTaskFutures = new HashMap<>(); - - giant.lock(); - - try { - manageInternalCritical(knownTaskIds, runnerTaskFutures); - } - finally { - giant.unlock(); - } - - manageInternalPostCritical(knownTaskIds, runnerTaskFutures); + runReadyTasks(); + killUnknownTasks(); } /** - * Management loop critical section tasks. - * - * @param knownTaskIds will be modified - filled with known task IDs - * @param runnerTaskFutures will be modified - filled with futures related to getting the running tasks + * Submits ready tasks to the TaskRunner. + *

+ * This method should be called only by the management thread. */ - @GuardedBy("giant") - private void manageInternalCritical( - final Set knownTaskIds, - final Map> runnerTaskFutures - ) + private synchronized void runReadyTasks() { // Task futures available from the taskRunner + final Map> runnerTaskFutures = new HashMap<>(); for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) { - if (!recentlyCompletedTasks.contains(workItem.getTaskId())) { - // Don't do anything with tasks that have recently finished; notifyStatus will handle it. + if (!recentlyCompletedTaskIds.contains(workItem.getTaskId())) { + // Don't do anything with recently completed tasks; notifyStatus will handle it. runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult()); } } + // Attain futures for all active tasks (assuming they are ready to run). - // Copy tasks list, as notifyStatus may modify it. - for (final Task task : ImmutableList.copyOf(tasks.values())) { - if (recentlyCompletedTasks.contains(task.getId())) { - // Don't do anything with tasks that have recently finished; notifyStatus will handle it. 
- continue; + for (final String taskId : Lists.newArrayList(activeTaskIdQueue)) { + final Task task = tasks.get(taskId); + if (task == null || recentlyCompletedTaskIds.contains(taskId)) { + // Don't do anything for unknown tasks or recently completed tasks + } else if (submittedTaskIds.contains(taskId)) { + // Re-trigger execution of pending task to avoid unnecessary delays + // see https://github.com/apache/druid/pull/6991 + if (isTaskPending(task)) { + taskRunner.run(task); + } + } else if (runnerTaskFutures.containsKey(taskId)) { + attachCallbacks(task, runnerTaskFutures.get(taskId)); + submittedTaskIds.add(taskId); + } else if (isTaskReady(task)) { + log.info("Asking taskRunner to run ready task [%s].", taskId); + attachCallbacks(task, taskRunner.run(task)); + submittedTaskIds.add(taskId); + } else { + // Release all locks (possibly acquired by task.isReady()) if task is not ready + taskLockbox.unlockAll(task); } + } + } - knownTaskIds.add(task.getId()); - - if (!taskFutures.containsKey(task.getId())) { - final ListenableFuture runnerTaskFuture; - if (runnerTaskFutures.containsKey(task.getId())) { - runnerTaskFuture = runnerTaskFutures.get(task.getId()); - } else { - // Task should be running, so run it. - final boolean taskIsReady; - try { - taskIsReady = task.isReady(taskActionClientFactory.create(task)); - } - catch (Exception e) { - log.warn(e, "Exception thrown during isReady for task: %s", task.getId()); - final String errorMessage; - if (e instanceof MaxAllowedLocksExceededException) { - errorMessage = e.getMessage(); - } else { - errorMessage = "Failed while waiting for the task to be ready to run. " - + "See overlord logs for more details."; - } - notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage); - continue; - } - if (taskIsReady) { - log.info("Asking taskRunner to run: %s", task.getId()); - runnerTaskFuture = taskRunner.run(task); - } else { - // Task.isReady() can internally lock intervals or segments. 
- // We should release them if the task is not ready. - taskLockbox.unlockAll(task); - continue; - } - } - taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture)); - } else if (isTaskPending(task)) { - // if the taskFutures contain this task and this task is pending, also let the taskRunner - // to run it to guarantee it will be assigned to run - // see https://github.com/apache/druid/pull/6991 - taskRunner.run(task); + private boolean isTaskReady(Task task) + { + try { + return task.isReady(taskActionClientFactory.create(task)); + } + catch (Exception e) { + log.warn(e, "Error while checking if task [%s] is ready to run.", task.getId()); + final String errorMessage; + if (e instanceof MaxAllowedLocksExceededException) { + errorMessage = e.getMessage(); + } else { + errorMessage = "Failed while waiting for the task to be ready to run. " + + "See overlord logs for more details."; } + notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage); + return false; } } - @VisibleForTesting - private void manageInternalPostCritical( - final Set knownTaskIds, - final Map> runnerTaskFutures - ) + /** + * Kills tasks not present in the set of known tasks. + */ + private void killUnknownTasks() { - // Kill tasks that shouldn't be running - final Set tasksToKill = Sets.difference(runnerTaskFutures.keySet(), knownTaskIds); - if (!tasksToKill.isEmpty()) { - log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size()); - - // On large installations running several thousands of tasks, - // concatenating the list of known task ids can be compupationally expensive. - final boolean logKnownTaskIds = log.isDebugEnabled(); - final String reason = logKnownTaskIds - ? 
StringUtils.format("Task is not in knownTaskIds[%s]", knownTaskIds) - : "Task is not in knownTaskIds"; - - for (final String taskId : tasksToKill) { - try { - taskRunner.shutdown(taskId, reason); - } - catch (Exception e) { - log.warn(e, "TaskRunner failed to clean up task: %s", taskId); - } + final Set knownTaskIds = tasks.keySet(); + final Set runnerTaskIds = new HashSet<>(submittedTaskIds); + taskRunner.getKnownTasks().forEach(item -> runnerTaskIds.add(item.getTaskId())); + + final Set tasksToKill = Sets.difference(runnerTaskIds, knownTaskIds); + tasksToKill.removeAll(recentlyCompletedTaskIds); + if (tasksToKill.isEmpty()) { + return; + } + + log.info("Asking TaskRunner to clean up [%,d] unknown tasks.", tasksToKill.size()); + log.debug("Killing all tasks not present in known task ids [%s].", knownTaskIds); + + for (final String taskId : tasksToKill) { + try { + taskRunner.shutdown(taskId, "Killing unknown task"); + } + catch (Exception e) { + log.warn(e, "TaskRunner failed to shutdown unknown task [%s]", taskId); } } } private boolean isTaskPending(Task task) { - return taskRunner.getPendingTasks() - .stream() + return taskRunner.getPendingTasks().stream() .anyMatch(workItem -> workItem.getTaskId().equals(task.getId())); } @@ -478,6 +410,13 @@ private boolean isTaskPending(Task task) */ public boolean add(final Task task) throws EntryExistsException { + Preconditions.checkState(active, "Queue is not active!"); + Preconditions.checkNotNull(task, "task"); + Preconditions.checkState( + tasks.size() < config.getMaxSize(), + "Task queue is already full, max size is [%,d].", config.getMaxSize() + ); + // Before adding the task, validate the ID, so it can be safely used in file paths, znodes, etc. IdUtils.validateId("Task ID", task.getId()); @@ -488,58 +427,56 @@ public boolean add(final Task task) throws EntryExistsException // Set forceTimeChunkLock before adding task spec to taskStorage, so that we can see always consistent task spec. 
task.addToContextIfAbsent(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockConfig.isForceTimeChunkLock()); defaultTaskConfig.getContext().forEach(task::addToContextIfAbsent); - // Every task shuold use the lineage-based segment allocation protocol unless it is explicitly set to + // Every task should use the lineage-based segment allocation protocol unless it is explicitly set to // using the legacy protocol. task.addToContextIfAbsent( SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY, SinglePhaseParallelIndexTaskRunner.DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION ); - giant.lock(); - - try { - Preconditions.checkState(active, "Queue is not active!"); - Preconditions.checkNotNull(task, "task"); - Preconditions.checkState(tasks.size() < config.getMaxSize(), "Too many tasks (max = %,d)", config.getMaxSize()); - - // If this throws with any sort of exception, including TaskExistsException, we don't want to - // insert the task into our queue. So don't catch it. - taskStorage.insert(task, TaskStatus.running(task.getId())); - addTaskInternal(task); - requestManagement(); - return true; - } - finally { - giant.unlock(); - } + // Do not add the task to queue if insert into metadata fails for any reason + taskStorage.insert(task, TaskStatus.running(task.getId())); + addTaskInternal(task); + requestManagement(); + return true; } - @GuardedBy("giant") + /** + * Atomically adds this task to the TaskQueue. + */ private void addTaskInternal(final Task task) { - final Task existingTask = tasks.putIfAbsent(task.getId(), task); - - if (existingTask == null) { - taskLockbox.add(task); - } else if (!existingTask.equals(task)) { - throw new ISE("Cannot add task ID [%s] with same ID as task that has already been added", task.getId()); - } + tasks.computeIfAbsent( + task.getId(), + taskId -> { + activeTaskIdQueue.add(taskId); + taskLockbox.add(task); + return task; + } + ); } /** - * Removes a task from {@link #tasks} and {@link #taskLockbox}, if it exists. 
Returns whether the task was - * removed or not. + * Atomically removes this task from the TaskQueue. */ - @GuardedBy("giant") - private boolean removeTaskInternal(final String taskId) + private void removeTaskInternal(final Task task) { - final Task task = tasks.remove(taskId); - if (task != null) { - taskLockbox.remove(task); - return true; - } else { - return false; - } + tasks.compute( + task.getId(), + (taskId, existingTask) -> { + if (existingTask != null) { + taskLockbox.remove(existingTask); + activeTaskIdQueue.remove(taskId); + } else { + log.warn("Removing unknown task [%s]", taskId); + } + + submittedTaskIds.remove(taskId); + recentlyCompletedTaskIds.remove(taskId); + + return null; + } + ); } /** @@ -551,16 +488,9 @@ private boolean removeTaskInternal(final String taskId) */ public void shutdown(final String taskId, String reasonFormat, Object... args) { - giant.lock(); - - try { - final Task task = tasks.get(Preconditions.checkNotNull(taskId, "taskId")); - if (task != null) { - notifyStatus(task, TaskStatus.failure(taskId, StringUtils.format(reasonFormat, args)), reasonFormat, args); - } - } - finally { - giant.unlock(); + final Task task = tasks.get(Preconditions.checkNotNull(taskId, "taskId")); + if (task != null) { + notifyStatus(task, TaskStatus.failure(taskId, StringUtils.format(reasonFormat, args)), reasonFormat, args); } } @@ -573,16 +503,9 @@ public void shutdown(final String taskId, String reasonFormat, Object... args) */ public void shutdownWithSuccess(final String taskId, String reasonFormat, Object... 
args) { - giant.lock(); - - try { - final Task task = tasks.get(Preconditions.checkNotNull(taskId, "taskId")); - if (task != null) { - notifyStatus(task, TaskStatus.success(taskId), reasonFormat, args); - } - } - finally { - giant.unlock(); + final Task task = tasks.get(Preconditions.checkNotNull(taskId, "taskId")); + if (task != null) { + notifyStatus(task, TaskStatus.success(taskId), reasonFormat, args); } } @@ -615,14 +538,8 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus, String r return; } - // Critical section: add this task to recentlyCompletedTasks, so it isn't managed while being cleaned up. - giant.lock(); - try { - recentlyCompletedTasks.add(task.getId()); - } - finally { - giant.unlock(); - } + // Add this task to recentlyCompletedTasks, so it isn't managed while being cleaned up. + recentlyCompletedTaskIds.add(task.getId()); final TaskLocation taskLocation = taskRunner.getTaskLocation(task.getId()); @@ -631,7 +548,8 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus, String r try { final Optional previousStatus = taskStorage.getStatus(task.getId()); if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) { - log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit(); + log.makeAlert("Ignoring notification for already-complete task") + .addData("task", task.getId()).emit(); } else { taskStorage.setStatus(taskStatus.withLocation(taskLocation)); } @@ -652,24 +570,12 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus, String r catch (Throwable e) { // If task runner shutdown fails, continue with the task shutdown routine. We'll come back and try to // shut it down again later in manageInternalPostCritical, once it's removed from the "tasks" map. 
- log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId()); + log.warn(e, "TaskRunner failed to cleanup task [%s] after completion", task.getId()); } - // Critical section: remove this task from all of our tracking data structures. - giant.lock(); - try { - if (removeTaskInternal(task.getId())) { - taskFutures.remove(task.getId()); - } else { - log.warn("Unknown task completed: %s", task.getId()); - } - - recentlyCompletedTasks.remove(task.getId()); - requestManagement(); - } - finally { - giant.unlock(); - } + // Cleanup internal state + removeTaskInternal(task); + requestManagement(); } /** @@ -677,10 +583,8 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus, String r * appropriate updates. * * @param statusFuture a task status future - * - * @return the same future, for convenience */ - private ListenableFuture attachCallbacks(final Task task, final ListenableFuture statusFuture) + private void attachCallbacks(final Task task, final ListenableFuture statusFuture) { final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); IndexTaskUtils.setTaskDimensions(metricBuilder, task); @@ -692,7 +596,7 @@ private ListenableFuture attachCallbacks(final Task task, final List @Override public void onSuccess(final TaskStatus status) { - log.info("Received %s status for task: %s", status.getStatusCode(), status.getId()); + log.info("Received status [%s] for task [%s]", status.getStatusCode(), status.getId()); handleStatus(status); } @@ -705,7 +609,7 @@ public void onFailure(final Throwable t) .addData("dataSource", task.getDataSource()) .emit(); handleStatus( - TaskStatus.failure(task.getId(), "Failed to run this task. See overlord logs for more details.") + TaskStatus.failure(task.getId(), "Failed to run task. See overlord logs for more details.") ); } @@ -715,7 +619,7 @@ private void handleStatus(final TaskStatus status) // If we're not supposed to be running anymore, don't do anything. 
Somewhat racey if the flag gets set // after we check and before we commit the database transaction, but better than nothing. if (!active) { - log.info("Abandoning task due to shutdown: %s", task.getId()); + log.info("Not handling status of task [%s] as TaskQueue is stopped.", task.getId()); return; } @@ -727,16 +631,15 @@ private void handleStatus(final TaskStatus status) emitter.emit(metricBuilder.build("task/run/time", status.getDuration())); log.info( - "Task %s: %s (%d run duration)", - status.getStatusCode(), - task.getId(), - status.getDuration() + "Completed task [%s] with status [%s] in [%d] ms.", + task.getId(), status.getStatusCode(), status.getDuration() ); + final String datasource = task.getDataSource(); if (status.isSuccess()) { - Counters.incrementAndGetLong(totalSuccessfulTaskCount, task.getDataSource()); + datasourceToSuccessfulTaskCount.compute(datasource, (ds, c) -> c == null ? 1L : c + 1); } else { - Counters.incrementAndGetLong(totalFailedTaskCount, task.getDataSource()); + datasourceToFailedTaskCount.compute(datasource, (ds, c) -> c == null ? 1L : c + 1); } } } @@ -749,131 +652,69 @@ private void handleStatus(final TaskStatus status) } } ); - return statusFuture; } /** - * Resync the contents of this task queue with our storage facility. Useful to make sure our in-memory state - * corresponds to the storage facility even if the latter is manually modified. + * Resync the contents of this task queue with our storage facility. + * Useful to make sure our in-memory state corresponds to the storage facility + * even if the latter is manually modified. + *

+ * This method must be called only when queue is {@link #active}, except when + * starting up. */ - private void syncFromStorage() + private synchronized void syncFromStorage() { - giant.lock(); - - try { - if (active) { - final Map newTasks = toTaskIDMap(taskStorage.getActiveTasks()); - final int tasksSynced = newTasks.size(); - final Map oldTasks = new HashMap<>(tasks); - - // Calculate differences on IDs instead of Task Objects. - Set commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet())); - for (String taskID : commonIds) { - newTasks.remove(taskID); - oldTasks.remove(taskID); - } - Collection addedTasks = newTasks.values(); - Collection removedTasks = oldTasks.values(); - - // Clean up removed Tasks - for (Task task : removedTasks) { - removeTaskInternal(task.getId()); - } - - // Add newly Added tasks to the queue - for (Task task : addedTasks) { - addTaskInternal(task); - } - - log.info( - "Synced %d tasks from storage (%d tasks added, %d tasks removed).", - tasksSynced, - addedTasks.size(), - removedTasks.size() - ); - requestManagement(); - } else { - log.info("Not active. Skipping storage sync."); - } - } - catch (Exception e) { - log.warn(e, "Failed to sync tasks from storage!"); - throw new RuntimeException(e); - } - finally { - giant.unlock(); - } - } - - private static Map toTaskIDMap(List taskList) - { - Map rv = new HashMap<>(); - for (Task task : taskList) { - rv.put(task.getId(), task); - } - return rv; - } - - private Map getDeltaValues(Map total, Map prev) - { - return total.entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L))); - } + final Map newTasks = taskStorage.getActiveTasks().stream().collect( + Collectors.toMap(Task::getId, Function.identity()) + ); + final Map oldTasks = new HashMap<>(tasks); + + // Calculate differences on IDs instead of Task Objects. 
+ Set commonIds = Sets.intersection(newTasks.keySet(), oldTasks.keySet()); + for (String taskId : commonIds) { + newTasks.remove(taskId); + oldTasks.remove(taskId); + } + Collection addedTasks = newTasks.values(); + Collection removedTasks = oldTasks.values(); + + // Add new tasks and clean up removed tasks + addedTasks.forEach(this::addTaskInternal); + removedTasks.forEach(this::removeTaskInternal); + log.info( + "Synced [%d] tasks from storage. Added [%d] tasks, removed [%d] tasks.", + newTasks.size(), addedTasks.size(), removedTasks.size() + ); - public Map getSuccessfulTaskCount() - { - Map total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get); - synchronized (totalSuccessfulTaskCount) { - Map delta = getDeltaValues(total, prevTotalSuccessfulTaskCount); - prevTotalSuccessfulTaskCount = total; - return delta; - } + requestManagement(); } - public Map getFailedTaskCount() + public Map getAndResetSuccessfulTaskCounts() { - Map total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get); - synchronized (totalFailedTaskCount) { - Map delta = getDeltaValues(total, prevTotalFailedTaskCount); - prevTotalFailedTaskCount = total; - return delta; - } + Map total = Maps.newHashMap(datasourceToSuccessfulTaskCount); + datasourceToSuccessfulTaskCount.clear(); + return total; } - Map getCurrentTaskDatasources() + public Map getAndResetFailedTaskCounts() { - giant.lock(); - try { - return tasks.values().stream().collect(Collectors.toMap(Task::getId, Task::getDataSource)); - } - finally { - giant.unlock(); - } + Map total = Maps.newHashMap(datasourceToFailedTaskCount); + datasourceToFailedTaskCount.clear(); + return total; } public Map getRunningTaskCount() { - Map taskDatasources = getCurrentTaskDatasources(); - return taskRunner.getRunningTasks() - .stream() - .collect(Collectors.toMap( - e -> taskDatasources.getOrDefault(e.getTaskId(), ""), - e -> 1L, - Long::sum - )); + return taskRunner.getRunningTasks().stream().collect( + 
Collectors.toMap(TaskRunnerWorkItem::getDataSource, task -> 1L, Long::sum) + ); } public Map getPendingTaskCount() { - Map taskDatasources = getCurrentTaskDatasources(); - return taskRunner.getPendingTasks() - .stream() - .collect(Collectors.toMap( - e -> taskDatasources.getOrDefault(e.getTaskId(), ""), - e -> 1L, - Long::sum - )); + return taskRunner.getPendingTasks().stream().collect( + Collectors.toMap(TaskRunnerWorkItem::getDataSource, task -> 1L, Long::sum) + ); } public Map getWaitingTaskCount() @@ -883,25 +724,14 @@ public Map getWaitingTaskCount() .map(TaskRunnerWorkItem::getTaskId) .collect(Collectors.toSet()); - giant.lock(); - try { - return tasks.values().stream().filter(task -> !runnerKnownTaskIds.contains(task.getId())) - .collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum)); - } - finally { - giant.unlock(); - } + return tasks.values().stream() + .filter(task -> !runnerKnownTaskIds.contains(task.getId())) + .collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum)); } @VisibleForTesting List getTasks() { - giant.lock(); - try { - return new ArrayList<>(tasks.values()); - } - finally { - giant.unlock(); - } + return new ArrayList<>(tasks.values()); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 6b5927bf1791..026bb9aa18ba 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -733,9 +733,7 @@ private TaskQueue setUpTaskQueue(TaskStorage ts, TaskRunner tr) throws Exception @After public void tearDown() { - if (taskQueue.isActive()) { - taskQueue.stop(); - } + taskQueue.stop(); } @Test @@ -1505,9 +1503,7 @@ private TaskStatus runTask(final Task task) throws Exception // Since multiple tasks can be run in a single unit test using 
runTask(), hence this check and synchronization synchronized (this) { - if (!taskQueue.isActive()) { - taskQueue.start(); - } + taskQueue.start(); } taskQueue.add(dummyTask); taskQueue.add(task); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index 6388cdd573cd..15e0ca759b95 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -20,12 +20,11 @@ package org.apache.druid.indexing.overlord; import com.google.common.base.Optional; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.curator.framework.CuratorFramework; -import org.apache.druid.common.guava.DSuppliers; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.WorkerNodeService; import org.apache.druid.indexer.TaskLocation; @@ -57,20 +56,18 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.segment.TestHelper; -import org.apache.druid.server.initialization.IndexerZkConfig; -import org.apache.druid.server.initialization.ZkPathsConfig; -import org.apache.druid.server.metrics.NoopServiceEmitter; import 
org.apache.druid.timeline.DataSegment; import org.easymock.EasyMock; import org.joda.time.Interval; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -80,12 +77,99 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; public class TaskQueueTest extends IngestionTestBase { - private static final Granularity SEGMENT_GRANULARITY = Granularities.DAY; + private TaskActionClientFactory actionClientFactory; + private StubServiceEmitter serviceEmitter; + + private TaskQueue taskQueue; + + @Before + public void setup() + { + actionClientFactory = createActionClientFactory(); + serviceEmitter = new StubServiceEmitter("taskQueueTest", "localhost"); + taskQueue = createTaskQueue( + new DefaultTaskConfig(), + new SimpleTaskRunner(actionClientFactory) + ); + taskQueue.start(); + } + + @After + public void tearDown() + { + taskQueue.stop(); + } + + @Test + public void testAddDuplicateTaskThrowsException() throws EntryExistsException + { + final TestTask task = new TestTask("t1", "2021-01/2021-02"); + taskQueue.add(task); + Assert.assertThrows( + EntryExistsException.class, + () -> taskQueue.add(task) + ); + + Assert.assertEquals(1, taskQueue.getTasks().size()); + + taskQueue.manageTasks(); + Assert.assertTrue(task.isDone()); + Assert.assertTrue(taskQueue.getTasks().isEmpty()); + } + + @Test + public void testShutdownIsIdempotent() throws EntryExistsException + { + final TestTask task = new TestTask("t1", "2021-01/2021-02"); + taskQueue.add(task); + Assert.assertEquals(1, taskQueue.getTasks().size()); + + taskQueue.shutdown(task.getId(), "killing"); + Assert.assertTrue(taskQueue.getTasks().isEmpty()); + + taskQueue.shutdown(task.getId(), "killing again"); + Assert.assertTrue(taskQueue.getTasks().isEmpty()); + } + + @Test + public void testAddAfterStopThrowsException() + { + 
taskQueue.stop(); + + final TestTask task = new TestTask("t1", "2021-01/2021-02"); + Assert.assertThrows(IllegalStateException.class, () -> taskQueue.add(task)); + + Assert.assertTrue(taskQueue.getTasks().isEmpty()); + } + + @Test + public void testShutdownAfterStopThrowsException() throws EntryExistsException + { + final TestTask task = new TestTask("t1", "2021-01/2021-02"); + taskQueue.add(task); + Assert.assertEquals(1, taskQueue.getTasks().size()); + + taskQueue.stop(); + Assert.assertThrows( + IllegalStateException.class, + () -> taskQueue.shutdown(task.getId(), "killing after stop") + ); + Assert.assertTrue(taskQueue.getTasks().isEmpty()); + } + + @Test + public void testConcurrencyWithStorageSync() + { + // A: Metadata says remove, another thread says add + // B: Metadata says add, another thread says remove + + // C: tasks changing while sync is going on + // - doesn't matter, we take a snapshot of tasks and then go from there + } /** * This test verifies releasing all locks of a task when it is not ready to run yet. @@ -99,35 +183,23 @@ public class TaskQueueTest extends IngestionTestBase @Test public void testManageInternalReleaseLockWhenTaskIsNotReady() throws Exception { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter() - ); - taskQueue.setActive(true); // task1 emulates a case when there is a task that was issued before task2 and acquired locks conflicting // to task2. - final TestTask task1 = new TestTask("t1", Intervals.of("2021-01/P1M")); + final TestTask task1 = new TestTask("t1", "2021-01/P1M"); // Manually get locks for task1. task2 cannot be ready because of task1. 
prepareTaskForLocking(task1); Assert.assertTrue(task1.isReady(actionClientFactory.create(task1))); - final TestTask task2 = new TestTask("t2", Intervals.of("2021-01-31/P1M")); + final TestTask task2 = new TestTask("t2", "2021-01-31/P1M"); taskQueue.add(task2); - taskQueue.manageInternal(); + taskQueue.manageTasks(); Assert.assertFalse(task2.isDone()); Assert.assertTrue(getLockbox().findLocksForTask(task2).isEmpty()); // task3 can run because task2 is still blocked by task1. - final TestTask task3 = new TestTask("t3", Intervals.of("2021-02-01/P1M")); + final TestTask task3 = new TestTask("t3", "2021-02-01/P1M"); taskQueue.add(task3); - taskQueue.manageInternal(); + taskQueue.manageTasks(); Assert.assertFalse(task2.isDone()); Assert.assertTrue(task3.isDone()); Assert.assertTrue(getLockbox().findLocksForTask(task2).isEmpty()); @@ -137,28 +209,15 @@ public void testManageInternalReleaseLockWhenTaskIsNotReady() throws Exception taskQueue.shutdown(task3.getId(), "Emulating shutdown of task3"); // Now task2 should run. 
- taskQueue.manageInternal(); + taskQueue.manageTasks(); Assert.assertTrue(task2.isDone()); } @Test public void testShutdownReleasesTaskLock() throws Exception { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter() - ); - taskQueue.setActive(true); - // Create a Task and add it to the TaskQueue - final TestTask task = new TestTask("t1", Intervals.of("2021-01/P1M")); + final TestTask task = new TestTask("t1", "2021-01/P1M"); taskQueue.add(task); // Acquire a lock for the Task @@ -184,19 +243,7 @@ public void testShutdownReleasesTaskLock() throws Exception @Test public void testSetUseLineageBasedSegmentAllocationByDefault() throws EntryExistsException { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter() - ); - taskQueue.setActive(true); - final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); + final Task task = new TestTask("t1", "2021-01-01/P1D"); taskQueue.add(task); final List tasks = taskQueue.getTasks(); Assert.assertEquals(1, tasks.size()); @@ -210,9 +257,7 @@ public void testSetUseLineageBasedSegmentAllocationByDefault() throws EntryExist public void testDefaultTaskContextOverrideDefaultLineageBasedSegmentAllocation() throws EntryExistsException { final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null), + 
final TaskQueue taskQueue = createTaskQueue( new DefaultTaskConfig() { @Override @@ -224,38 +269,24 @@ public Map getContext() ); } }, - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter() + new SimpleTaskRunner(actionClientFactory) ); - taskQueue.setActive(true); - final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); + taskQueue.start(); + final Task task = new TestTask("t1", "2021-01-01/P1D"); taskQueue.add(task); final List tasks = taskQueue.getTasks(); Assert.assertEquals(1, tasks.size()); final Task queuedTask = tasks.get(0); Assert.assertFalse( - queuedTask.getContextValue(SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY) + queuedTask.getContextValue( + SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY + ) ); } @Test public void testUserProvidedTaskContextOverrideDefaultLineageBasedSegmentAllocation() throws EntryExistsException { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter() - ); - taskQueue.setActive(true); final Task task = new TestTask( "t1", Intervals.of("2021-01-01/P1D"), @@ -274,31 +305,9 @@ public void testUserProvidedTaskContextOverrideDefaultLineageBasedSegmentAllocat } @Test - public void testLockConfigTakePrecedenceThanDefaultTaskContext() throws EntryExistsException + public void testLockConfigOverridesDefaultTaskContext() throws EntryExistsException { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null), - new DefaultTaskConfig() - { - @Override 
- public Map getContext() - { - return ImmutableMap.of( - Tasks.FORCE_TIME_CHUNK_LOCK_KEY, - false - ); - } - }, - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter() - ); - taskQueue.setActive(true); - final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); + final Task task = new TestTask("t1", "2021-01-01/P1D"); taskQueue.add(task); final List tasks = taskQueue.getTasks(); Assert.assertEquals(1, tasks.size()); @@ -309,25 +318,10 @@ public Map getContext() @Test public void testUserProvidedContextOverrideLockConfig() throws EntryExistsException { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter() - ); - taskQueue.setActive(true); final Task task = new TestTask( "t1", Intervals.of("2021-01-01/P1D"), - ImmutableMap.of( - Tasks.FORCE_TIME_CHUNK_LOCK_KEY, - false - ) + ImmutableMap.of(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, false) ); taskQueue.add(task); final List tasks = taskQueue.getTasks(); @@ -339,19 +333,7 @@ public void testUserProvidedContextOverrideLockConfig() throws EntryExistsExcept @Test public void testTaskStatusWhenExceptionIsThrownInIsReady() throws EntryExistsException { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter() - ); - taskQueue.setActive(true); - final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")) + final Task task = new TestTask("t1", 
"2021-01-01/P1D") { @Override public boolean isReady(TaskActionClient taskActionClient) @@ -360,57 +342,54 @@ public boolean isReady(TaskActionClient taskActionClient) } }; taskQueue.add(task); - taskQueue.manageInternal(); + taskQueue.manageTasks(); Optional statusOptional = getTaskStorage().getStatus(task.getId()); Assert.assertTrue(statusOptional.isPresent()); - Assert.assertEquals(TaskState.FAILED, statusOptional.get().getStatusCode()); - Assert.assertNotNull(statusOptional.get().getErrorMsg()); + + TaskStatus taskStatus = statusOptional.get(); + Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode()); + + String errorMsg = taskStatus.getErrorMsg(); + Assert.assertNotNull(errorMsg); Assert.assertTrue( - StringUtils.format("Actual message is: %s", statusOptional.get().getErrorMsg()), - statusOptional.get().getErrorMsg().startsWith("Failed while waiting for the task to be ready to run") + StringUtils.format("Actual message is: %s", errorMsg), + errorMsg.startsWith("Failed while waiting for the task to be ready to run") ); } @Test public void testKilledTasksEmitRuntimeMetricWithHttpRemote() throws EntryExistsException, InterruptedException { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); final HttpRemoteTaskRunner taskRunner = createHttpRemoteTaskRunner(ImmutableList.of("t1")); - final StubServiceEmitter metricsVerifier = new StubServiceEmitter("druid/overlord", "testHost"); WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class); - EasyMock.expect(workerHolder.getWorker()).andReturn(new Worker("http", "worker", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY)).anyTimes(); + EasyMock.expect(workerHolder.getWorker()) + .andReturn(new Worker("http", "worker", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY)) + .anyTimes(); workerHolder.incrementContinuouslyFailedTasksCount(); EasyMock.expectLastCall(); workerHolder.setLastCompletedTaskTime(EasyMock.anyObject()); 
EasyMock.expect(workerHolder.getContinuouslyFailedTasksCount()).andReturn(1); EasyMock.replay(workerHolder); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - taskRunner, - actionClientFactory, - getLockbox(), - metricsVerifier - ); - taskQueue.setActive(true); + + final TaskQueue taskQueue = createTaskQueue(new DefaultTaskConfig(), taskRunner); + taskQueue.start(); + final Task task = new TestTask( "t1", Intervals.of("2021-01-01/P1D"), - ImmutableMap.of( - Tasks.FORCE_TIME_CHUNK_LOCK_KEY, - false - ) + ImmutableMap.of(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, false) ); taskQueue.add(task); - taskQueue.manageInternal(); - taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - task, - TaskStatus.running(task.getId()), - TaskLocation.create("worker", 1, 2) - ), workerHolder); + taskQueue.manageTasks(); + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + task, + TaskStatus.running(task.getId()), + TaskLocation.create("worker", 1, 2) + ), + workerHolder + ); while (!taskRunner.getRunningTasks() .stream() .map(TaskRunnerWorkItem::getTaskId) @@ -419,75 +398,63 @@ public void testKilledTasksEmitRuntimeMetricWithHttpRemote() throws EntryExistsE Thread.sleep(100); } taskQueue.shutdown(task.getId(), "shutdown"); - taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - task, - TaskStatus.failure(task.getId(), "shutdown"), - TaskLocation.create("worker", 1, 2) - ), workerHolder); - taskQueue.manageInternal(); + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + task, + TaskStatus.failure(task.getId(), "shutdown"), + TaskLocation.create("worker", 1, 2) + ), + workerHolder + ); + taskQueue.manageTasks(); - metricsVerifier.getEvents(); - metricsVerifier.verifyEmitted("task/run/time", 1); + serviceEmitter.verifyEmitted("task/run/time", 1); } private HttpRemoteTaskRunner createHttpRemoteTaskRunner(List runningTasks) { - 
HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery druidNodeDiscovery = new HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY)) - .andReturn(druidNodeDiscovery); + .andReturn(new HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery()); EasyMock.replay(druidNodeDiscoveryProvider); TaskStorage taskStorageMock = EasyMock.createStrictMock(TaskStorage.class); for (String taskId : runningTasks) { - EasyMock.expect(taskStorageMock.getStatus(taskId)).andReturn(Optional.of(TaskStatus.running(taskId))); + EasyMock.expect(taskStorageMock.getStatus(taskId)) + .andReturn(Optional.of(TaskStatus.running(taskId))); } EasyMock.replay(taskStorageMock); HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner( TestHelper.makeJsonMapper(), - new HttpRemoteTaskRunnerConfig() - { - @Override - public int getPendingTasksRunnerNumThreads() - { - return 3; - } - }, + new HttpRemoteTaskRunnerConfig(), EasyMock.createNiceMock(HttpClient.class), - DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())), + Suppliers.ofInstance(DefaultWorkerBehaviorConfig.defaultConfig()), new NoopProvisioningStrategy<>(), druidNodeDiscoveryProvider, EasyMock.createNiceMock(TaskStorage.class), - EasyMock.createNiceMock(CuratorFramework.class), - new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), - new StubServiceEmitter("druid/overlord", "testHost") + null, + null, + serviceEmitter ); taskRunner.start(); - taskRunner.registerListener( - new TaskRunnerListener() - { - @Override - public String getListenerId() - { - return "test-listener"; - } - - @Override - public void locationChanged(String taskId, TaskLocation newLocation) - { - // do nothing - } + return taskRunner; + } - @Override - public void statusChanged(String taskId, TaskStatus status) - { - // do nothing - } - 
}, - Execs.directExecutor() + private TaskQueue createTaskQueue( + DefaultTaskConfig defaultTaskConfig, + TaskRunner taskRunner + ) + { + return new TaskQueue( + new TaskLockConfig(), + new TaskQueueConfig(null, null, null, null), + defaultTaskConfig, + getTaskStorage(), + taskRunner, + actionClientFactory, + getLockbox(), + serviceEmitter ); - - return taskRunner; } private static class TestTask extends AbstractBatchIndexTask @@ -495,9 +462,9 @@ private static class TestTask extends AbstractBatchIndexTask private final Interval interval; private boolean done; - private TestTask(String id, Interval interval) + private TestTask(String id, String interval) { - this(id, interval, null); + this(id, Intervals.of(interval), null); } private TestTask(String id, Interval interval, Map context) @@ -554,7 +521,7 @@ public boolean isPerfectRollup() @Override public Granularity getSegmentGranularity() { - return SEGMENT_GRANULARITY; + return Granularities.DAY; } @Override diff --git a/processing/src/main/java/org/apache/druid/common/guava/DSuppliers.java b/processing/src/main/java/org/apache/druid/common/guava/DSuppliers.java index 86d76b11c8c8..2fc09dfd24c9 100644 --- a/processing/src/main/java/org/apache/druid/common/guava/DSuppliers.java +++ b/processing/src/main/java/org/apache/druid/common/guava/DSuppliers.java @@ -29,13 +29,6 @@ public class DSuppliers { public static Supplier of(final AtomicReference ref) { - return new Supplier() - { - @Override - public T get() - { - return ref.get(); - } - }; + return ref::get; } } From c61a2c9db338adcba3b3726438e17529662e7795 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 16 May 2023 22:33:54 +0530 Subject: [PATCH 2/9] Fix tests --- .../druid/indexing/overlord/TaskQueue.java | 5 ++-- .../indexing/overlord/TaskQueueTest.java | 25 ------------------- 2 files changed, 2 insertions(+), 28 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 7732b9b2d266..85f0a1aa3859 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -691,14 +690,14 @@ private synchronized void syncFromStorage() public Map getAndResetSuccessfulTaskCounts() { - Map total = Maps.newHashMap(datasourceToSuccessfulTaskCount); + Map total = new HashMap<>(datasourceToSuccessfulTaskCount); datasourceToSuccessfulTaskCount.clear(); return total; } public Map getAndResetFailedTaskCounts() { - Map total = Maps.newHashMap(datasourceToFailedTaskCount); + Map total = new HashMap<>(datasourceToFailedTaskCount); datasourceToFailedTaskCount.clear(); return total; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index 15e0ca759b95..e4270c9c00f9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -146,31 +146,6 @@ public void testAddAfterStopThrowsException() Assert.assertTrue(taskQueue.getTasks().isEmpty()); } - @Test - public void testShutdownAfterStopThrowsException() throws EntryExistsException - { - final TestTask task = new TestTask("t1", "2021-01/2021-02"); - taskQueue.add(task); - Assert.assertEquals(1, taskQueue.getTasks().size()); - - taskQueue.stop(); - Assert.assertThrows( - IllegalStateException.class, - () -> 
taskQueue.shutdown(task.getId(), "killing after stop") - ); - Assert.assertTrue(taskQueue.getTasks().isEmpty()); - } - - @Test - public void testConcurrencyWithStorageSync() - { - // A: Metadata says remove, another thread says add - // B: Metadata says add, another thread says remove - - // C: tasks changing while sync is going on - // - doesn't matter, we take a snapshot of tasks and then go from there - } - /** * This test verifies releasing all locks of a task when it is not ready to run yet. * From daf123c59bea217036521e5788cfa75a220bcd02 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 17 May 2023 09:23:42 +0530 Subject: [PATCH 3/9] Track success/failed counts consistently --- .../druid/indexing/overlord/TaskQueue.java | 35 ++++++++++--------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 85f0a1aa3859..b5f526ad2211 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -57,14 +57,16 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingDeque; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; @@ -114,7 +116,7 @@ public class TaskQueue private final TaskLockbox taskLockbox; private final ServiceEmitter emitter; - 
private final AtomicBoolean managementRequested = new AtomicBoolean(false); + private final BlockingQueue managementRequestQueue = new ArrayBlockingQueue<>(1); private final ExecutorService managerExec = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder() @@ -252,10 +254,9 @@ public synchronized void stop() */ private void requestManagement() { - synchronized (managementRequested) { - managementRequested.set(true); - managementRequested.notify(); - } + // do not care if the item fits into the queue: + // if the queue is already full, request has been triggered anyway + managementRequestQueue.offer(new Object()); } /** @@ -274,12 +275,10 @@ private void awaitManagementRequest() throws InterruptedException } // Wait for management to be requested - synchronized (managementRequested) { - while (!managementRequested.get()) { - managementRequested.wait(MANAGEMENT_WAIT_TIMEOUT_MILLIS - MIN_WAIT_TIME_MILLIS); - } - managementRequested.compareAndSet(true, false); - } + managementRequestQueue.poll( + MANAGEMENT_WAIT_TIMEOUT_MILLIS - MIN_WAIT_TIME_MILLIS, + TimeUnit.MILLISECONDS + ); } /** @@ -690,15 +689,19 @@ private synchronized void syncFromStorage() public Map getAndResetSuccessfulTaskCounts() { - Map total = new HashMap<>(datasourceToSuccessfulTaskCount); - datasourceToSuccessfulTaskCount.clear(); + Set datasources = new HashSet<>(datasourceToSuccessfulTaskCount.keySet()); + + Map total = new HashMap<>(); + datasources.forEach(ds -> total.put(ds, datasourceToSuccessfulTaskCount.remove(ds))); return total; } public Map getAndResetFailedTaskCounts() { - Map total = new HashMap<>(datasourceToFailedTaskCount); - datasourceToFailedTaskCount.clear(); + Set datasources = new HashSet<>(datasourceToFailedTaskCount.keySet()); + + Map total = new HashMap<>(); + datasources.forEach(ds -> total.put(ds, datasourceToFailedTaskCount.remove(ds))); return total; } From 66287a340921e8a5353a55b74ee5fbb84ed46802 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 17 May 2023 
20:30:32 +0530 Subject: [PATCH 4/9] Add some javadocs --- .../org/apache/druid/indexing/overlord/TaskQueue.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index b5f526ad2211..29779bac1a15 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -88,11 +88,15 @@ public class TaskQueue /** * Map from Task ID to active task (submitted, running, recently finished). + * This must be a ConcurrentHashMap so that operations for any task id happen + * atomically. */ private final ConcurrentHashMap tasks = new ConcurrentHashMap<>(); /** - * Queue of all active Task IDs. + * Queue of all active Task IDs. Updates to this queue must happen within + * {@code tasks.compute()} or {@code tasks.computeIfAbsent()} to ensure that + * this queue is always in sync with the {@code tasks} map. 
*/ private final BlockingDeque activeTaskIdQueue = new LinkedBlockingDeque<>(); @@ -256,7 +260,7 @@ private void requestManagement() { // do not care if the item fits into the queue: // if the queue is already full, request has been triggered anyway - managementRequestQueue.offer(new Object()); + managementRequestQueue.offer(this); } /** From 893af8eaf9edabc17a12353f1d020e78a9a57dc0 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 17 May 2023 20:44:04 +0530 Subject: [PATCH 5/9] Remove extra changes --- .../druid/indexing/overlord/TaskQueue.java | 1 - .../druid/indexing/overlord/TaskQueueTest.java | 17 +++++++++++++---- .../apache/druid/common/guava/DSuppliers.java | 9 ++++++++- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 29779bac1a15..b76c396de4ec 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -66,7 +66,6 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index e4270c9c00f9..89d3f5da7ef0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -104,6 +104,18 @@ public void tearDown() taskQueue.stop(); } + @Test + public void testAddAndRunTask() throws EntryExistsException + { + final TestTask task = new 
TestTask("t1", "2021-01/2021-02"); + taskQueue.add(task); + Assert.assertEquals(1, taskQueue.getTasks().size()); + + taskQueue.manageTasks(); + Assert.assertTrue(task.isDone()); + Assert.assertTrue(taskQueue.getTasks().isEmpty()); + } + @Test public void testAddDuplicateTaskThrowsException() throws EntryExistsException { @@ -114,11 +126,8 @@ public void testAddDuplicateTaskThrowsException() throws EntryExistsException () -> taskQueue.add(task) ); + // Verify that the originally added task is still in queue Assert.assertEquals(1, taskQueue.getTasks().size()); - - taskQueue.manageTasks(); - Assert.assertTrue(task.isDone()); - Assert.assertTrue(taskQueue.getTasks().isEmpty()); } @Test diff --git a/processing/src/main/java/org/apache/druid/common/guava/DSuppliers.java b/processing/src/main/java/org/apache/druid/common/guava/DSuppliers.java index 2fc09dfd24c9..86d76b11c8c8 100644 --- a/processing/src/main/java/org/apache/druid/common/guava/DSuppliers.java +++ b/processing/src/main/java/org/apache/druid/common/guava/DSuppliers.java @@ -29,6 +29,13 @@ public class DSuppliers { public static Supplier of(final AtomicReference ref) { - return ref::get; + return new Supplier() + { + @Override + public T get() + { + return ref.get(); + } + }; } } From 4371249894cee91af5dc16b2eec4ae9a7248a4b1 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 17 May 2023 20:58:54 +0530 Subject: [PATCH 6/9] wip: temp changes --- .../indexing/overlord/TaskQueueTest.java | 40 ++++++++++++++++--- 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index 89d3f5da7ef0..aeb1f1554fc7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -76,6 +76,7 @@ import java.util.Collections; 
import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.stream.Collectors; @@ -144,6 +145,20 @@ public void testShutdownIsIdempotent() throws EntryExistsException Assert.assertTrue(taskQueue.getTasks().isEmpty()); } + @Test + public void testAddBeforeStartThrowsException() + { + TaskQueue taskQueue = createTaskQueue( + new DefaultTaskConfig(), + new SimpleTaskRunner(actionClientFactory) + ); + + Assert.assertThrows( + IllegalStateException.class, + () -> taskQueue.add(new TestTask("t1", "2020/2021")) + ); + } + @Test public void testAddAfterStopThrowsException() { @@ -155,9 +170,22 @@ public void testAddAfterStopThrowsException() Assert.assertTrue(taskQueue.getTasks().isEmpty()); } + @Test + public void testCompletedTaskIsNotRelaunched() + { + + } + + @Test + public void testAddTaskWhileAnotherTaskIsBeingAdded() + { + // We would want to latch DB operations + // We might also want to latch different sections of the TaskQueue code + } + /** * This test verifies releasing all locks of a task when it is not ready to run yet. - * + *

* This test uses 2 APIs, {@link TaskQueue} APIs and {@link IngestionTestBase} APIs * to emulate the scenario of deadlock. The IngestionTestBase provides low-level APIs * with which you can manipulate {@link TaskLockbox} manually. These APIs should be used * @@ -375,10 +403,10 @@ public void testKilledTasksEmitRuntimeMetricWithHttpRemote() throws EntryExistsE workerHolder ); while (!taskRunner.getRunningTasks() - .stream() - .map(TaskRunnerWorkItem::getTaskId) - .collect(Collectors.toList()) - .contains(task.getId())) { + .stream() + .map(TaskRunnerWorkItem::getTaskId) + .collect(Collectors.toList()) + .contains(task.getId())) { Thread.sleep(100); } taskQueue.shutdown(task.getId(), "shutdown"); @@ -399,7 +427,7 @@ private HttpRemoteTaskRunner createHttpRemoteTaskRunner(List runningTask { DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY)) - .andReturn(new HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery()); + .andReturn(new HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery()); EasyMock.replay(druidNodeDiscoveryProvider); TaskStorage taskStorageMock = EasyMock.createStrictMock(TaskStorage.class); for (String taskId : runningTasks) { From 129a1a6a2497bf4acd57cc12fe1313632ad5335f Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 20 May 2023 09:58:35 +0530 Subject: [PATCH 7/9] Fix for spotbugs --- .../druid/indexing/overlord/TaskQueue.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index b76c396de4ec..1aed591e05e0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -119,7 +119,7 @@ 
public class TaskQueue private final TaskLockbox taskLockbox; private final ServiceEmitter emitter; - private final BlockingQueue<Object> managementRequestQueue = new ArrayBlockingQueue<>(1); + private final BlockingQueue<String> managementRequestQueue = new ArrayBlockingQueue<>(1); private final ExecutorService managerExec = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder() @@ -178,7 +178,7 @@ public synchronized void start() "Shutting down forcefully as task failed to reacquire lock while becoming leader" ); } - requestManagement(); + requestManagement("Starting TaskQueue"); // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired) // This is called after requesting management as locks need to be cleared after notifyStatus is processed for (Task task : tasksToFail) { @@ -246,7 +246,7 @@ public synchronized void stop() recentlyCompletedTaskIds.clear(); managerExec.shutdownNow(); storageSyncExec.shutdownNow(); - requestManagement(); + requestManagement("Stopping TaskQueue"); } /** @@ -255,11 +255,11 @@ public synchronized void stop() * Callers (such as notifyStatus) can trigger task management by calling * this method. 
*/ - private void requestManagement() + private void requestManagement(String reason) { // do not care if the item fits into the queue: // if the queue is already full, request has been triggered anyway - managementRequestQueue.offer(this); + managementRequestQueue.offer(reason); } /** @@ -278,10 +278,11 @@ private void awaitManagementRequest() throws InterruptedException } // Wait for management to be requested - managementRequestQueue.poll( + String reason = managementRequestQueue.poll( MANAGEMENT_WAIT_TIMEOUT_MILLIS - MIN_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS ); + log.debug("Received management request [%s]", reason); } /** @@ -438,7 +439,7 @@ public boolean add(final Task task) throws EntryExistsException // Do not add the task to queue if insert into metadata fails for any reason taskStorage.insert(task, TaskStatus.running(task.getId())); addTaskInternal(task); - requestManagement(); + requestManagement("Adding new task"); return true; } @@ -576,7 +577,7 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus, String r // Cleanup internal state removeTaskInternal(task); - requestManagement(); + requestManagement("Completing task"); } /** @@ -687,7 +688,7 @@ private synchronized void syncFromStorage() newTasks.size(), addedTasks.size(), removedTasks.size() ); - requestManagement(); + requestManagement("Syncing storage"); } public Map getAndResetSuccessfulTaskCounts() From 3d6d499347bc27435b3b2aa7c678eeef44d2ad20 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 15 Jun 2023 19:33:56 +0530 Subject: [PATCH 8/9] Minor fixes --- .../apache/druid/indexing/overlord/TaskQueue.java | 13 ++++++------- .../druid/indexing/overlord/TaskQueueTest.java | 1 - 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 1aed591e05e0..75d31f805180 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -22,12 +22,12 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import net.thisptr.jackson.jq.internal.misc.Lists; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -171,23 +171,22 @@ public synchronized void start() active = true; // Mark these tasks as failed as they could not reacquire locks - Set tasksToFail = taskLockbox.syncFromStorage().getTasksToFail(); + final Set tasksToFail = taskLockbox.syncFromStorage().getTasksToFail(); for (Task task : tasksToFail) { shutdown( task.getId(), "Shutting down forcefully as task failed to reacquire lock while becoming leader" ); - } - requestManagement("Starting TaskQueue"); - // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired) - // This is called after requesting management as locks need to be cleared after notifyStatus is processed - for (Task task : tasksToFail) { + + // Remove any locks not cleared by shutdown, e.g. 
those for which TaskLockPosse was not acquired for (TaskLock lock : taskStorage.getLocks(task.getId())) { taskStorage.removeLock(task.getId(), lock); } } log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size()); + requestManagement("Starting TaskQueue"); + // Submit task management job managerExec.submit( () -> { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index aeb1f1554fc7..b88dc1bf06de 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -76,7 +76,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.stream.Collectors; From 2096a00314c44f892d0b69943ff8f1e87c69117a Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 15 Jun 2023 21:43:47 +0530 Subject: [PATCH 9/9] Fix TaskQueueScaleTest --- .../org/apache/druid/indexing/overlord/TaskQueueScaleTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java index d305b0d6c9b2..396c6fe9f0f4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java @@ -474,7 +474,7 @@ public String getTaskType() @Override public String getDataSource() { - throw new UnsupportedOperationException(); + return DATASOURCE; } public void setResult(final TaskStatus result)