Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public HeapMemoryTaskStorage(TaskStorageConfig config)
}

@Override
public void insert(Task task, TaskStatus status)
public TaskInfo<Task, TaskStatus> insert(Task task, TaskStatus status)
{
Preconditions.checkNotNull(task, "task");
Preconditions.checkNotNull(status, "status");
Expand All @@ -94,6 +94,7 @@ public void insert(Task task, TaskStatus status)
}

log.info("Inserted task[%s] with status[%s]", task.getId(), status);
return TaskStuff.toTaskInfo(newTaskStuff);
}

@Override
Expand Down Expand Up @@ -159,6 +160,18 @@ public List<Task> getActiveTasks()
return listBuilder.build();
}

@Override
public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfos()
{
final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
for (final TaskStuff taskStuff : tasks.values()) {
if (taskStuff.getStatus().isRunnable()) {
listBuilder.add(TaskStuff.toTaskInfo(taskStuff));
}
}
Comment thread
jtuglu1 marked this conversation as resolved.
return listBuilder.build();
}

@Override
public List<Task> getActiveTasksByDatasource(String datasource)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.Collections;
Expand Down Expand Up @@ -111,7 +112,7 @@ public void stop()
}

@Override
public void insert(final Task task, final TaskStatus status)
public TaskInfo<Task, TaskStatus> insert(final Task task, final TaskStatus status)
{
Preconditions.checkNotNull(task, "task");
Preconditions.checkNotNull(status, "status");
Expand All @@ -123,11 +124,12 @@ public void insert(final Task task, final TaskStatus status)
);

log.info("Inserting task [%s] with status [%s].", task.getId(), status);
final DateTime insertionTime = DateTimes.nowUtc();

try {
handler.insert(
task.getId(),
DateTimes.nowUtc(),
insertionTime,
task.getDataSource(),
task,
status.isRunnable(),
Expand All @@ -142,6 +144,14 @@ public void insert(final Task task, final TaskStatus status)
catch (Exception e) {
throw new RuntimeException(e);
}

return new TaskInfo<>(
task.getId(),
insertionTime,
status,
task.getDataSource(),
task
);
}

@Override
Expand Down Expand Up @@ -182,10 +192,17 @@ public List<Task> getActiveTasks()
{
// filter out taskInfo with a null 'task' which should only happen in practice if we are missing a jackson module
// and don't know what to do with the payload, so we won't be able to make use of it anyway
return handler.getTaskInfos(Collections.singletonMap(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null)
return getActiveTaskInfos().stream()
.map(TaskInfo::getTask)
.collect(Collectors.toList());
}

@Override
public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfos()
{
return handler.getTaskInfos(Map.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null)
.stream()
.filter(taskInfo -> taskInfo.getStatus().isRunnable() && taskInfo.getTask() != null)
.map(TaskInfo::getTask)
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,13 @@
import org.apache.druid.indexing.overlord.http.TaskStateLookup;
import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;

Expand Down Expand Up @@ -147,6 +145,13 @@ public Optional<TaskStatus> getTaskStatus(final String taskId)
@Nullable
public TaskInfo<Task, TaskStatus> getTaskInfo(String taskId)
{
final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
if (taskQueue.isPresent()) {
final Optional<TaskInfo<Task, TaskStatus>> taskStatus = taskQueue.get().getActiveTaskInfo(taskId);
if (taskStatus.isPresent()) {
return taskStatus.get();
}
}
return storage.getTaskInfo(taskId);
}

Expand All @@ -156,12 +161,10 @@ public List<TaskStatusPlus> getAllActiveTasks()
if (taskQueue.isPresent()) {
// Serve active task statuses from memory
final List<TaskStatusPlus> taskStatusPlusList = new ArrayList<>();
final List<TaskInfo<Task, TaskStatus>> activeTasks = taskQueue.get().getTaskInfos();

// Use a dummy created time as this is not used by the caller, just needs to be non-null
final DateTime createdTime = DateTimes.nowUtc();

final List<Task> activeTasks = taskQueue.get().getTasks();
for (Task task : activeTasks) {
for (TaskInfo<Task, TaskStatus> taskInfo : activeTasks) {
final Task task = taskInfo.getTask();
final Optional<TaskStatus> statusOptional = taskQueue.get().getTaskStatus(task.getId());
if (statusOptional.isPresent()) {
final TaskStatus status = statusOptional.get();
Expand All @@ -170,8 +173,8 @@ public List<TaskStatusPlus> getAllActiveTasks()
task.getId(),
task.getGroupId(),
task.getType(),
createdTime,
createdTime,
taskInfo.getCreatedTime(),
taskInfo.getCreatedTime(),
status.getStatusCode(),
null,
status.getDuration(),
Expand Down
Loading
Loading