diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index 5109abe2377d..5f01a5fcc56a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -76,7 +76,7 @@ public HeapMemoryTaskStorage(TaskStorageConfig config) } @Override - public void insert(Task task, TaskStatus status) + public TaskInfo insert(Task task, TaskStatus status) { Preconditions.checkNotNull(task, "task"); Preconditions.checkNotNull(status, "status"); @@ -94,6 +94,7 @@ public void insert(Task task, TaskStatus status) } log.info("Inserted task[%s] with status[%s]", task.getId(), status); + return TaskStuff.toTaskInfo(newTaskStuff); } @Override @@ -159,6 +160,18 @@ public List getActiveTasks() return listBuilder.build(); } + @Override + public List> getActiveTaskInfos() + { + final ImmutableList.Builder> listBuilder = ImmutableList.builder(); + for (final TaskStuff taskStuff : tasks.values()) { + if (taskStuff.getStatus().isRunnable()) { + listBuilder.add(TaskStuff.toTaskInfo(taskStuff)); + } + } + return listBuilder.build(); + } + @Override public List getActiveTasksByDatasource(String datasource) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index 683b9e805492..ae78bc2c3ff1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -46,6 +46,7 @@ import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; +import org.joda.time.DateTime; import javax.annotation.Nullable; import java.util.Collections; @@ -111,7 +112,7 @@ public void stop() } @Override - public void insert(final Task task, final TaskStatus status) + public TaskInfo insert(final Task task, final TaskStatus status) { Preconditions.checkNotNull(task, "task"); Preconditions.checkNotNull(status, "status"); @@ -123,11 +124,12 @@ public void insert(final Task task, final TaskStatus status) ); log.info("Inserting task [%s] with status [%s].", task.getId(), status); + final DateTime insertionTime = DateTimes.nowUtc(); try { handler.insert( task.getId(), - DateTimes.nowUtc(), + insertionTime, task.getDataSource(), task, status.isRunnable(), @@ -142,6 +144,14 @@ public void insert(final Task task, final TaskStatus status) catch (Exception e) { throw new RuntimeException(e); } + + return new TaskInfo<>( + task.getId(), + insertionTime, + status, + task.getDataSource(), + task + ); } @Override @@ -182,10 +192,17 @@ public List getActiveTasks() { // filter out taskInfo with a null 'task' which should only happen in practice if we are missing a jackson module // and don't know what to do with the payload, so we won't be able to make use of it anyway - return handler.getTaskInfos(Collections.singletonMap(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null) + return getActiveTaskInfos().stream() + .map(TaskInfo::getTask) + .collect(Collectors.toList()); + } + + @Override + public List> getActiveTaskInfos() + { + return handler.getTaskInfos(Map.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null) .stream() .filter(taskInfo -> taskInfo.getStatus().isRunnable() && taskInfo.getTask() != null) - .map(TaskInfo::getTask) .collect(Collectors.toList()); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java index 8e4830611bee..d33c0a2769b7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java @@ -33,7 +33,6 @@ import org.apache.druid.indexing.overlord.http.TaskStateLookup; import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse; import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; -import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; @@ -41,7 +40,6 @@ import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; -import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Interval; @@ -147,6 +145,13 @@ public Optional getTaskStatus(final String taskId) @Nullable public TaskInfo getTaskInfo(String taskId) { + final Optional taskQueue = taskMaster.getTaskQueue(); + if (taskQueue.isPresent()) { + final Optional> taskStatus = taskQueue.get().getActiveTaskInfo(taskId); + if (taskStatus.isPresent()) { + return taskStatus.get(); + } + } return storage.getTaskInfo(taskId); } @@ -156,12 +161,10 @@ public List getAllActiveTasks() if (taskQueue.isPresent()) { // Serve active task statuses from memory final List taskStatusPlusList = new ArrayList<>(); + final List> activeTasks = taskQueue.get().getTaskInfos(); - // Use a dummy created time as this is not used by the caller, just needs to be non-null - final DateTime createdTime = DateTimes.nowUtc(); - - final List activeTasks = taskQueue.get().getTasks(); - for (Task task : activeTasks) { + for (TaskInfo taskInfo : activeTasks) { + final Task task = taskInfo.getTask(); final Optional statusOptional = taskQueue.get().getTaskStatus(task.getId()); if (statusOptional.isPresent()) { final TaskStatus status = statusOptional.get(); @@ -170,8 +173,8 @@ public List getAllActiveTasks() task.getId(), task.getGroupId(), task.getType(), - createdTime, - createdTime, + taskInfo.getCreatedTime(), + taskInfo.getCreatedTime(), status.getStatusCode(), null, status.getDuration(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index f18c3b4465e5..d7024eabfe5d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -37,6 +37,7 @@ import org.apache.druid.error.EntryAlreadyExists; import org.apache.druid.error.InvalidInput; import org.apache.druid.indexer.RunnerTaskState; +import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.Counters; @@ -425,7 +426,7 @@ private void startPendingTaskOnRunner(TaskEntry entry, ListenableFuture taskInfo = taskStorage.insert(task, TaskStatus.running(task.getId())); // Note: the TaskEntry created for this task doesn't actually use the `insertTime` timestamp, it uses a new // timestamp created in the ctor. This prevents races from occurring while syncFromStorage() is happening. - addTaskInternal(task, insertTime); + addTaskInternal(taskInfo, insertTime); requestManagement(); return true; } @@ -548,17 +549,17 @@ public boolean add(final Task task) } @GuardedBy("startStopLock") - private void addTaskInternal(final Task task, final DateTime updateTime) + private void addTaskInternal(final TaskInfo taskInfo, final DateTime updateTime) { final AtomicBoolean added = new AtomicBoolean(false); final TaskEntry entry = addOrUpdateTaskEntry( - task.getId(), + taskInfo.getId(), prevEntry -> { if (prevEntry == null) { added.set(true); - return new TaskEntry(task); + return new TaskEntry(taskInfo); } else if (prevEntry.lastUpdatedTime.isBefore(updateTime)) { - prevEntry.lastUpdatedTime = updateTime; + prevEntry.updateStatus(taskInfo.getStatus(), updateTime); } return prevEntry; @@ -566,9 +567,9 @@ private void addTaskInternal(final Task task, final DateTime updateTime) ); if (added.get()) { - taskLockbox.add(task); - } else if (!entry.task.equals(task)) { - throw new ISE("Cannot add task[%s] as a different task for the same ID has already been added.", task.getId()); + taskLockbox.add(taskInfo.getTask()); + } else if (!entry.getTask().equals(taskInfo.getTask())) { + throw new ISE("Cannot add task[%s] as a different task for the same ID has already been added.", taskInfo.getId()); } } @@ -584,14 +585,14 @@ private void addTaskInternal(final Task task, final DateTime updateTime) @GuardedBy("startStopLock") private boolean removeTaskInternal(final String taskId, final DateTime deleteTime) { - final AtomicReference removedTask = new AtomicReference<>(); + final AtomicReference> removedTask = new AtomicReference<>(); addOrUpdateTaskEntry( taskId, prevEntry -> { // Remove the task only if it is complete OR it doesn't have a more recent update if (prevEntry != null && (prevEntry.isComplete || prevEntry.lastUpdatedTime.isBefore(deleteTime))) { - removedTask.set(prevEntry.task); + removedTask.set(prevEntry.taskInfo); // Remove this taskId from activeTasks by mapping it to null return null; } @@ -601,7 +602,7 @@ private boolean removeTaskInternal(final String taskId, final DateTime deleteTim ); if (removedTask.get() != null) { - removeTaskLock(removedTask.get()); + removeTaskLock(removedTask.get().getTask()); return true; } return false; @@ -686,7 +687,7 @@ private void notifyStatus(final TaskEntry entry, final TaskStatus taskStatus, St return; } - final Task task = entry.task; + final Task task = entry.getTask(); Preconditions.checkNotNull(task, "task"); Preconditions.checkNotNull(taskStatus, "status"); Preconditions.checkState(active, "Queue is not active!"); @@ -708,6 +709,7 @@ private void notifyStatus(final TaskEntry entry, final TaskStatus taskStatus, St // Mark this task as complete, so it isn't managed while being cleaned up. entry.isComplete = true; + entry.updateStatus(taskStatus, DateTimes.nowUtc()); final TaskLocation taskLocation = taskRunner.getTaskLocation(task.getId()); @@ -835,26 +837,26 @@ void syncFromStorage() try { if (active) { - final Map newTasks = - CollectionUtils.toMap(taskStorage.getActiveTasks(), Task::getId, Function.identity()); - final Map oldTasks = - CollectionUtils.mapValues(activeTasks, entry -> entry.task); + final Map> newTasks = + CollectionUtils.toMap(taskStorage.getActiveTaskInfos(), TaskInfo::getId, Function.identity()); + final Map> oldTasks = + CollectionUtils.mapValues(activeTasks, entry -> entry.taskInfo); // Identify the tasks that have been added or removed from the storage - final MapDifference mapDifference = Maps.difference(oldTasks, newTasks); - final Collection addedTasks = mapDifference.entriesOnlyOnRight().values(); - final Collection removedTasks = mapDifference.entriesOnlyOnLeft().values(); + final MapDifference> mapDifference = Maps.difference(oldTasks, newTasks); + final Collection> addedTasks = mapDifference.entriesOnlyOnRight().values(); + final Collection> removedTasks = mapDifference.entriesOnlyOnLeft().values(); // Remove tasks not present in metadata store if their lastUpdatedTime is before syncStartTime int numTasksRemoved = 0; - for (Task task : removedTasks) { + for (TaskInfo task : removedTasks) { if (removeTaskInternal(task.getId(), syncStartTime)) { ++numTasksRemoved; } } // Add new tasks present in metadata store if their lastUpdatedTime is before syncStartTime - for (Task task : addedTasks) { + for (TaskInfo task : addedTasks) { addTaskInternal(task, syncStartTime); } @@ -876,15 +878,6 @@ void syncFromStorage() } } - private static Map toTaskIDMap(List taskList) - { - Map rv = new HashMap<>(); - for (Task task : taskList) { - rv.put(task.getId(), task); - } - return rv; - } - private Map getDeltaValues(Map total, Map prev) { final Map deltaValues = new HashMap<>(); @@ -921,7 +914,7 @@ private Map getCurrentTaskDatasources() { return activeTasks.values().stream() .filter(entry -> !entry.isComplete) - .map(entry -> entry.task) + .map(TaskEntry::getTask) .collect(Collectors.toMap(Task::getId, TaskQueue::getMetricKey)); } @@ -958,7 +951,7 @@ public Map getWaitingTaskCount() return activeTasks.values().stream() .filter(entry -> !entry.isComplete) - .map(entry -> entry.task) + .map(TaskEntry::getTask) .filter(task -> !runnerKnownTaskIds.contains(task.getId())) .collect(Collectors.toMap(TaskQueue::getMetricKey, task -> 1L, Long::sum)); } @@ -998,16 +991,16 @@ public CoordinatorRunStats getQueueStats() */ public Optional getActiveTask(String id) { - final TaskEntry entry = activeTasks.get(id); - if (entry == null) { + final Optional> taskInfo = getActiveTaskInfo(id); + if (!taskInfo.isPresent()) { return Optional.absent(); } - Task task = entry.task; + Task task = taskInfo.get().getTask(); if (task != null) { try { // Write and read the value using a mapper with password redaction mixin. - task = passwordRedactingMapper.readValue(passwordRedactingMapper.writeValueAsString(entry.task), Task.class); + task = passwordRedactingMapper.readValue(passwordRedactingMapper.writeValueAsString(task), Task.class); } catch (JsonProcessingException e) { log.error(e, "Failed to serialize or deserialize task with id [%s].", task.getId()); @@ -1020,25 +1013,40 @@ public Optional getActiveTask(String id) } /** - * List of all active and completed tasks currently being managed by this - * TaskQueue. + * Gets the {@link TaskInfo} for the given {@code taskId} from {@link #activeTasks} if present, + * otherwise returns an empty optional. + */ + public Optional> getActiveTaskInfo(String taskId) + { + final TaskEntry entry = activeTasks.get(taskId); + return entry == null ? Optional.absent() : Optional.of(entry.taskInfo); + } + + /** + * List of all active and completed task infos currently being managed by this TaskQueue. + */ + public List> getTaskInfos() + { + return activeTasks.values().stream().map(entry -> entry.taskInfo).collect(Collectors.toList()); + } + + /** + * List of all active and completed tasks currently being managed by this TaskQueue. */ public List getTasks() { - return activeTasks.values().stream().map(entry -> entry.task).collect(Collectors.toList()); + return getTaskInfos().stream().map(TaskInfo::getTask).collect(Collectors.toList()); } /** - * Returns the list of currently active tasks for the given datasource. + * Returns a map of currently active tasks for the given datasource. */ public Map getActiveTasksForDatasource(String datasource) { return activeTasks.values().stream().filter( entry -> !entry.isComplete - && entry.task.getDataSource().equals(datasource) - ).map( - entry -> entry.task - ).collect( + && entry.taskInfo.getDataSource().equals(datasource) + ).map(TaskEntry::getTask).collect( Collectors.toMap(Task::getId, Function.identity()) ); } @@ -1140,17 +1148,35 @@ private void removeTaskLock(Task task) */ static class TaskEntry { - private final Task task; + private TaskInfo taskInfo; private DateTime lastUpdatedTime; private ListenableFuture future = null; private boolean isComplete = false; - TaskEntry(Task task) + TaskEntry(TaskInfo taskInfo) { - this.task = task; + this.taskInfo = taskInfo; this.lastUpdatedTime = DateTimes.nowUtc(); } + + /** + * Returns the task associated with this {@link TaskEntry} + */ + Task getTask() + { + return taskInfo.getTask(); + } + + /** + * Updates the {@link TaskStatus} for the task associated with this {@link TaskEntry} and sets the corresponding + * update time. + */ + void updateStatus(TaskStatus status, DateTime updateTime) + { + this.taskInfo = this.taskInfo.withStatus(status); + this.lastUpdatedTime = updateTime; + } } private static RowKey getMetricKey(final Task task) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java index b231b3f37c28..32f1a9372cb1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java @@ -40,8 +40,9 @@ public interface TaskStorage * * @param task task to add * @param status task status + * @return a TaskInfo object representing the task information that was committed to the storage facility */ - void insert(Task task, TaskStatus status); + TaskInfo insert(Task task, TaskStatus status); /** * Persists task status in the storage facility. This method should throw an exception if the task status lifecycle @@ -117,6 +118,14 @@ public interface TaskStorage */ List getActiveTasks(); + /** + * Returns a list of currently running or pending task infos as stored in the storage facility. No particular order + * is guaranteed, but implementations are encouraged to return tasks in ascending order of creation. + * + * @return list of active task infos + */ + List> getActiveTaskInfos(); + /** * Returns a list of currently running or pending tasks as stored in the storage facility. No particular order * is guaranteed, but implementations are encouraged to return tasks in ascending order of creation. diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index d03a5c8c2cf9..0cb35ba39175 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -38,6 +38,7 @@ import org.apache.druid.discovery.WorkerNodeService; import org.apache.druid.error.DruidException; import org.apache.druid.indexer.RunnerTaskState; +import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; @@ -624,6 +625,71 @@ public void testGetActiveTaskRedactsPassword() throws JsonProcessingException Assert.assertEquals(taskInStorageAsString, taskInQueueAsString); } + @Test + public void testTaskShutdownUpdatesTaskStatusInTaskQueue() + { + final String shutdownReason = "Test shutdown reason"; + final TaskStatus shutdownStatus = TaskStatus.failure("shutdown-test-task", shutdownReason); + final TestTask task = new TestTask("shutdown-test-task", Intervals.of("2021-01-01/P1D")); + taskQueue.add(task); + + final Optional> activeInfoOpt = taskQueue.getActiveTaskInfo(task.getId()); + Assert.assertTrue(activeInfoOpt.isPresent()); + Assert.assertEquals(TaskState.RUNNING, activeInfoOpt.get().getStatus().getStatusCode()); + + taskQueue.shutdown(task.getId(), shutdownReason); + + final Optional> afterShutdownInfoOpt = taskQueue.getActiveTaskInfo(task.getId()); + Assert.assertTrue(afterShutdownInfoOpt.isPresent()); + Assert.assertEquals(shutdownStatus, afterShutdownInfoOpt.get().getStatus()); + Assert.assertEquals(shutdownStatus, getTaskStorage().getStatus(task.getId()).get()); + } + + @Test + public void testTaskSuccessUpdatesTaskStatusInTaskQueue() throws Exception + { + final TaskStatus successStatus = TaskStatus.success("success-test-task"); + final TestTask task = new TestTask("success-test-task", Intervals.of("2021-01-01/P1D")); + taskQueue.add(task); + taskQueue.manageQueuedTasks(); + + // ensure success callback has fired + Thread.sleep(100); + Assert.assertTrue(task.isDone()); + + final Optional> activeInfoOpt = taskQueue.getActiveTaskInfo(task.getId()); + Assert.assertTrue(activeInfoOpt.isPresent()); + Assert.assertEquals(successStatus, activeInfoOpt.get().getStatus()); + Assert.assertEquals(successStatus, getTaskStorage().getStatus(task.getId()).get()); + } + + @Test + public void testTaskFailureUpdatesTaskStatusInTaskQueue() throws Exception + { + final TaskStatus failedStatus = TaskStatus.failure("failure-test-task", "error"); + final TestTask task = new TestTask("failure-test-task", Intervals.of("2021-01-01/P1D")) + { + @Override + public TaskStatus runTask(TaskToolbox toolbox) + { + super.done = true; + return failedStatus; + } + }; + + taskQueue.add(task); + taskQueue.manageQueuedTasks(); + + // ensure failed callback has fired + Thread.sleep(100); + Assert.assertTrue(task.isDone()); + + final Optional> activeInfoOpt = taskQueue.getActiveTaskInfo(task.getId()); + Assert.assertTrue(activeInfoOpt.isPresent()); + Assert.assertEquals(failedStatus, activeInfoOpt.get().getStatus()); + Assert.assertEquals(failedStatus, getTaskStorage().getStatus(task.getId()).get()); + } + private HttpRemoteTaskRunner createHttpRemoteTaskRunner() { final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index afa67d41bea7..a6988feeeb1f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -999,21 +999,28 @@ public void testGetTaskStatus() throws Exception final Task task = NoopTask.create(); final String taskId = task.getId(); final TaskStatus status = TaskStatus.running(taskId); + final TaskInfo taskInfo = new TaskInfo<>( + task.getId(), + DateTimes.of("2018-01-01"), + status, + task.getDataSource(), + task + ); - EasyMock.expect(taskQueryTool.getTaskInfo(taskId)) - .andReturn(new TaskInfo( - task.getId(), - DateTimes.of("2018-01-01"), - status, - task.getDataSource(), - task - )); - - EasyMock.expect(taskQueryTool.getTaskInfo("othertask")) - .andReturn(null); + // For noop, simulate in-memory hit + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).once(); + EasyMock.expect(taskQueue.getActiveTaskInfo(taskId)).andReturn(Optional.of(taskInfo)).once(); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).once(); EasyMock.>expect(taskRunner.getKnownTasks()) - .andReturn(ImmutableList.of()); + .andReturn(ImmutableList.of(new MockTaskRunnerWorkItem(taskId))).anyTimes(); + EasyMock.expect(taskRunner.getRunnerTaskState(taskId)).andReturn(RunnerTaskState.RUNNING).once(); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).once(); + + // For "othertask", simulate in-memory miss, then task storage read + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).once(); + EasyMock.expect(taskQueue.getActiveTaskInfo("othertask")).andReturn(Optional.absent()).once(); + EasyMock.expect(taskStorage.getTaskInfo("othertask")).andReturn(null).once(); replayAll(); diff --git a/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java b/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java index aa9aa1097db5..2616f9daf02b 100644 --- a/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java +++ b/processing/src/main/java/org/apache/druid/indexer/TaskInfo.java @@ -76,4 +76,12 @@ public EntryType getTask() { return task; } + + /** + * Returns a copy of this TaskInfo object with the given status. + */ + public TaskInfo withStatus(StatusType status) + { + return new TaskInfo<>(id, createdTime, status, dataSource, task); + } }