From c94bb1277af108e144498bcab6bd52d8447d3dc4 Mon Sep 17 00:00:00 2001 From: Jesse Tuglu Date: Wed, 27 Aug 2025 23:42:10 -0700 Subject: [PATCH 1/5] Poll from TaskQueue::activeTasks before fetching from DB --- .../overlord/HeapMemoryTaskStorage.java | 15 ++- .../overlord/MetadataTaskStorage.java | 23 +++- .../indexing/overlord/TaskQueryTool.java | 25 +++-- .../druid/indexing/overlord/TaskQueue.java | 100 +++++++++++------- .../druid/indexing/overlord/TaskStorage.java | 11 +- .../org/apache/druid/indexer/TaskInfo.java | 5 + 6 files changed, 130 insertions(+), 49 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index 5109abe2377d..5f01a5fcc56a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -76,7 +76,7 @@ public HeapMemoryTaskStorage(TaskStorageConfig config) } @Override - public void insert(Task task, TaskStatus status) + public TaskInfo insert(Task task, TaskStatus status) { Preconditions.checkNotNull(task, "task"); Preconditions.checkNotNull(status, "status"); @@ -94,6 +94,7 @@ public void insert(Task task, TaskStatus status) } log.info("Inserted task[%s] with status[%s]", task.getId(), status); + return TaskStuff.toTaskInfo(newTaskStuff); } @Override @@ -159,6 +160,18 @@ public List getActiveTasks() return listBuilder.build(); } + @Override + public List> getActiveTaskInfos() + { + final ImmutableList.Builder> listBuilder = ImmutableList.builder(); + for (final TaskStuff taskStuff : tasks.values()) { + if (taskStuff.getStatus().isRunnable()) { + listBuilder.add(TaskStuff.toTaskInfo(taskStuff)); + } + } + return listBuilder.build(); + } + @Override public List getActiveTasksByDatasource(String datasource) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index 683b9e805492..51fffc49d274 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -46,6 +46,7 @@ import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; +import org.joda.time.DateTime; import javax.annotation.Nullable; import java.util.Collections; @@ -111,7 +112,7 @@ public void stop() } @Override - public void insert(final Task task, final TaskStatus status) + public TaskInfo insert(final Task task, final TaskStatus status) { Preconditions.checkNotNull(task, "task"); Preconditions.checkNotNull(status, "status"); @@ -123,11 +124,12 @@ public void insert(final Task task, final TaskStatus status) ); log.info("Inserting task [%s] with status [%s].", task.getId(), status); + final DateTime insertionTime = DateTimes.nowUtc(); try { handler.insert( task.getId(), - DateTimes.nowUtc(), + insertionTime, task.getDataSource(), task, status.isRunnable(), @@ -142,6 +144,14 @@ public void insert(final Task task, final TaskStatus status) catch (Exception e) { throw new RuntimeException(e); } + + return new TaskInfo<>( + task.getId(), + insertionTime, + status, + task.getDataSource(), + task + ); } @Override @@ -189,6 +199,15 @@ public List getActiveTasks() .collect(Collectors.toList()); } + @Override + public List> getActiveTaskInfos() + { + return handler.getTaskInfos(Collections.singletonMap(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null) + .stream() + .filter(taskInfo -> taskInfo.getStatus().isRunnable() && taskInfo.getTask() != null) + .collect(Collectors.toList()); + } + @Override public List getActiveTasksByDatasource(String datasource) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java index 8e4830611bee..1606c0775fd8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java @@ -33,7 +33,6 @@ import org.apache.druid.indexing.overlord.http.TaskStateLookup; import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse; import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; -import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; @@ -41,7 +40,6 @@ import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; -import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Interval; @@ -106,6 +104,10 @@ public Map> getActiveLocks(List lockFil public List> getActiveTaskInfo(@Nullable String dataSource) { + final Optional taskQueue = taskMaster.getTaskQueue(); + if (taskQueue.isPresent()) { + taskQueue.get().getActiveTasksForDatasource(dataSource); + } return storage.getTaskInfos(TaskLookup.activeTasksOnly(), dataSource); } @@ -147,6 +149,13 @@ public Optional getTaskStatus(final String taskId) @Nullable public TaskInfo getTaskInfo(String taskId) { + final Optional taskQueue = taskMaster.getTaskQueue(); + if (taskQueue.isPresent()) { + final Optional> taskStatus = taskQueue.get().getActiveTaskInfo(taskId); + if (taskStatus.isPresent()) { + return taskStatus.get(); + } + } return storage.getTaskInfo(taskId); } @@ -156,12 +165,10 @@ public List getAllActiveTasks() if (taskQueue.isPresent()) { // Serve active task statuses from memory final List taskStatusPlusList = new ArrayList<>(); + final List> activeTasks = taskQueue.get().getTaskInfos(); - // Use a dummy created time as this is not used by the caller, just needs to be non-null - final DateTime createdTime = DateTimes.nowUtc(); - - final List activeTasks = taskQueue.get().getTasks(); - for (Task task : activeTasks) { + for (TaskInfo taskInfo : activeTasks) { + final Task task = taskInfo.getTask(); final Optional statusOptional = taskQueue.get().getTaskStatus(task.getId()); if (statusOptional.isPresent()) { final TaskStatus status = statusOptional.get(); @@ -170,8 +177,8 @@ public List getAllActiveTasks() task.getId(), task.getGroupId(), task.getType(), - createdTime, - createdTime, + taskInfo.getCreatedTime(), + taskInfo.getCreatedTime(), status.getStatusCode(), null, status.getDuration(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index f18c3b4465e5..b9c56d0218e9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -37,6 +37,7 @@ import org.apache.druid.error.EntryAlreadyExists; import org.apache.druid.error.InvalidInput; import org.apache.druid.indexer.RunnerTaskState; +import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.Counters; @@ -425,7 +426,7 @@ private void startPendingTaskOnRunner(TaskEntry entry, ListenableFuture taskInfo = taskStorage.insert(task, TaskStatus.running(task.getId())); // Note: the TaskEntry created for this task doesn't actually use the `insertTime` timestamp, it uses a new // timestamp created in the ctor. This prevents races from occurring while syncFromStorage() is happening. - addTaskInternal(task, insertTime); + addTaskInternal(taskInfo, insertTime); requestManagement(); return true; } @@ -548,27 +549,32 @@ public boolean add(final Task task) } @GuardedBy("startStopLock") - private void addTaskInternal(final Task task, final DateTime updateTime) + private void addTaskInternal(final TaskInfo taskInfo, final DateTime updateTime) { final AtomicBoolean added = new AtomicBoolean(false); final TaskEntry entry = addOrUpdateTaskEntry( - task.getId(), + taskInfo.getId(), prevEntry -> { if (prevEntry == null) { added.set(true); - return new TaskEntry(task); + return new TaskEntry(taskInfo); } else if (prevEntry.lastUpdatedTime.isBefore(updateTime)) { prevEntry.lastUpdatedTime = updateTime; } + // Ensure we keep the current status up-to-date + if (!prevEntry.taskInfo.getStatus().equals(taskInfo.getStatus())) { + prevEntry.taskInfo = taskInfo; + } + return prevEntry; } ); if (added.get()) { - taskLockbox.add(task); - } else if (!entry.task.equals(task)) { - throw new ISE("Cannot add task[%s] as a different task for the same ID has already been added.", task.getId()); + taskLockbox.add(taskInfo.getTask()); + } else if (!entry.taskInfo.getTask().equals(taskInfo.getTask())) { + throw new ISE("Cannot add task[%s] as a different task for the same ID has already been added.", taskInfo.getId()); } } @@ -584,14 +590,14 @@ private void addTaskInternal(final Task task, final DateTime updateTime) @GuardedBy("startStopLock") private boolean removeTaskInternal(final String taskId, final DateTime deleteTime) { - final AtomicReference removedTask = new AtomicReference<>(); + final AtomicReference> removedTask = new AtomicReference<>(); addOrUpdateTaskEntry( taskId, prevEntry -> { // Remove the task only if it is complete OR it doesn't have a more recent update if (prevEntry != null && (prevEntry.isComplete || prevEntry.lastUpdatedTime.isBefore(deleteTime))) { - removedTask.set(prevEntry.task); + removedTask.set(prevEntry.taskInfo); // Remove this taskId from activeTasks by mapping it to null return null; } @@ -601,7 +607,7 @@ private boolean removeTaskInternal(final String taskId, final DateTime deleteTim ); if (removedTask.get() != null) { - removeTaskLock(removedTask.get()); + removeTaskLock(removedTask.get().getTask()); return true; } return false; @@ -686,7 +692,7 @@ private void notifyStatus(final TaskEntry entry, final TaskStatus taskStatus, St return; } - final Task task = entry.task; + final Task task = entry.taskInfo.getTask(); Preconditions.checkNotNull(task, "task"); Preconditions.checkNotNull(taskStatus, "status"); Preconditions.checkState(active, "Queue is not active!"); @@ -697,6 +703,9 @@ private void notifyStatus(final TaskEntry entry, final TaskStatus taskStatus, St taskStatus.getId() ); + // Always update the task status associated with this entry + entry.taskInfo = entry.taskInfo.withStatus(taskStatus); + if (!taskStatus.isComplete()) { // Nothing to do for incomplete statuses. return; @@ -835,26 +844,24 @@ void syncFromStorage() try { if (active) { - final Map newTasks = - CollectionUtils.toMap(taskStorage.getActiveTasks(), Task::getId, Function.identity()); - final Map oldTasks = - CollectionUtils.mapValues(activeTasks, entry -> entry.task); + final Map> newTasks = CollectionUtils.toMap(taskStorage.getActiveTaskInfos(), (taskInfo) -> taskInfo.getTask().getId(), Function.identity()); + final Map> oldTasks = CollectionUtils.mapValues(activeTasks, entry -> entry.taskInfo); // Identify the tasks that have been added or removed from the storage - final MapDifference mapDifference = Maps.difference(oldTasks, newTasks); - final Collection addedTasks = mapDifference.entriesOnlyOnRight().values(); - final Collection removedTasks = mapDifference.entriesOnlyOnLeft().values(); + final MapDifference> mapDifference = Maps.difference(oldTasks, newTasks); + final Collection> addedTasks = mapDifference.entriesOnlyOnRight().values(); + final Collection> removedTasks = mapDifference.entriesOnlyOnLeft().values(); // Remove tasks not present in metadata store if their lastUpdatedTime is before syncStartTime int numTasksRemoved = 0; - for (Task task : removedTasks) { + for (TaskInfo task : removedTasks) { if (removeTaskInternal(task.getId(), syncStartTime)) { ++numTasksRemoved; } } // Add new tasks present in metadata store if their lastUpdatedTime is before syncStartTime - for (Task task : addedTasks) { + for (TaskInfo task : addedTasks) { addTaskInternal(task, syncStartTime); } @@ -921,7 +928,7 @@ private Map getCurrentTaskDatasources() { return activeTasks.values().stream() .filter(entry -> !entry.isComplete) - .map(entry -> entry.task) + .map(entry -> entry.taskInfo.getTask()) .collect(Collectors.toMap(Task::getId, TaskQueue::getMetricKey)); } @@ -958,7 +965,7 @@ public Map getWaitingTaskCount() return activeTasks.values().stream() .filter(entry -> !entry.isComplete) - .map(entry -> entry.task) + .map(entry -> entry.taskInfo.getTask()) .filter(task -> !runnerKnownTaskIds.contains(task.getId())) .collect(Collectors.toMap(TaskQueue::getMetricKey, task -> 1L, Long::sum)); } @@ -998,16 +1005,16 @@ public CoordinatorRunStats getQueueStats() */ public Optional getActiveTask(String id) { - final TaskEntry entry = activeTasks.get(id); - if (entry == null) { + final Optional> taskInfo = getActiveTaskInfo(id); + if (!taskInfo.isPresent()) { return Optional.absent(); } - Task task = entry.task; + Task task = taskInfo.get().getTask(); if (task != null) { try { // Write and read the value using a mapper with password redaction mixin. - task = passwordRedactingMapper.readValue(passwordRedactingMapper.writeValueAsString(entry.task), Task.class); + task = passwordRedactingMapper.readValue(passwordRedactingMapper.writeValueAsString(task), Task.class); } catch (JsonProcessingException e) { log.error(e, "Failed to serialize or deserialize task with id [%s].", task.getId()); @@ -1020,12 +1027,33 @@ public Optional getActiveTask(String id) } /** - * List of all active and completed tasks currently being managed by this - * TaskQueue. + * Returns an optional TaskInfo + * @param taskId + * @return + */ + public Optional> getActiveTaskInfo(String taskId) + { + final TaskEntry entry = activeTasks.get(taskId); + if (entry == null) { + return Optional.absent(); + } + return Optional.of(entry.taskInfo); + } + + /** + * List of all active and completed tasks currently being managed by this TaskQueue. + */ + public List> getTaskInfos() + { + return activeTasks.values().stream().map(entry -> entry.taskInfo).collect(Collectors.toList()); + } + + /** + * List of all active and completed tasks currently being managed by this TaskQueue. */ public List getTasks() { - return activeTasks.values().stream().map(entry -> entry.task).collect(Collectors.toList()); + return activeTasks.values().stream().map(entry -> entry.taskInfo.getTask()).collect(Collectors.toList()); } /** @@ -1035,9 +1063,9 @@ public Map getActiveTasksForDatasource(String datasource) { return activeTasks.values().stream().filter( entry -> !entry.isComplete - && entry.task.getDataSource().equals(datasource) + && entry.taskInfo.getDataSource().equals(datasource) ).map( - entry -> entry.task + entry -> entry.taskInfo.getTask() ).collect( Collectors.toMap(Task::getId, Function.identity()) ); @@ -1140,15 +1168,15 @@ private void removeTaskLock(Task task) */ static class TaskEntry { - private final Task task; + private TaskInfo taskInfo; private DateTime lastUpdatedTime; private ListenableFuture future = null; private boolean isComplete = false; - TaskEntry(Task task) + TaskEntry(TaskInfo taskInfo) { - this.task = task; + this.taskInfo = taskInfo; this.lastUpdatedTime = DateTimes.nowUtc(); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java index b231b3f37c28..bbe507087823 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java @@ -40,8 +40,9 @@ public interface TaskStorage * * @param task task to add * @param status task status + * @return A TaskInfo object representing the task information that was committed */ - void insert(Task task, TaskStatus status); + TaskInfo insert(Task task, TaskStatus status); /** * Persists task status in the storage facility. This method should throw an exception if the task status lifecycle @@ -117,6 +118,14 @@ public interface TaskStorage */ List getActiveTasks(); + /** + * Returns a list of currently running or pending tasks as stored in the storage facility. No particular order + * is guaranteed, but implementations are encouraged to return tasks in ascending order of creation. + * + * @return list of active tasks + */ + List> getActiveTaskInfos(); + /** * Returns a list of currently running or pending tasks as stored in the storage facility. No particular order * is guaranteed, but implementations are encouraged to return tasks in ascending order of creation. diff --git a/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java b/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java index aa9aa1097db5..2f848381c654 100644 --- a/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java +++ b/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java @@ -76,4 +76,9 @@ public EntryType getTask() { return task; } + + public TaskInfo withStatus(StatusType newStatus) + { + return new TaskInfo<>(id, createdTime, newStatus, dataSource, task); + } } From 0b5aced5d9507171fc9ed0eaf77fc592e358da0a Mon Sep 17 00:00:00 2001 From: Jesse Tuglu Date: Thu, 28 Aug 2025 00:25:15 -0700 Subject: [PATCH 2/5] Nits --- .../indexing/overlord/TaskQueryTool.java | 4 --- .../druid/indexing/overlord/TaskQueue.java | 18 ++++++------ .../druid/indexing/overlord/TaskStorage.java | 6 ++-- .../overlord/http/OverlordResourceTest.java | 29 +++++++++++-------- .../org/apache/druid/indexer/TaskInfo.java | 7 ++++- 5 files changed, 35 insertions(+), 29 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java index 1606c0775fd8..d33c0a2769b7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java @@ -104,10 +104,6 @@ public Map> getActiveLocks(List lockFil public List> getActiveTaskInfo(@Nullable String dataSource) { - final Optional taskQueue = taskMaster.getTaskQueue(); - if (taskQueue.isPresent()) { - taskQueue.get().getActiveTasksForDatasource(dataSource); - } return storage.getTaskInfos(TaskLookup.activeTasksOnly(), dataSource); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index b9c56d0218e9..3b6515ca2760 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -251,6 +251,7 @@ public void start() } } } + } ); ScheduledExecutors.scheduleAtFixedRate( @@ -574,7 +575,7 @@ private void addTaskInternal(final TaskInfo taskInfo, final Da if (added.get()) { taskLockbox.add(taskInfo.getTask()); } else if (!entry.taskInfo.getTask().equals(taskInfo.getTask())) { - throw new ISE("Cannot add task[%s] as a different task for the same ID has already been added.", taskInfo.getId()); + throw new ISE("Cannot add task[%s] as a different task for the same ID has already been added.", taskInfo.getTask().getId()); } } @@ -703,9 +704,6 @@ private void notifyStatus(final TaskEntry entry, final TaskStatus taskStatus, St taskStatus.getId() ); - // Always update the task status associated with this entry - entry.taskInfo = entry.taskInfo.withStatus(taskStatus); - if (!taskStatus.isComplete()) { // Nothing to do for incomplete statuses. return; @@ -717,6 +715,8 @@ private void notifyStatus(final TaskEntry entry, final TaskStatus taskStatus, St // Mark this task as complete, so it isn't managed while being cleaned up. entry.isComplete = true; + // Update the task status associated with this entry + entry.taskInfo = entry.taskInfo.withNewStatus(taskStatus); final TaskLocation taskLocation = taskRunner.getTaskLocation(task.getId()); @@ -1027,9 +1027,9 @@ public Optional getActiveTask(String id) } /** - * Returns an optional TaskInfo + * Polls {@link #activeTasks} for the task with the corresponding {@code taskId} * @param taskId - * @return + * @return an optional TaskInfo */ public Optional> getActiveTaskInfo(String taskId) { @@ -1041,7 +1041,7 @@ public Optional> getActiveTaskInfo(String taskId) } /** - * List of all active and completed tasks currently being managed by this TaskQueue. + * List of all active and completed task infos currently being managed by this TaskQueue. */ public List> getTaskInfos() { @@ -1053,11 +1053,11 @@ public List> getTaskInfos() */ public List getTasks() { - return activeTasks.values().stream().map(entry -> entry.taskInfo.getTask()).collect(Collectors.toList()); + return getTaskInfos().stream().map(TaskInfo::getTask).collect(Collectors.toList()); } /** - * Returns the list of currently active tasks for the given datasource. + * Returns a map of currently active tasks for the given datasource. */ public Map getActiveTasksForDatasource(String datasource) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java index bbe507087823..32f1a9372cb1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java @@ -40,7 +40,7 @@ public interface TaskStorage * * @param task task to add * @param status task status - * @return A TaskInfo object representing the task information that was committed + * @return a TaskInfo object representing the task information that was committed to the storage facility */ TaskInfo insert(Task task, TaskStatus status); @@ -119,10 +119,10 @@ public interface TaskStorage List getActiveTasks(); /** - * Returns a list of currently running or pending tasks as stored in the storage facility. No particular order + * Returns a list of currently running or pending task infos as stored in the storage facility. No particular order * is guaranteed, but implementations are encouraged to return tasks in ascending order of creation. * - * @return list of active tasks + * @return list of active task infos */ List> getActiveTaskInfos(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index afa67d41bea7..6e63bdaa0bbc 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -131,7 +131,7 @@ public void setUp() provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class); authConfig = EasyMock.createMock(AuthConfig.class); overlord = EasyMock.createStrictMock(DruidOverlord.class); - taskMaster = EasyMock.createStrictMock(TaskMaster.class); + taskMaster = EasyMock.createMock(TaskMaster.class); taskStorage = EasyMock.createStrictMock(TaskStorage.class); taskLockbox = EasyMock.createStrictMock(GlobalTaskLockbox.class); taskQueryTool = new TaskQueryTool( @@ -999,21 +999,26 @@ public void testGetTaskStatus() throws Exception final Task task = NoopTask.create(); final String taskId = task.getId(); final TaskStatus status = TaskStatus.running(taskId); + final TaskInfo taskInfo = new TaskInfo<>( + task.getId(), + DateTimes.of("2018-01-01"), + status, + task.getDataSource(), + task + ); - EasyMock.expect(taskQueryTool.getTaskInfo(taskId)) - .andReturn(new TaskInfo( - task.getId(), - DateTimes.of("2018-01-01"), - status, - task.getDataSource(), - task - )); + // Simulate in-memory queue for noop task + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); - EasyMock.expect(taskQueryTool.getTaskInfo("othertask")) - .andReturn(null); + EasyMock.expect(taskQueue.getActiveTaskInfo(taskId)).andReturn(Optional.of(taskInfo)); EasyMock.>expect(taskRunner.getKnownTasks()) - .andReturn(ImmutableList.of()); + .andReturn(ImmutableList.of(new MockTaskRunnerWorkItem(taskId))).anyTimes(); + EasyMock.expect(taskRunner.getRunnerTaskState(taskId)).andReturn(RunnerTaskState.RUNNING); + + // Simulate task storage fetch for "othertask" + EasyMock.expect(taskQueue.getActiveTaskInfo("othertask")).andReturn(Optional.absent()); + EasyMock.expect(taskStorage.getTaskInfo("othertask")).andReturn(null); replayAll(); diff --git a/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java b/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java index 2f848381c654..31a7c76b2e1d 100644 --- a/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java +++ b/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java @@ -77,7 +77,12 @@ public EntryType getTask() return task; } - public TaskInfo withStatus(StatusType newStatus) + /** + * Returns a copy of this TaskInfo object with a new StatusType + * @param newStatus + * @return a new TaskInfo + */ + public TaskInfo withNewStatus(StatusType newStatus) { return new TaskInfo<>(id, createdTime, newStatus, dataSource, task); } From abfab8ce5695927d932af1e1c431af8d38d01f0e Mon Sep 17 00:00:00 2001 From: Jesse Tuglu Date: Fri, 29 Aug 2025 00:55:02 -0700 Subject: [PATCH 3/5] Address comments --- .../overlord/MetadataTaskStorage.java | 4 +-- .../druid/indexing/overlord/TaskQueue.java | 32 +++++++++++-------- .../overlord/http/OverlordResourceTest.java | 20 ++++++------ .../org/apache/druid/indexer/TaskInfo.java | 8 ++--- 4 files changed, 35 insertions(+), 29 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index 51fffc49d274..aceb48192e4c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -192,7 +192,7 @@ public List getActiveTasks() { // filter out taskInfo with a null 'task' which should only happen in practice if we are missing a jackson module // and don't know what to do with the payload, so we won't be able to make use of it anyway - return handler.getTaskInfos(Collections.singletonMap(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null) + return handler.getTaskInfos(Map.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null) .stream() .filter(taskInfo -> taskInfo.getStatus().isRunnable() && taskInfo.getTask() != null) .map(TaskInfo::getTask) @@ -202,7 +202,7 @@ public List getActiveTasks() @Override public List> getActiveTaskInfos() { - return handler.getTaskInfos(Collections.singletonMap(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null) + return handler.getTaskInfos(Map.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null) .stream() .filter(taskInfo -> taskInfo.getStatus().isRunnable() && taskInfo.getTask() != null) .collect(Collectors.toList()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 3b6515ca2760..68a38596517c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -251,7 +251,6 @@ public void start() } } } - } ); ScheduledExecutors.scheduleAtFixedRate( @@ -560,12 +559,8 @@ private void addTaskInternal(final TaskInfo taskInfo, final Da added.set(true); return new TaskEntry(taskInfo); } else if (prevEntry.lastUpdatedTime.isBefore(updateTime)) { - prevEntry.lastUpdatedTime = updateTime; - } - - // Ensure we keep the current status up-to-date - if (!prevEntry.taskInfo.getStatus().equals(taskInfo.getStatus())) { - prevEntry.taskInfo = taskInfo; + // Ensure we keep the current status up-to-date + prevEntry.updateStatus(taskInfo.getStatus(), updateTime); } return prevEntry; @@ -716,7 +711,7 @@ private void notifyStatus(final TaskEntry entry, final TaskStatus taskStatus, St // Mark this task as complete, so it isn't managed while being cleaned up. entry.isComplete = true; // Update the task status associated with this entry - entry.taskInfo = entry.taskInfo.withNewStatus(taskStatus); + entry.taskInfo = entry.taskInfo.withStatus(taskStatus); final TaskLocation taskLocation = taskRunner.getTaskLocation(task.getId()); @@ -844,8 +839,10 @@ void syncFromStorage() try { if (active) { - final Map> newTasks = CollectionUtils.toMap(taskStorage.getActiveTaskInfos(), (taskInfo) -> taskInfo.getTask().getId(), Function.identity()); - final Map> oldTasks = CollectionUtils.mapValues(activeTasks, entry -> entry.taskInfo); + final Map> newTasks = + CollectionUtils.toMap(taskStorage.getActiveTaskInfos(), (taskInfo) -> taskInfo.getTask().getId(), Function.identity()); + final Map> oldTasks = + CollectionUtils.mapValues(activeTasks, entry -> entry.taskInfo); // Identify the tasks that have been added or removed from the storage final MapDifference> mapDifference = Maps.difference(oldTasks, newTasks); @@ -1027,9 +1024,8 @@ public Optional getActiveTask(String id) } /** - * Polls {@link #activeTasks} for the task with the corresponding {@code taskId} - * @param taskId - * @return an optional TaskInfo + * Gets the {@link TaskInfo} for the given {@code taskId} from {@link #activeTasks} if present, + * otherwise returns an empty optional. */ public Optional> getActiveTaskInfo(String taskId) { @@ -1179,6 +1175,16 @@ static class TaskEntry this.taskInfo = taskInfo; this.lastUpdatedTime = DateTimes.nowUtc(); } + + /** + * Updates the {@link TaskStatus} for the task associated with this {@link TaskEntry} and sets the corresponding + * update time. + */ + public void updateStatus(TaskStatus status, DateTime updateTime) + { + this.taskInfo.withStatus(status); + this.lastUpdatedTime = updateTime; + } } private static RowKey getMetricKey(final Task task) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 6e63bdaa0bbc..a6988feeeb1f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -131,7 +131,7 @@ public void setUp() provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class); authConfig = EasyMock.createMock(AuthConfig.class); overlord = EasyMock.createStrictMock(DruidOverlord.class); - taskMaster = EasyMock.createMock(TaskMaster.class); + taskMaster = EasyMock.createStrictMock(TaskMaster.class); taskStorage = EasyMock.createStrictMock(TaskStorage.class); taskLockbox = EasyMock.createStrictMock(GlobalTaskLockbox.class); taskQueryTool = new TaskQueryTool( @@ -1007,18 +1007,20 @@ public void testGetTaskStatus() throws Exception task ); - // Simulate in-memory queue for noop task - EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); - - EasyMock.expect(taskQueue.getActiveTaskInfo(taskId)).andReturn(Optional.of(taskInfo)); + // For noop, simulate in-memory hit + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).once(); + EasyMock.expect(taskQueue.getActiveTaskInfo(taskId)).andReturn(Optional.of(taskInfo)).once(); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).once(); EasyMock.>expect(taskRunner.getKnownTasks()) .andReturn(ImmutableList.of(new MockTaskRunnerWorkItem(taskId))).anyTimes(); - EasyMock.expect(taskRunner.getRunnerTaskState(taskId)).andReturn(RunnerTaskState.RUNNING); + EasyMock.expect(taskRunner.getRunnerTaskState(taskId)).andReturn(RunnerTaskState.RUNNING).once(); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).once(); - // Simulate task storage fetch for "othertask" - EasyMock.expect(taskQueue.getActiveTaskInfo("othertask")).andReturn(Optional.absent()); - EasyMock.expect(taskStorage.getTaskInfo("othertask")).andReturn(null); + // For "othertask", simulate in-memory miss, then task storage read + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).once(); + EasyMock.expect(taskQueue.getActiveTaskInfo("othertask")).andReturn(Optional.absent()).once(); + EasyMock.expect(taskStorage.getTaskInfo("othertask")).andReturn(null).once(); replayAll(); diff --git a/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java b/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java index 31a7c76b2e1d..2616f9daf02b 100644 --- a/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java +++ b/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java @@ -78,12 +78,10 @@ public EntryType getTask() } /** - * Returns a copy of this TaskInfo object with a new StatusType - * @param newStatus - * @return a new TaskInfo + * Returns a copy of this TaskInfo object with the given status. */ - public TaskInfo withNewStatus(StatusType newStatus) + public TaskInfo withStatus(StatusType status) { - return new TaskInfo<>(id, createdTime, newStatus, dataSource, task); + return new TaskInfo<>(id, createdTime, status, dataSource, task); } } From 3e2af110a40d5b602bac961e6098cedb7fcef589 Mon Sep 17 00:00:00 2001 From: Jesse Tuglu Date: Fri, 29 Aug 2025 01:34:46 -0700 Subject: [PATCH 4/5] Add TaskEntry::getTask() --- .../druid/indexing/overlord/TaskQueue.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 68a38596517c..b82dccdf1a7b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -426,7 +426,7 @@ private void startPendingTaskOnRunner(TaskEntry entry, ListenableFuture taskInfo, final Da if (added.get()) { taskLockbox.add(taskInfo.getTask()); - } else if (!entry.taskInfo.getTask().equals(taskInfo.getTask())) { + } else if (!entry.getTask().equals(taskInfo.getTask())) { throw new ISE("Cannot add task[%s] as a different task for the same ID has already been added.", taskInfo.getTask().getId()); } } @@ -688,7 +688,7 @@ private void notifyStatus(final TaskEntry entry, final TaskStatus taskStatus, St return; } - final Task task = entry.taskInfo.getTask(); + final Task task = entry.getTask(); Preconditions.checkNotNull(task, "task"); Preconditions.checkNotNull(taskStatus, "status"); Preconditions.checkState(active, "Queue is not active!"); @@ -925,7 +925,7 @@ private Map getCurrentTaskDatasources() { return activeTasks.values().stream() .filter(entry -> !entry.isComplete) - .map(entry -> entry.taskInfo.getTask()) + .map(entry -> entry.getTask()) .collect(Collectors.toMap(Task::getId, TaskQueue::getMetricKey)); } @@ -962,7 +962,7 @@ public Map getWaitingTaskCount() return activeTasks.values().stream() .filter(entry -> !entry.isComplete) - .map(entry -> entry.taskInfo.getTask()) + .map(entry -> entry.getTask()) .filter(task -> !runnerKnownTaskIds.contains(task.getId())) .collect(Collectors.toMap(TaskQueue::getMetricKey, task -> 1L, Long::sum)); } @@ -1061,7 +1061,7 @@ public Map getActiveTasksForDatasource(String datasource) entry -> !entry.isComplete && entry.taskInfo.getDataSource().equals(datasource) ).map( - entry -> entry.taskInfo.getTask() + entry -> entry.getTask() ).collect( Collectors.toMap(Task::getId, Function.identity()) ); @@ -1176,6 +1176,14 @@ static class TaskEntry this.lastUpdatedTime = DateTimes.nowUtc(); } + /** + * Returns the task associated with this {@link TaskEntry} + */ + public Task getTask() + { + return taskInfo.getTask(); + } + /** * Updates the {@link TaskStatus} for the task associated with this {@link TaskEntry} and sets the corresponding * update time. From e3177e4f0e77446632d45045a4456f5c6a133803 Mon Sep 17 00:00:00 2001 From: Jesse Tuglu Date: Fri, 29 Aug 2025 23:42:20 -0700 Subject: [PATCH 5/5] Updates + more tests --- .../overlord/MetadataTaskStorage.java | 8 +-- .../druid/indexing/overlord/TaskQueue.java | 36 +++------- .../indexing/overlord/TaskQueueTest.java | 66 +++++++++++++++++++ 3 files changed, 79 insertions(+), 31 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index aceb48192e4c..ae78bc2c3ff1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -192,11 +192,9 @@ public List getActiveTasks() { // filter out taskInfo with a null 'task' which should only happen in practice if we are missing a jackson module // and don't know what to do with the payload, so we won't be able to make use of it anyway - return handler.getTaskInfos(Map.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null) - .stream() - .filter(taskInfo -> taskInfo.getStatus().isRunnable() && taskInfo.getTask() != null) - .map(TaskInfo::getTask) - .collect(Collectors.toList()); + return getActiveTaskInfos().stream() + .map(TaskInfo::getTask) + .collect(Collectors.toList()); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index b82dccdf1a7b..d7024eabfe5d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -559,7 +559,6 @@ private void addTaskInternal(final TaskInfo taskInfo, final Da added.set(true); return new TaskEntry(taskInfo); } else if (prevEntry.lastUpdatedTime.isBefore(updateTime)) { - // Ensure we keep the current status up-to-date prevEntry.updateStatus(taskInfo.getStatus(), updateTime); } @@ -570,7 +569,7 @@ private void addTaskInternal(final TaskInfo taskInfo, final Da if (added.get()) { taskLockbox.add(taskInfo.getTask()); } else if (!entry.getTask().equals(taskInfo.getTask())) { - throw new ISE("Cannot add task[%s] as a different task for the same ID has already been added.", taskInfo.getTask().getId()); + throw new ISE("Cannot add task[%s] as a different task for the same ID has already been added.", taskInfo.getId()); } } @@ -710,8 +709,7 @@ private void notifyStatus(final TaskEntry entry, final TaskStatus taskStatus, St // Mark this task as complete, so it isn't managed while being cleaned up. entry.isComplete = true; - // Update the task status associated with this entry - entry.taskInfo = entry.taskInfo.withStatus(taskStatus); + entry.updateStatus(taskStatus, DateTimes.nowUtc()); final TaskLocation taskLocation = taskRunner.getTaskLocation(task.getId()); @@ -840,7 +838,7 @@ void syncFromStorage() try { if (active) { final Map> newTasks = - CollectionUtils.toMap(taskStorage.getActiveTaskInfos(), (taskInfo) -> taskInfo.getTask().getId(), Function.identity()); + CollectionUtils.toMap(taskStorage.getActiveTaskInfos(), TaskInfo::getId, Function.identity()); final Map> oldTasks = CollectionUtils.mapValues(activeTasks, entry -> entry.taskInfo); @@ -880,15 +878,6 @@ void syncFromStorage() } } - private static Map toTaskIDMap(List taskList) - { - Map rv = new HashMap<>(); - for (Task task : taskList) { - rv.put(task.getId(), task); - } - return rv; - } - private Map getDeltaValues(Map total, Map prev) { final Map deltaValues = new HashMap<>(); @@ -925,7 +914,7 @@ private Map getCurrentTaskDatasources() { return activeTasks.values().stream() .filter(entry -> !entry.isComplete) - .map(entry -> entry.getTask()) + .map(TaskEntry::getTask) .collect(Collectors.toMap(Task::getId, TaskQueue::getMetricKey)); } @@ -962,7 +951,7 @@ public Map getWaitingTaskCount() return activeTasks.values().stream() .filter(entry -> !entry.isComplete) - .map(entry -> entry.getTask()) + .map(TaskEntry::getTask) .filter(task -> !runnerKnownTaskIds.contains(task.getId())) .collect(Collectors.toMap(TaskQueue::getMetricKey, task -> 1L, Long::sum)); } @@ -1030,10 +1019,7 @@ public Optional getActiveTask(String id) public Optional> getActiveTaskInfo(String taskId) { final TaskEntry entry = activeTasks.get(taskId); - if (entry == null) { - return Optional.absent(); - } - return Optional.of(entry.taskInfo); + return entry == null ? Optional.absent() : Optional.of(entry.taskInfo); } /** @@ -1060,9 +1046,7 @@ public Map getActiveTasksForDatasource(String datasource) return activeTasks.values().stream().filter( entry -> !entry.isComplete && entry.taskInfo.getDataSource().equals(datasource) - ).map( - entry -> entry.getTask() - ).collect( + ).map(TaskEntry::getTask).collect( Collectors.toMap(Task::getId, Function.identity()) ); } @@ -1179,7 +1163,7 @@ static class TaskEntry /** * Returns the task associated with this {@link TaskEntry} */ - public Task getTask() + Task getTask() { return taskInfo.getTask(); } @@ -1188,9 +1172,9 @@ public Task getTask() * Updates the {@link TaskStatus} for the task associated with this {@link TaskEntry} and sets the corresponding * update time. */ - public void updateStatus(TaskStatus status, DateTime updateTime) + void updateStatus(TaskStatus status, DateTime updateTime) { - this.taskInfo.withStatus(status); + this.taskInfo = this.taskInfo.withStatus(status); this.lastUpdatedTime = updateTime; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index d03a5c8c2cf9..0cb35ba39175 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -38,6 +38,7 @@ import org.apache.druid.discovery.WorkerNodeService; import org.apache.druid.error.DruidException; import org.apache.druid.indexer.RunnerTaskState; +import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; @@ -624,6 +625,71 @@ public void testGetActiveTaskRedactsPassword() throws JsonProcessingException Assert.assertEquals(taskInStorageAsString, taskInQueueAsString); } + @Test + public void testTaskShutdownUpdatesTaskStatusInTaskQueue() + { + final String shutdownReason = "Test shutdown reason"; + final TaskStatus shutdownStatus = TaskStatus.failure("shutdown-test-task", shutdownReason); + final TestTask task = new TestTask("shutdown-test-task", Intervals.of("2021-01-01/P1D")); + taskQueue.add(task); + + final Optional> activeInfoOpt = taskQueue.getActiveTaskInfo(task.getId()); + Assert.assertTrue(activeInfoOpt.isPresent()); + Assert.assertEquals(TaskState.RUNNING, activeInfoOpt.get().getStatus().getStatusCode()); + + taskQueue.shutdown(task.getId(), shutdownReason); + + final Optional> afterShutdownInfoOpt = taskQueue.getActiveTaskInfo(task.getId()); + Assert.assertTrue(afterShutdownInfoOpt.isPresent()); + Assert.assertEquals(shutdownStatus, afterShutdownInfoOpt.get().getStatus()); + Assert.assertEquals(shutdownStatus, getTaskStorage().getStatus(task.getId()).get()); + } + + @Test + public void testTaskSuccessUpdatesTaskStatusInTaskQueue() throws Exception + { + final TaskStatus successStatus = TaskStatus.success("success-test-task"); + final TestTask task = new TestTask("success-test-task", Intervals.of("2021-01-01/P1D")); + taskQueue.add(task); + taskQueue.manageQueuedTasks(); + + // ensure success callback has fired + Thread.sleep(100); + Assert.assertTrue(task.isDone()); + + final Optional> activeInfoOpt = taskQueue.getActiveTaskInfo(task.getId()); + Assert.assertTrue(activeInfoOpt.isPresent()); + Assert.assertEquals(successStatus, activeInfoOpt.get().getStatus()); + Assert.assertEquals(successStatus, getTaskStorage().getStatus(task.getId()).get()); + } + + @Test + public void testTaskFailureUpdatesTaskStatusInTaskQueue() throws Exception + { + final TaskStatus failedStatus = TaskStatus.failure("failure-test-task", "error"); + final TestTask task = new TestTask("failure-test-task", Intervals.of("2021-01-01/P1D")) + { + @Override + public TaskStatus runTask(TaskToolbox toolbox) + { + super.done = true; + return failedStatus; + } + }; + + taskQueue.add(task); + taskQueue.manageQueuedTasks(); + + // ensure failed callback has fired + Thread.sleep(100); + Assert.assertTrue(task.isDone()); + + final Optional> activeInfoOpt = taskQueue.getActiveTaskInfo(task.getId()); + Assert.assertTrue(activeInfoOpt.isPresent()); + Assert.assertEquals(failedStatus, activeInfoOpt.get().getStatus()); + Assert.assertEquals(failedStatus, getTaskStorage().getStatus(task.getId()).get()); + } + private HttpRemoteTaskRunner createHttpRemoteTaskRunner() { final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider