diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index 1eab403585fd..84c37956f9f1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -312,7 +312,7 @@ public Map getSuccessfulTaskCount() { Optional taskQueue = getTaskQueue(); if (taskQueue.isPresent()) { - return taskQueue.get().getSuccessfulTaskCount(); + return taskQueue.get().getAndResetSuccessfulTaskCounts(); } else { return null; } @@ -323,7 +323,7 @@ public Map getFailedTaskCount() { Optional taskQueue = getTaskQueue(); if (taskQueue.isPresent()) { - return taskQueue.get().getFailedTaskCount(); + return taskQueue.get().getAndResetFailedTaskCounts(); } else { return null; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 0beccce3dd03..3019310313e5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -22,18 +22,15 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.errorprone.annotations.concurrent.GuardedBy; -import org.apache.druid.annotations.SuppressFBWarnings; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.Counters; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.task.IndexTaskUtils; @@ -44,7 +41,6 @@ import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.indexing.overlord.config.TaskQueueConfig; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; @@ -53,26 +49,24 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.EntryExistsException; -import org.apache.druid.utils.CollectionUtils; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingDeque; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -86,20 +80,35 @@ */ public class TaskQueue { - private static final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60); - private static final long MIN_WAIT_TIME_MS = 100; + private static final long MANAGEMENT_WAIT_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60); + private static final long MIN_WAIT_TIME_MILLIS = 100; - // Task ID -> Task, for tasks that are active in some way (submitted, running, or finished and to-be-cleaned-up). - @GuardedBy("giant") - private final LinkedHashMap tasks = new LinkedHashMap<>(); + private static final EmittingLogger log = new EmittingLogger(TaskQueue.class); + + /** + * Map from Task ID to active task (submitted, running, recently finished). + * This must be a ConcurrentHashMap so that operations for any task id happen + * atomically. + */ + private final ConcurrentHashMap tasks = new ConcurrentHashMap<>(); + + /** + * Queue of all active Task IDs. Updates to this queue must happen within + * {@code tasks.compute()} or {@code tasks.computeIfAbsent()} to ensure that + * this queue is always in sync with the {@code tasks} map. + */ + private final BlockingDeque activeTaskIdQueue = new LinkedBlockingDeque<>(); - // Task ID -> Future from the TaskRunner - @GuardedBy("giant") - private final Map> taskFutures = new HashMap<>(); + /** + * Tasks that have already been submitted to the TaskRunner. + */ + private final Set submittedTaskIds = Sets.newConcurrentHashSet(); - // Tasks that are in the process of being cleaned up by notifyStatus. Prevents manageInternal from re-launching them. - @GuardedBy("giant") - private final Set recentlyCompletedTasks = new HashSet<>(); + /** + * Tasks that have recently completed and are being cleaned up. These tasks + * should not be relaunched by task management. + */ + private final Set recentlyCompletedTaskIds = Sets.newConcurrentHashSet(); private final TaskLockConfig lockConfig; private final TaskQueueConfig config; @@ -110,9 +119,8 @@ public class TaskQueue private final TaskLockbox taskLockbox; private final ServiceEmitter emitter; - private final ReentrantLock giant = new ReentrantLock(true); - @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") - private final BlockingQueue managementMayBeNecessary = new ArrayBlockingQueue<>(8); + private final BlockingQueue managementRequestQueue = new ArrayBlockingQueue<>(1); + private final ExecutorService managerExec = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder() .setDaemon(false) @@ -126,14 +134,8 @@ public class TaskQueue private volatile boolean active = false; - private static final EmittingLogger log = new EmittingLogger(TaskQueue.class); - - private final ConcurrentHashMap totalSuccessfulTaskCount = new ConcurrentHashMap<>(); - private final ConcurrentHashMap totalFailedTaskCount = new ConcurrentHashMap<>(); - @GuardedBy("totalSuccessfulTaskCount") - private Map prevTotalSuccessfulTaskCount = new HashMap<>(); - @GuardedBy("totalFailedTaskCount") - private Map prevTotalFailedTaskCount = new HashMap<>(); + private final ConcurrentHashMap datasourceToSuccessfulTaskCount = new ConcurrentHashMap<>(); + private final ConcurrentHashMap datasourceToFailedTaskCount = new ConcurrentHashMap<>(); public TaskQueue( TaskLockConfig lockConfig, @@ -156,314 +158,245 @@ public TaskQueue( this.emitter = Preconditions.checkNotNull(emitter, "emitter"); } - @VisibleForTesting - void setActive(boolean active) - { - this.active = active; - } - /** * Starts this task queue. Allows {@link #add(Task)} to accept new tasks. */ @LifecycleStart - public void start() + public synchronized void start() { - giant.lock(); + Preconditions.checkState(!active, "queue must be stopped"); + + // Mark queue as active only after first sync is complete + syncFromStorage(); + active = true; + + // Mark these tasks as failed as they could not reacquire locks + final Set tasksToFail = taskLockbox.syncFromStorage().getTasksToFail(); + for (Task task : tasksToFail) { + shutdown( + task.getId(), + "Shutting down forcefully as task failed to reacquire lock while becoming leader" + ); - try { - Preconditions.checkState(!active, "queue must be stopped"); - active = true; - syncFromStorage(); - // Mark these tasks as failed as they could not reacuire the lock - // Clean up needs to happen after tasks have been synced from storage - Set tasksToFail = taskLockbox.syncFromStorage().getTasksToFail(); - for (Task task : tasksToFail) { - shutdown(task.getId(), - "Shutting down forcefully as task failed to reacquire lock while becoming leader"); + // Remove any locks not cleared by shutdown, e.g. those for which TaskLockPosse was not acquired + for (TaskLock lock : taskStorage.getLocks(task.getId())) { + taskStorage.removeLock(task.getId(), lock); } - managerExec.submit( - new Runnable() - { - @Override - public void run() - { - while (true) { - try { - manage(); - break; - } - catch (InterruptedException e) { - log.info("Interrupted, exiting!"); - break; - } - catch (Exception e) { - final long restartDelay = config.getRestartDelay().getMillis(); - log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit(); - try { - Thread.sleep(restartDelay); - } - catch (InterruptedException e2) { - log.info("Interrupted, exiting!"); - break; - } - } - } + } + log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size()); + + requestManagement("Starting TaskQueue"); + + // Submit task management job + managerExec.submit( + () -> { + log.info("Beginning task management in [%s].", config.getStartDelay()); + long startDelayMillis = config.getStartDelay().getMillis(); + while (active) { + try { + Thread.sleep(startDelayMillis); + runTaskManagement(); + } + catch (InterruptedException e) { + log.info("Interrupted, stopping task management."); + break; + } + catch (Exception e) { + startDelayMillis = config.getRestartDelay().getMillis(); + log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit(); } } - ); - ScheduledExecutors.scheduleAtFixedRate( - storageSyncExec, - config.getStorageSyncRate(), - new Callable() - { - @Override - public ScheduledExecutors.Signal call() - { - try { - syncFromStorage(); - } - catch (Exception e) { - if (active) { - log.makeAlert(e, "Failed to sync with storage").emit(); - } - } - if (active) { - return ScheduledExecutors.Signal.REPEAT; - } else { - return ScheduledExecutors.Signal.STOP; - } + } + ); + + // Schedule storage sync job + ScheduledExecutors.scheduleAtFixedRate( + storageSyncExec, + config.getStorageSyncRate(), + () -> { + if (!active) { + log.info("Stopping storage sync as TaskQueue has been stopped"); + return ScheduledExecutors.Signal.STOP; + } + + try { + syncFromStorage(); + } + catch (Exception e) { + if (active) { + log.makeAlert(e, "Failed to sync with storage").emit(); } } - ); - requestManagement(); - // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired) - // This is called after requesting management as locks need to be cleared after notifyStatus is processed - for (Task task : tasksToFail) { - for (TaskLock lock : taskStorage.getLocks(task.getId())) { - taskStorage.removeLock(task.getId(), lock); + + return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP; } - } - } - finally { - giant.unlock(); - } + ); } /** * Shuts down the queue. */ @LifecycleStop - public void stop() - { - giant.lock(); - - try { - tasks.clear(); - taskFutures.clear(); - active = false; - managerExec.shutdownNow(); - storageSyncExec.shutdownNow(); - requestManagement(); - } - finally { - giant.unlock(); - } - } - - public boolean isActive() + public synchronized void stop() { - return active; + active = false; + tasks.clear(); + submittedTaskIds.clear(); + recentlyCompletedTaskIds.clear(); + managerExec.shutdownNow(); + storageSyncExec.shutdownNow(); + requestManagement("Stopping TaskQueue"); } /** * Request management from the management thread. Non-blocking. - * - * Other callers (such as notifyStatus) should trigger activity on the - * TaskQueue thread by requesting management here. + *

+ * Callers (such as notifyStatus) can trigger task management by calling + * this method. */ - void requestManagement() + private void requestManagement(String reason) { - // use a BlockingQueue since the offer/poll/wait behaviour is simple - // and very easy to reason about - - // the request has to be offer (non blocking), since someone might request - // while already holding giant lock - // do not care if the item fits into the queue: // if the queue is already full, request has been triggered anyway - managementMayBeNecessary.offer(this); + managementRequestQueue.offer(reason); } /** - * Await for an event to manage. - * - * This should only be called from the management thread to wait for activity. + * Waits for a management request to be triggered by another thread. * - * @param nanos - * @throws InterruptedException + * @throws InterruptedException if the thread is interrupted while waiting. */ - @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using queue as notification mechanism, result has no value") - void awaitManagementNanos(long nanos) throws InterruptedException + private void awaitManagementRequest() throws InterruptedException { // mitigate a busy loop, it can get pretty busy when there are a lot of start/stops try { - Thread.sleep(MIN_WAIT_TIME_MS); + Thread.sleep(MIN_WAIT_TIME_MILLIS); } catch (InterruptedException e) { throw new RuntimeException(e); } - // wait for an item, if an item arrives (or is already available), complete immediately - // (does not actually matter what the item is) - managementMayBeNecessary.poll(nanos - (TimeUnit.MILLISECONDS.toNanos(MIN_WAIT_TIME_MS)), TimeUnit.NANOSECONDS); - - // there may have been multiple requests, clear them all - managementMayBeNecessary.clear(); + // Wait for management to be requested + String reason = managementRequestQueue.poll( + MANAGEMENT_WAIT_TIMEOUT_MILLIS - MIN_WAIT_TIME_MILLIS, + TimeUnit.MILLISECONDS + ); + log.debug("Received management request [%s]", reason); } /** * Main task runner management loop. Meant to run forever, or, at least until we're stopped. */ - private void manage() throws InterruptedException + private void runTaskManagement() throws InterruptedException { - log.info("Beginning management in %s.", config.getStartDelay()); - Thread.sleep(config.getStartDelay().getMillis()); - // Ignore return value- we'll get the IDs and futures from getKnownTasks later. taskRunner.restore(); while (active) { - manageInternal(); - - // awaitNanos because management may become necessary without this condition signalling, - // due to e.g. tasks becoming ready when other folks mess with the TaskLockbox. - awaitManagementNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS); + manageTasks(); + awaitManagementRequest(); } } @VisibleForTesting - void manageInternal() + void manageTasks() { - Set knownTaskIds = new HashSet<>(); - Map> runnerTaskFutures = new HashMap<>(); - - giant.lock(); - - try { - manageInternalCritical(knownTaskIds, runnerTaskFutures); - } - finally { - giant.unlock(); - } - - manageInternalPostCritical(knownTaskIds, runnerTaskFutures); + runReadyTasks(); + killUnknownTasks(); } /** - * Management loop critical section tasks. - * - * @param knownTaskIds will be modified - filled with known task IDs - * @param runnerTaskFutures will be modified - filled with futures related to getting the running tasks + * Submits ready tasks to the TaskRunner. + *

+ * This method should be called only by the management thread. */ - @GuardedBy("giant") - private void manageInternalCritical( - final Set knownTaskIds, - final Map> runnerTaskFutures - ) + private synchronized void runReadyTasks() { // Task futures available from the taskRunner + final Map> runnerTaskFutures = new HashMap<>(); for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) { - if (!recentlyCompletedTasks.contains(workItem.getTaskId())) { - // Don't do anything with tasks that have recently finished; notifyStatus will handle it. + if (!recentlyCompletedTaskIds.contains(workItem.getTaskId())) { + // Don't do anything with recently completed tasks; notifyStatus will handle it. runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult()); } } + // Attain futures for all active tasks (assuming they are ready to run). - // Copy tasks list, as notifyStatus may modify it. - for (final Task task : ImmutableList.copyOf(tasks.values())) { - if (recentlyCompletedTasks.contains(task.getId())) { - // Don't do anything with tasks that have recently finished; notifyStatus will handle it. - continue; + for (final String taskId : Lists.newArrayList(activeTaskIdQueue)) { + final Task task = tasks.get(taskId); + if (task == null || recentlyCompletedTaskIds.contains(taskId)) { + // Don't do anything for unknown tasks or recently completed tasks + } else if (submittedTaskIds.contains(taskId)) { + // Re-trigger execution of pending task to avoid unnecessary delays + // see https://github.com/apache/druid/pull/6991 + if (isTaskPending(task)) { + taskRunner.run(task); + } + } else if (runnerTaskFutures.containsKey(taskId)) { + attachCallbacks(task, runnerTaskFutures.get(taskId)); + submittedTaskIds.add(taskId); + } else if (isTaskReady(task)) { + log.info("Asking taskRunner to run ready task [%s].", taskId); + attachCallbacks(task, taskRunner.run(task)); + submittedTaskIds.add(taskId); + } else { + // Release all locks (possibly acquired by task.isReady()) if task is not ready + taskLockbox.unlockAll(task); } + } + } - knownTaskIds.add(task.getId()); - - if (!taskFutures.containsKey(task.getId())) { - final ListenableFuture runnerTaskFuture; - if (runnerTaskFutures.containsKey(task.getId())) { - runnerTaskFuture = runnerTaskFutures.get(task.getId()); - } else { - // Task should be running, so run it. - final boolean taskIsReady; - try { - taskIsReady = task.isReady(taskActionClientFactory.create(task)); - } - catch (Exception e) { - log.warn(e, "Exception thrown during isReady for task: %s", task.getId()); - final String errorMessage; - if (e instanceof MaxAllowedLocksExceededException) { - errorMessage = e.getMessage(); - } else { - errorMessage = "Failed while waiting for the task to be ready to run. " - + "See overlord logs for more details."; - } - notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage); - continue; - } - if (taskIsReady) { - log.info("Asking taskRunner to run: %s", task.getId()); - runnerTaskFuture = taskRunner.run(task); - } else { - // Task.isReady() can internally lock intervals or segments. - // We should release them if the task is not ready. - taskLockbox.unlockAll(task); - continue; - } - } - taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture)); - } else if (isTaskPending(task)) { - // if the taskFutures contain this task and this task is pending, also let the taskRunner - // to run it to guarantee it will be assigned to run - // see https://github.com/apache/druid/pull/6991 - taskRunner.run(task); + private boolean isTaskReady(Task task) + { + try { + return task.isReady(taskActionClientFactory.create(task)); + } + catch (Exception e) { + log.warn(e, "Error while checking if task [%s] is ready to run.", task.getId()); + final String errorMessage; + if (e instanceof MaxAllowedLocksExceededException) { + errorMessage = e.getMessage(); + } else { + errorMessage = "Failed while waiting for the task to be ready to run. " + + "See overlord logs for more details."; } + notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), errorMessage); + return false; } } - @VisibleForTesting - private void manageInternalPostCritical( - final Set knownTaskIds, - final Map> runnerTaskFutures - ) + /** + * Kills tasks not present in the set of known tasks. + */ + private void killUnknownTasks() { - // Kill tasks that shouldn't be running - final Set tasksToKill = Sets.difference(runnerTaskFutures.keySet(), knownTaskIds); - if (!tasksToKill.isEmpty()) { - log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size()); - - // On large installations running several thousands of tasks, - // concatenating the list of known task ids can be compupationally expensive. - final boolean logKnownTaskIds = log.isDebugEnabled(); - final String reason = logKnownTaskIds - ? StringUtils.format("Task is not in knownTaskIds[%s]", knownTaskIds) - : "Task is not in knownTaskIds"; - - for (final String taskId : tasksToKill) { - try { - taskRunner.shutdown(taskId, reason); - } - catch (Exception e) { - log.warn(e, "TaskRunner failed to clean up task: %s", taskId); - } + final Set knownTaskIds = tasks.keySet(); + final Set runnerTaskIds = new HashSet<>(submittedTaskIds); + taskRunner.getKnownTasks().forEach(item -> runnerTaskIds.add(item.getTaskId())); + + final Set tasksToKill = Sets.difference(runnerTaskIds, knownTaskIds); + tasksToKill.removeAll(recentlyCompletedTaskIds); + if (tasksToKill.isEmpty()) { + return; + } + + log.info("Asking TaskRunner to clean up [%,d] unknown tasks.", tasksToKill.size()); + log.debug("Killing all tasks not present in known task ids [%s].", knownTaskIds); + + for (final String taskId : tasksToKill) { + try { + taskRunner.shutdown(taskId, "Killing unknown task"); + } + catch (Exception e) { + log.warn(e, "TaskRunner failed to shutdown unknown task [%s]", taskId); } } } private boolean isTaskPending(Task task) { - return taskRunner.getPendingTasks() - .stream() + return taskRunner.getPendingTasks().stream() .anyMatch(workItem -> workItem.getTaskId().equals(task.getId())); } @@ -478,6 +411,13 @@ private boolean isTaskPending(Task task) */ public boolean add(final Task task) throws EntryExistsException { + Preconditions.checkState(active, "Queue is not active!"); + Preconditions.checkNotNull(task, "task"); + Preconditions.checkState( + tasks.size() < config.getMaxSize(), + "Task queue is already full, max size is [%,d].", config.getMaxSize() + ); + // Before adding the task, validate the ID, so it can be safely used in file paths, znodes, etc. IdUtils.validateId("Task ID", task.getId()); @@ -488,58 +428,56 @@ public boolean add(final Task task) throws EntryExistsException // Set forceTimeChunkLock before adding task spec to taskStorage, so that we can see always consistent task spec. task.addToContextIfAbsent(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockConfig.isForceTimeChunkLock()); defaultTaskConfig.getContext().forEach(task::addToContextIfAbsent); - // Every task shuold use the lineage-based segment allocation protocol unless it is explicitly set to + // Every task should use the lineage-based segment allocation protocol unless it is explicitly set to // using the legacy protocol. task.addToContextIfAbsent( SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY, SinglePhaseParallelIndexTaskRunner.DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION ); - giant.lock(); - - try { - Preconditions.checkState(active, "Queue is not active!"); - Preconditions.checkNotNull(task, "task"); - Preconditions.checkState(tasks.size() < config.getMaxSize(), "Too many tasks (max = %,d)", config.getMaxSize()); - - // If this throws with any sort of exception, including TaskExistsException, we don't want to - // insert the task into our queue. So don't catch it. - taskStorage.insert(task, TaskStatus.running(task.getId())); - addTaskInternal(task); - requestManagement(); - return true; - } - finally { - giant.unlock(); - } + // Do not add the task to queue if insert into metadata fails for any reason + taskStorage.insert(task, TaskStatus.running(task.getId())); + addTaskInternal(task); + requestManagement("Adding new task"); + return true; } - @GuardedBy("giant") + /** + * Atomically adds this task to the TaskQueue. + */ private void addTaskInternal(final Task task) { - final Task existingTask = tasks.putIfAbsent(task.getId(), task); - - if (existingTask == null) { - taskLockbox.add(task); - } else if (!existingTask.equals(task)) { - throw new ISE("Cannot add task ID [%s] with same ID as task that has already been added", task.getId()); - } + tasks.computeIfAbsent( + task.getId(), + taskId -> { + activeTaskIdQueue.add(taskId); + taskLockbox.add(task); + return task; + } + ); } /** - * Removes a task from {@link #tasks} and {@link #taskLockbox}, if it exists. Returns whether the task was - * removed or not. + * Atomically removes this task from the TaskQueue. */ - @GuardedBy("giant") - private boolean removeTaskInternal(final String taskId) + private void removeTaskInternal(final Task task) { - final Task task = tasks.remove(taskId); - if (task != null) { - taskLockbox.remove(task); - return true; - } else { - return false; - } + tasks.compute( + task.getId(), + (taskId, existingTask) -> { + if (existingTask != null) { + taskLockbox.remove(existingTask); + activeTaskIdQueue.remove(taskId); + } else { + log.warn("Removing unknown task [%s]", taskId); + } + + submittedTaskIds.remove(taskId); + recentlyCompletedTaskIds.remove(taskId); + + return null; + } + ); } /** @@ -551,16 +489,9 @@ private boolean removeTaskInternal(final String taskId) */ public void shutdown(final String taskId, String reasonFormat, Object... args) { - giant.lock(); - - try { - final Task task = tasks.get(Preconditions.checkNotNull(taskId, "taskId")); - if (task != null) { - notifyStatus(task, TaskStatus.failure(taskId, StringUtils.format(reasonFormat, args)), reasonFormat, args); - } - } - finally { - giant.unlock(); + final Task task = tasks.get(Preconditions.checkNotNull(taskId, "taskId")); + if (task != null) { + notifyStatus(task, TaskStatus.failure(taskId, StringUtils.format(reasonFormat, args)), reasonFormat, args); } } @@ -573,16 +504,9 @@ public void shutdown(final String taskId, String reasonFormat, Object... args) */ public void shutdownWithSuccess(final String taskId, String reasonFormat, Object... args) { - giant.lock(); - - try { - final Task task = tasks.get(Preconditions.checkNotNull(taskId, "taskId")); - if (task != null) { - notifyStatus(task, TaskStatus.success(taskId), reasonFormat, args); - } - } - finally { - giant.unlock(); + final Task task = tasks.get(Preconditions.checkNotNull(taskId, "taskId")); + if (task != null) { + notifyStatus(task, TaskStatus.success(taskId), reasonFormat, args); } } @@ -615,14 +539,8 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus, String r return; } - // Critical section: add this task to recentlyCompletedTasks, so it isn't managed while being cleaned up. - giant.lock(); - try { - recentlyCompletedTasks.add(task.getId()); - } - finally { - giant.unlock(); - } + // Add this task to recentlyCompletedTasks, so it isn't managed while being cleaned up. + recentlyCompletedTaskIds.add(task.getId()); final TaskLocation taskLocation = taskRunner.getTaskLocation(task.getId()); @@ -631,7 +549,8 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus, String r try { final Optional previousStatus = taskStorage.getStatus(task.getId()); if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) { - log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit(); + log.makeAlert("Ignoring notification for already-complete task") + .addData("task", task.getId()).emit(); } else { taskStorage.setStatus(taskStatus.withLocation(taskLocation)); } @@ -652,24 +571,12 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus, String r catch (Throwable e) { // If task runner shutdown fails, continue with the task shutdown routine. We'll come back and try to // shut it down again later in manageInternalPostCritical, once it's removed from the "tasks" map. - log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId()); + log.warn(e, "TaskRunner failed to cleanup task [%s] after completion", task.getId()); } - // Critical section: remove this task from all of our tracking data structures. - giant.lock(); - try { - if (removeTaskInternal(task.getId())) { - taskFutures.remove(task.getId()); - } else { - log.warn("Unknown task completed: %s", task.getId()); - } - - recentlyCompletedTasks.remove(task.getId()); - requestManagement(); - } - finally { - giant.unlock(); - } + // Cleanup internal state + removeTaskInternal(task); + requestManagement("Completing task"); } /** @@ -677,10 +584,8 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus, String r * appropriate updates. * * @param statusFuture a task status future - * - * @return the same future, for convenience */ - private ListenableFuture attachCallbacks(final Task task, final ListenableFuture statusFuture) + private void attachCallbacks(final Task task, final ListenableFuture statusFuture) { final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); IndexTaskUtils.setTaskDimensions(metricBuilder, task); @@ -692,7 +597,7 @@ private ListenableFuture attachCallbacks(final Task task, final List @Override public void onSuccess(final TaskStatus status) { - log.info("Received %s status for task: %s", status.getStatusCode(), status.getId()); + log.info("Received status [%s] for task [%s]", status.getStatusCode(), status.getId()); handleStatus(status); } @@ -705,7 +610,7 @@ public void onFailure(final Throwable t) .addData("dataSource", task.getDataSource()) .emit(); handleStatus( - TaskStatus.failure(task.getId(), "Failed to run this task. See overlord logs for more details.") + TaskStatus.failure(task.getId(), "Failed to run task. See overlord logs for more details.") ); } @@ -715,7 +620,7 @@ private void handleStatus(final TaskStatus status) // If we're not supposed to be running anymore, don't do anything. Somewhat racey if the flag gets set // after we check and before we commit the database transaction, but better than nothing. if (!active) { - log.info("Abandoning task due to shutdown: %s", task.getId()); + log.info("Not handling status of task [%s] as TaskQueue is stopped.", task.getId()); return; } @@ -727,16 +632,15 @@ private void handleStatus(final TaskStatus status) emitter.emit(metricBuilder.build("task/run/time", status.getDuration())); log.info( - "Task %s: %s (%d run duration)", - status.getStatusCode(), - task.getId(), - status.getDuration() + "Completed task [%s] with status [%s] in [%d] ms.", + task.getId(), status.getStatusCode(), status.getDuration() ); + final String datasource = task.getDataSource(); if (status.isSuccess()) { - Counters.incrementAndGetLong(totalSuccessfulTaskCount, task.getDataSource()); + datasourceToSuccessfulTaskCount.compute(datasource, (ds, c) -> c == null ? 1L : c + 1); } else { - Counters.incrementAndGetLong(totalFailedTaskCount, task.getDataSource()); + datasourceToFailedTaskCount.compute(datasource, (ds, c) -> c == null ? 1L : c + 1); } } } @@ -749,131 +653,73 @@ private void handleStatus(final TaskStatus status) } } ); - return statusFuture; } /** - * Resync the contents of this task queue with our storage facility. Useful to make sure our in-memory state - * corresponds to the storage facility even if the latter is manually modified. + * Resync the contents of this task queue with our storage facility. + * Useful to make sure our in-memory state corresponds to the storage facility + * even if the latter is manually modified. + *

+ * This method must be called only when queue is {@link #active}, except when + * starting up. */ - private void syncFromStorage() + private synchronized void syncFromStorage() { - giant.lock(); - - try { - if (active) { - final Map newTasks = toTaskIDMap(taskStorage.getActiveTasks()); - final int tasksSynced = newTasks.size(); - final Map oldTasks = new HashMap<>(tasks); - - // Calculate differences on IDs instead of Task Objects. - Set commonIds = Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet())); - for (String taskID : commonIds) { - newTasks.remove(taskID); - oldTasks.remove(taskID); - } - Collection addedTasks = newTasks.values(); - Collection removedTasks = oldTasks.values(); - - // Clean up removed Tasks - for (Task task : removedTasks) { - removeTaskInternal(task.getId()); - } - - // Add newly Added tasks to the queue - for (Task task : addedTasks) { - addTaskInternal(task); - } - - log.info( - "Synced %d tasks from storage (%d tasks added, %d tasks removed).", - tasksSynced, - addedTasks.size(), - removedTasks.size() - ); - requestManagement(); - } else { - log.info("Not active. Skipping storage sync."); - } - } - catch (Exception e) { - log.warn(e, "Failed to sync tasks from storage!"); - throw new RuntimeException(e); - } - finally { - giant.unlock(); - } - } + final Map newTasks = taskStorage.getActiveTasks().stream().collect( + Collectors.toMap(Task::getId, Function.identity()) + ); + final Map oldTasks = new HashMap<>(tasks); + + // Calculate differences on IDs instead of Task Objects. + Set commonIds = Sets.intersection(newTasks.keySet(), oldTasks.keySet()); + for (String taskId : commonIds) { + newTasks.remove(taskId); + oldTasks.remove(taskId); + } + Collection addedTasks = newTasks.values(); + Collection removedTasks = oldTasks.values(); + + // Add new tasks and clean up removed tasks + addedTasks.forEach(this::addTaskInternal); + removedTasks.forEach(this::removeTaskInternal); + log.info( + "Synced [%d] tasks from storage. Added [%d] tasks, removed [%d] tasks.", + newTasks.size(), addedTasks.size(), removedTasks.size() + ); - private static Map toTaskIDMap(List taskList) - { - Map rv = new HashMap<>(); - for (Task task : taskList) { - rv.put(task.getId(), task); - } - return rv; + requestManagement("Syncing storage"); } - private Map getDeltaValues(Map total, Map prev) + public Map getAndResetSuccessfulTaskCounts() { - return total.entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L))); - } + Set datasources = new HashSet<>(datasourceToSuccessfulTaskCount.keySet()); - public Map getSuccessfulTaskCount() - { - Map total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get); - synchronized (totalSuccessfulTaskCount) { - Map delta = getDeltaValues(total, prevTotalSuccessfulTaskCount); - prevTotalSuccessfulTaskCount = total; - return delta; - } + Map total = new HashMap<>(); + datasources.forEach(ds -> total.put(ds, datasourceToSuccessfulTaskCount.remove(ds))); + return total; } - public Map getFailedTaskCount() + public Map getAndResetFailedTaskCounts() { - Map total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get); - synchronized (totalFailedTaskCount) { - Map delta = getDeltaValues(total, prevTotalFailedTaskCount); - prevTotalFailedTaskCount = total; - return delta; - } - } + Set datasources = new HashSet<>(datasourceToFailedTaskCount.keySet()); - Map getCurrentTaskDatasources() - { - giant.lock(); - try { - return tasks.values().stream().collect(Collectors.toMap(Task::getId, Task::getDataSource)); - } - finally { - giant.unlock(); - } + Map total = new HashMap<>(); + datasources.forEach(ds -> total.put(ds, datasourceToFailedTaskCount.remove(ds))); + return total; } public Map getRunningTaskCount() { - Map taskDatasources = getCurrentTaskDatasources(); - return taskRunner.getRunningTasks() - .stream() - .collect(Collectors.toMap( - e -> taskDatasources.getOrDefault(e.getTaskId(), ""), - e -> 1L, - Long::sum - )); + return taskRunner.getRunningTasks().stream().collect( + Collectors.toMap(TaskRunnerWorkItem::getDataSource, task -> 1L, Long::sum) + ); } public Map getPendingTaskCount() { - Map taskDatasources = getCurrentTaskDatasources(); - return taskRunner.getPendingTasks() - .stream() - .collect(Collectors.toMap( - e -> taskDatasources.getOrDefault(e.getTaskId(), ""), - e -> 1L, - Long::sum - )); + return taskRunner.getPendingTasks().stream().collect( + Collectors.toMap(TaskRunnerWorkItem::getDataSource, task -> 1L, Long::sum) + ); } public Map getWaitingTaskCount() @@ -883,25 +729,14 @@ public Map getWaitingTaskCount() .map(TaskRunnerWorkItem::getTaskId) .collect(Collectors.toSet()); - giant.lock(); - try { - return tasks.values().stream().filter(task -> !runnerKnownTaskIds.contains(task.getId())) - .collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum)); - } - finally { - giant.unlock(); - } + return tasks.values().stream() + .filter(task -> !runnerKnownTaskIds.contains(task.getId())) + .collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum)); } @VisibleForTesting List getTasks() { - giant.lock(); - try { - return new ArrayList<>(tasks.values()); - } - finally { - giant.unlock(); - } + return new ArrayList<>(tasks.values()); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 6b5927bf1791..026bb9aa18ba 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -733,9 +733,7 @@ private TaskQueue setUpTaskQueue(TaskStorage ts, TaskRunner tr) throws Exception @After public void tearDown() { - if (taskQueue.isActive()) { - taskQueue.stop(); - } + taskQueue.stop(); } @Test @@ -1505,9 +1503,7 @@ private TaskStatus runTask(final Task task) throws Exception // Since multiple tasks can be run in a single unit test using runTask(), hence this check and synchronization synchronized (this) { - if (!taskQueue.isActive()) { - taskQueue.start(); - } + taskQueue.start(); } taskQueue.add(dummyTask); taskQueue.add(task); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java index d305b0d6c9b2..396c6fe9f0f4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java @@ -474,7 +474,7 @@ public String getTaskType() @Override public String getDataSource() { - throw new UnsupportedOperationException(); + return DATASOURCE; } public void setResult(final TaskStatus result) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index 6388cdd573cd..b88dc1bf06de 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -20,12 +20,11 @@ package org.apache.druid.indexing.overlord; import com.google.common.base.Optional; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.curator.framework.CuratorFramework; -import org.apache.druid.common.guava.DSuppliers; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.WorkerNodeService; import org.apache.druid.indexer.TaskLocation; @@ -57,20 +56,18 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.segment.TestHelper; -import org.apache.druid.server.initialization.IndexerZkConfig; -import org.apache.druid.server.initialization.ZkPathsConfig; -import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.easymock.EasyMock; import org.joda.time.Interval; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -80,16 +77,114 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; public class TaskQueueTest extends IngestionTestBase { - private static final Granularity SEGMENT_GRANULARITY = Granularities.DAY; + private TaskActionClientFactory actionClientFactory; + private StubServiceEmitter serviceEmitter; + + private TaskQueue taskQueue; + + @Before + public void setup() + { + actionClientFactory = createActionClientFactory(); + serviceEmitter = new StubServiceEmitter("taskQueueTest", "localhost"); + taskQueue = createTaskQueue( + new DefaultTaskConfig(), + new SimpleTaskRunner(actionClientFactory) + ); + taskQueue.start(); + } + + @After + public void tearDown() + { + taskQueue.stop(); + } + + @Test + public void testAddAndRunTask() throws EntryExistsException + { + final TestTask task = new TestTask("t1", "2021-01/2021-02"); + taskQueue.add(task); + Assert.assertEquals(1, taskQueue.getTasks().size()); + + taskQueue.manageTasks(); + Assert.assertTrue(task.isDone()); + Assert.assertTrue(taskQueue.getTasks().isEmpty()); + } + + @Test + public void testAddDuplicateTaskThrowsException() throws EntryExistsException + { + final TestTask task = new TestTask("t1", "2021-01/2021-02"); + taskQueue.add(task); + Assert.assertThrows( + EntryExistsException.class, + () -> taskQueue.add(task) + ); + + // Verify that the originally added task is still in queue + Assert.assertEquals(1, taskQueue.getTasks().size()); + } + + @Test + public void testShutdownIsIdempotent() throws EntryExistsException + { + final TestTask task = new TestTask("t1", "2021-01/2021-02"); + taskQueue.add(task); + Assert.assertEquals(1, taskQueue.getTasks().size()); + + taskQueue.shutdown(task.getId(), "killing"); + Assert.assertTrue(taskQueue.getTasks().isEmpty()); + + taskQueue.shutdown(task.getId(), "killing again"); + Assert.assertTrue(taskQueue.getTasks().isEmpty()); + } + + @Test + public void testAddBeforeStartThrowsException() + { + TaskQueue taskQueue = createTaskQueue( + new DefaultTaskConfig(), + new SimpleTaskRunner(actionClientFactory) + ); + + Assert.assertThrows( + IllegalStateException.class, + () -> taskQueue.add(new TestTask("t1", "2020/2021")) + ); + } + + @Test + public void testAddAfterStopThrowsException() + { + taskQueue.stop(); + + final TestTask task = new TestTask("t1", "2021-01/2021-02"); + Assert.assertThrows(IllegalStateException.class, () -> taskQueue.add(task)); + + Assert.assertTrue(taskQueue.getTasks().isEmpty()); + } + + @Test + public void testCompletedTaskIsNotRelaunched() + { + + } + + @Test + public void testAddTaskWhileAnotherTaskIsBeingAdded() + { + // We would want to latch DB operations + // We might also want to latch different sections of the TaskQueue code + } /** * This test verifies releasing all locks of a task when it is not ready to run yet. - * + *

* This test uses 2 APIs, {@link TaskQueue} APIs and {@link IngestionTestBase} APIs * to emulate the scenario of deadlock. The IngestionTestBase provides low-leve APIs * which you can manipulate {@link TaskLockbox} manually. These APIs should be used @@ -99,35 +194,23 @@ public class TaskQueueTest extends IngestionTestBase @Test public void testManageInternalReleaseLockWhenTaskIsNotReady() throws Exception { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter() - ); - taskQueue.setActive(true); // task1 emulates a case when there is a task that was issued before task2 and acquired locks conflicting // to task2. - final TestTask task1 = new TestTask("t1", Intervals.of("2021-01/P1M")); + final TestTask task1 = new TestTask("t1", "2021-01/P1M"); // Manually get locks for task1. task2 cannot be ready because of task1. prepareTaskForLocking(task1); Assert.assertTrue(task1.isReady(actionClientFactory.create(task1))); - final TestTask task2 = new TestTask("t2", Intervals.of("2021-01-31/P1M")); + final TestTask task2 = new TestTask("t2", "2021-01-31/P1M"); taskQueue.add(task2); - taskQueue.manageInternal(); + taskQueue.manageTasks(); Assert.assertFalse(task2.isDone()); Assert.assertTrue(getLockbox().findLocksForTask(task2).isEmpty()); // task3 can run because task2 is still blocked by task1. - final TestTask task3 = new TestTask("t3", Intervals.of("2021-02-01/P1M")); + final TestTask task3 = new TestTask("t3", "2021-02-01/P1M"); taskQueue.add(task3); - taskQueue.manageInternal(); + taskQueue.manageTasks(); Assert.assertFalse(task2.isDone()); Assert.assertTrue(task3.isDone()); Assert.assertTrue(getLockbox().findLocksForTask(task2).isEmpty()); @@ -137,28 +220,15 @@ public void testManageInternalReleaseLockWhenTaskIsNotReady() throws Exception taskQueue.shutdown(task3.getId(), "Emulating shutdown of task3"); // Now task2 should run. - taskQueue.manageInternal(); + taskQueue.manageTasks(); Assert.assertTrue(task2.isDone()); } @Test public void testShutdownReleasesTaskLock() throws Exception { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter() - ); - taskQueue.setActive(true); - // Create a Task and add it to the TaskQueue - final TestTask task = new TestTask("t1", Intervals.of("2021-01/P1M")); + final TestTask task = new TestTask("t1", "2021-01/P1M"); taskQueue.add(task); // Acquire a lock for the Task @@ -184,19 +254,7 @@ public void testShutdownReleasesTaskLock() throws Exception @Test public void testSetUseLineageBasedSegmentAllocationByDefault() throws EntryExistsException { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter() - ); - taskQueue.setActive(true); - final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); + final Task task = new TestTask("t1", "2021-01-01/P1D"); taskQueue.add(task); final List tasks = taskQueue.getTasks(); Assert.assertEquals(1, tasks.size()); @@ -210,9 +268,7 @@ public void testSetUseLineageBasedSegmentAllocationByDefault() throws EntryExist public void testDefaultTaskContextOverrideDefaultLineageBasedSegmentAllocation() throws EntryExistsException { final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null), + final TaskQueue taskQueue = createTaskQueue( new DefaultTaskConfig() { @Override @@ -224,38 +280,24 @@ public Map getContext() ); } }, - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter() + new SimpleTaskRunner(actionClientFactory) ); - taskQueue.setActive(true); - final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); + taskQueue.start(); + final Task task = new TestTask("t1", "2021-01-01/P1D"); taskQueue.add(task); final List tasks = taskQueue.getTasks(); Assert.assertEquals(1, tasks.size()); final Task queuedTask = tasks.get(0); Assert.assertFalse( - queuedTask.getContextValue(SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY) + queuedTask.getContextValue( + SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY + ) ); } @Test public void testUserProvidedTaskContextOverrideDefaultLineageBasedSegmentAllocation() throws EntryExistsException { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter() - ); - taskQueue.setActive(true); final Task task = new TestTask( "t1", Intervals.of("2021-01-01/P1D"), @@ -274,31 +316,9 @@ public void testUserProvidedTaskContextOverrideDefaultLineageBasedSegmentAllocat } @Test - public void testLockConfigTakePrecedenceThanDefaultTaskContext() throws EntryExistsException + public void testLockConfigOverridesDefaultTaskContext() throws EntryExistsException { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null), - new DefaultTaskConfig() - { - @Override - public Map getContext() - { - return ImmutableMap.of( - Tasks.FORCE_TIME_CHUNK_LOCK_KEY, - false - ); - } - }, - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter() - ); - taskQueue.setActive(true); - final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); + final Task task = new TestTask("t1", "2021-01-01/P1D"); taskQueue.add(task); final List tasks = taskQueue.getTasks(); Assert.assertEquals(1, tasks.size()); @@ -309,25 +329,10 @@ public Map getContext() @Test public void testUserProvidedContextOverrideLockConfig() throws EntryExistsException { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter() - ); - taskQueue.setActive(true); final Task task = new TestTask( "t1", Intervals.of("2021-01-01/P1D"), - ImmutableMap.of( - Tasks.FORCE_TIME_CHUNK_LOCK_KEY, - false - ) + ImmutableMap.of(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, false) ); taskQueue.add(task); final List tasks = taskQueue.getTasks(); @@ -339,19 +344,7 @@ public void testUserProvidedContextOverrideLockConfig() throws EntryExistsExcept @Test public void testTaskStatusWhenExceptionIsThrownInIsReady() throws EntryExistsException { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - new SimpleTaskRunner(actionClientFactory), - actionClientFactory, - getLockbox(), - new NoopServiceEmitter() - ); - taskQueue.setActive(true); - final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")) + final Task task = new TestTask("t1", "2021-01-01/P1D") { @Override public boolean isReady(TaskActionClient taskActionClient) @@ -360,134 +353,119 @@ public boolean isReady(TaskActionClient taskActionClient) } }; taskQueue.add(task); - taskQueue.manageInternal(); + taskQueue.manageTasks(); Optional statusOptional = getTaskStorage().getStatus(task.getId()); Assert.assertTrue(statusOptional.isPresent()); - Assert.assertEquals(TaskState.FAILED, statusOptional.get().getStatusCode()); - Assert.assertNotNull(statusOptional.get().getErrorMsg()); + + TaskStatus taskStatus = statusOptional.get(); + Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode()); + + String errorMsg = taskStatus.getErrorMsg(); + Assert.assertNotNull(errorMsg); Assert.assertTrue( - StringUtils.format("Actual message is: %s", statusOptional.get().getErrorMsg()), - statusOptional.get().getErrorMsg().startsWith("Failed while waiting for the task to be ready to run") + StringUtils.format("Actual message is: %s", errorMsg), + errorMsg.startsWith("Failed while waiting for the task to be ready to run") ); } @Test public void testKilledTasksEmitRuntimeMetricWithHttpRemote() throws EntryExistsException, InterruptedException { - final TaskActionClientFactory actionClientFactory = createActionClientFactory(); final HttpRemoteTaskRunner taskRunner = createHttpRemoteTaskRunner(ImmutableList.of("t1")); - final StubServiceEmitter metricsVerifier = new StubServiceEmitter("druid/overlord", "testHost"); WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class); - EasyMock.expect(workerHolder.getWorker()).andReturn(new Worker("http", "worker", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY)).anyTimes(); + EasyMock.expect(workerHolder.getWorker()) + .andReturn(new Worker("http", "worker", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY)) + .anyTimes(); workerHolder.incrementContinuouslyFailedTasksCount(); EasyMock.expectLastCall(); workerHolder.setLastCompletedTaskTime(EasyMock.anyObject()); EasyMock.expect(workerHolder.getContinuouslyFailedTasksCount()).andReturn(1); EasyMock.replay(workerHolder); - final TaskQueue taskQueue = new TaskQueue( - new TaskLockConfig(), - new TaskQueueConfig(null, null, null, null), - new DefaultTaskConfig(), - getTaskStorage(), - taskRunner, - actionClientFactory, - getLockbox(), - metricsVerifier - ); - taskQueue.setActive(true); + + final TaskQueue taskQueue = createTaskQueue(new DefaultTaskConfig(), taskRunner); + taskQueue.start(); + final Task task = new TestTask( "t1", Intervals.of("2021-01-01/P1D"), - ImmutableMap.of( - Tasks.FORCE_TIME_CHUNK_LOCK_KEY, - false - ) + ImmutableMap.of(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, false) ); taskQueue.add(task); - taskQueue.manageInternal(); - taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - task, - TaskStatus.running(task.getId()), - TaskLocation.create("worker", 1, 2) - ), workerHolder); + taskQueue.manageTasks(); + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + task, + TaskStatus.running(task.getId()), + TaskLocation.create("worker", 1, 2) + ), + workerHolder + ); while (!taskRunner.getRunningTasks() - .stream() - .map(TaskRunnerWorkItem::getTaskId) - .collect(Collectors.toList()) - .contains(task.getId())) { + .stream() + .map(TaskRunnerWorkItem::getTaskId) + .collect(Collectors.toList()) + .contains(task.getId())) { Thread.sleep(100); } taskQueue.shutdown(task.getId(), "shutdown"); - taskRunner.taskAddedOrUpdated(TaskAnnouncement.create( - task, - TaskStatus.failure(task.getId(), "shutdown"), - TaskLocation.create("worker", 1, 2) - ), workerHolder); - taskQueue.manageInternal(); + taskRunner.taskAddedOrUpdated( + TaskAnnouncement.create( + task, + TaskStatus.failure(task.getId(), "shutdown"), + TaskLocation.create("worker", 1, 2) + ), + workerHolder + ); + taskQueue.manageTasks(); - metricsVerifier.getEvents(); - metricsVerifier.verifyEmitted("task/run/time", 1); + serviceEmitter.verifyEmitted("task/run/time", 1); } private HttpRemoteTaskRunner createHttpRemoteTaskRunner(List runningTasks) { - HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery druidNodeDiscovery = new HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY)) - .andReturn(druidNodeDiscovery); + .andReturn(new HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery()); EasyMock.replay(druidNodeDiscoveryProvider); TaskStorage taskStorageMock = EasyMock.createStrictMock(TaskStorage.class); for (String taskId : runningTasks) { - EasyMock.expect(taskStorageMock.getStatus(taskId)).andReturn(Optional.of(TaskStatus.running(taskId))); + EasyMock.expect(taskStorageMock.getStatus(taskId)) + .andReturn(Optional.of(TaskStatus.running(taskId))); } EasyMock.replay(taskStorageMock); HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner( TestHelper.makeJsonMapper(), - new HttpRemoteTaskRunnerConfig() - { - @Override - public int getPendingTasksRunnerNumThreads() - { - return 3; - } - }, + new HttpRemoteTaskRunnerConfig(), EasyMock.createNiceMock(HttpClient.class), - DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())), + Suppliers.ofInstance(DefaultWorkerBehaviorConfig.defaultConfig()), new NoopProvisioningStrategy<>(), druidNodeDiscoveryProvider, EasyMock.createNiceMock(TaskStorage.class), - EasyMock.createNiceMock(CuratorFramework.class), - new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), - new StubServiceEmitter("druid/overlord", "testHost") + null, + null, + serviceEmitter ); taskRunner.start(); - taskRunner.registerListener( - new TaskRunnerListener() - { - @Override - public String getListenerId() - { - return "test-listener"; - } - - @Override - public void locationChanged(String taskId, TaskLocation newLocation) - { - // do nothing - } + return taskRunner; + } - @Override - public void statusChanged(String taskId, TaskStatus status) - { - // do nothing - } - }, - Execs.directExecutor() + private TaskQueue createTaskQueue( + DefaultTaskConfig defaultTaskConfig, + TaskRunner taskRunner + ) + { + return new TaskQueue( + new TaskLockConfig(), + new TaskQueueConfig(null, null, null, null), + defaultTaskConfig, + getTaskStorage(), + taskRunner, + actionClientFactory, + getLockbox(), + serviceEmitter ); - - return taskRunner; } private static class TestTask extends AbstractBatchIndexTask @@ -495,9 +473,9 @@ private static class TestTask extends AbstractBatchIndexTask private final Interval interval; private boolean done; - private TestTask(String id, Interval interval) + private TestTask(String id, String interval) { - this(id, interval, null); + this(id, Intervals.of(interval), null); } private TestTask(String id, Interval interval, Map context) @@ -554,7 +532,7 @@ public boolean isPerfectRollup() @Override public Granularity getSegmentGranularity() { - return SEGMENT_GRANULARITY; + return Granularities.DAY; } @Override