16 changes: 16 additions & 0 deletions core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ public class TaskStatusPlus

@Nullable
private final String errorMsg;
@Nullable
private final String groupId;
Contributor

Perhaps a more intuitive name for users would be parentId? Renaming it to parentId would also mirror the parent process ID concept in most operating systems, which many users are familiar with. For non-subtasks, the value would default to the empty string instead of the task ID.

Contributor

This sounds like an interesting idea, but I'm not sure it would make sense for Kafka/Kinesis indexing tasks. The Kafka/Kinesis supervisor spawns indexing tasks, but the supervisor itself runs as a thread in the Overlord. The spawned tasks all share the same groupId, which is the name of the datasource they ingest data into.

Contributor

Ah, I didn't know there's already a notion of task groups. For consistency, we should continue using "group" then.


public TaskStatusPlus(
String id,
@Nullable String groupId,
String type, // nullable for backward compatibility
DateTime createdTime,
DateTime queueInsertionTime,
Expand All @@ -58,6 +61,7 @@ public TaskStatusPlus(
{
this(
id,
groupId,
type,
createdTime,
queueInsertionTime,
Expand All @@ -74,6 +78,7 @@ public TaskStatusPlus(
@JsonCreator
public TaskStatusPlus(
@JsonProperty("id") String id,
@JsonProperty("groupId") @Nullable String groupId,
@JsonProperty("type") @Nullable String type, // nullable for backward compatibility
@JsonProperty("createdTime") DateTime createdTime,
@JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
Expand All @@ -90,6 +95,7 @@ public TaskStatusPlus(
Preconditions.checkNotNull(duration, "duration");
}
this.id = Preconditions.checkNotNull(id, "id");
this.groupId = groupId;
this.type = type;
this.createdTime = Preconditions.checkNotNull(createdTime, "createdTime");
this.queueInsertionTime = Preconditions.checkNotNull(queueInsertionTime, "queueInsertionTime");
Expand Down Expand Up @@ -117,6 +123,13 @@ public String getId()
return id;
}

@Nullable
@JsonProperty
public String getGroupId()
{
return groupId;
}

@Nullable
@JsonProperty
public String getType()
Expand Down Expand Up @@ -195,6 +208,7 @@ public boolean equals(Object o)
}
TaskStatusPlus that = (TaskStatusPlus) o;
return Objects.equals(getId(), that.getId()) &&
Objects.equals(getGroupId(), that.getGroupId()) &&
Objects.equals(getType(), that.getType()) &&
Objects.equals(getCreatedTime(), that.getCreatedTime()) &&
Objects.equals(getQueueInsertionTime(), that.getQueueInsertionTime()) &&
Expand All @@ -210,6 +224,7 @@ public int hashCode()
{
return Objects.hash(
getId(),
getGroupId(),
getType(),
getCreatedTime(),
getQueueInsertionTime(),
Expand All @@ -226,6 +241,7 @@ public String toString()
{
return "TaskStatusPlus{" +
"id='" + id + '\'' +
", groupId='" + groupId + '\'' +
", type='" + type + '\'' +
", createdTime=" + createdTime +
", queueInsertionTime=" + queueInsertionTime +
Expand Down
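The hunks above add a nullable `groupId` to the constructor, `equals`, `hashCode`, and `toString`. As a minimal sketch of why `Objects.equals` and `Objects.hash` are a safe fit for a field that may be null, consider the stripped-down value class below. `NullableFieldSketch` and its nested `Status` class are hypothetical names used only for illustration and are not part of Druid.

```java
import java.util.Objects;

// Hypothetical illustration; not Druid's TaskStatusPlus.
final class NullableFieldSketch
{
  static final class Status
  {
    private final String id;
    private final String groupId; // may be null, like the field added in this diff

    Status(String id, String groupId)
    {
      this.id = Objects.requireNonNull(id, "id");
      this.groupId = groupId;
    }

    @Override
    public boolean equals(Object o)
    {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      Status that = (Status) o;
      // Objects.equals treats two nulls as equal and never throws on a null argument.
      return Objects.equals(id, that.id) && Objects.equals(groupId, that.groupId);
    }

    @Override
    public int hashCode()
    {
      // Objects.hash accepts null elements.
      return Objects.hash(id, groupId);
    }

    @Override
    public String toString()
    {
      return "Status{id='" + id + "', groupId='" + groupId + "'}";
    }
  }

  public static void main(String[] args)
  {
    Status a = new Status("testId", null);
    Status b = new Status("testId", null);
    Status c = new Status("testId", "testGroupId");
    System.out.println(a.equals(b));
    System.out.println(a.equals(c));
    System.out.println(a);
  }
}
```

Leaving the new field out of `equals` or `hashCode` would make two statuses that differ only in `groupId` compare as equal, which is presumably why the diff touches all three methods together.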
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public void testSerde() throws IOException
);
final TaskStatusPlus status = new TaskStatusPlus(
"testId",
"testGroupId",
"testType",
DateTimes.nowUtc(),
DateTimes.nowUtc(),
Expand All @@ -71,6 +72,7 @@ public void testJsonAttributes() throws IOException
);
final String json = "{\n"
+ "\"id\": \"testId\",\n"
+ "\"groupId\": \"testGroupId\",\n"
+ "\"type\": \"testType\",\n"
+ "\"createdTime\": \"2018-09-17T06:35:17.392Z\",\n"
+ "\"queueInsertionTime\": \"2018-09-17T06:35:17.392Z\",\n"
Expand Down
3 changes: 2 additions & 1 deletion docs/querying/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,8 @@ check out the documentation for [ingestion tasks](../ingestion/tasks.html).
|Column|Type|Notes|
|------|-----|-----|
|task_id|STRING|Unique task identifier|
-|type|STRING|Task type, for example this value is "index" for indexing tasks. See [tasks-overview](../ingestion/tasks.md)|
+|group_id|STRING|Task group ID for this task, the value depends on the task `type`. For example, for native index tasks, it's same as `task_id`, for sub tasks, this value is the parent task's ID|
+|type|STRING|Task type, for example this value is "index" for indexing tasks. See [tasks-overview](../ingestion/tasks.html)|
|datasource|STRING|Datasource name being indexed|
|created_time|STRING|Timestamp in ISO8601 format corresponding to when the ingestion task was created. Note that this value is populated for completed and waiting tasks. For running and pending tasks this value is set to 1970-01-01T00:00:00Z|
|queue_insertion_time|STRING|Timestamp in ISO8601 format corresponding to when this task was added to the queue on the Overlord|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataS
final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
for (final TaskStuff taskStuff : tasks.values()) {
if (taskStuff.getStatus().isRunnable()) {
-TaskInfo t = new TaskInfo(
+TaskInfo t = new TaskInfo<>(
Contributor

Thanks for fixing!

taskStuff.getTask().getId(),
taskStuff.getCreatedDate(),
taskStuff.getStatus(),
Expand Down Expand Up @@ -267,7 +267,7 @@ private List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskIn
final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
for (final TaskStuff taskStuff : list) {
String id = taskStuff.getTask().getId();
-TaskInfo t = new TaskInfo(
+TaskInfo t = new TaskInfo<>(
id,
taskStuff.getCreatedDate(),
taskStuff.getStatus(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ public List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInf
{
return ImmutableList.copyOf(
handler.getCompletedTaskInfo(
-DateTimes.nowUtc().minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow),
+DateTimes.nowUtc()
Contributor

Thanks for reformatting!

+.minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow),
maxTaskStatuses,
datasource
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ public Response getTaskStatus(@PathParam("taskid") String taskid)
workItem.getTaskId(),
new TaskStatusPlus(
taskInfo.getId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
taskInfo.getCreatedTime(),
// Would be nice to include the real queue insertion time, but the
Expand All @@ -285,6 +286,7 @@ public Response getTaskStatus(@PathParam("taskid") String taskid)
taskid,
new TaskStatusPlus(
taskInfo.getId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
taskInfo.getCreatedTime(),
// Would be nice to include the real queue insertion time, but the
Expand Down Expand Up @@ -577,6 +579,7 @@ public Response getTasks(
List<TaskStatusPlus> finalTaskList = new ArrayList<>();
Function<AnyTask, TaskStatusPlus> activeTaskTransformFunc = workItem -> new TaskStatusPlus(
workItem.getTaskId(),
workItem.getTaskGroupId(),
workItem.getTaskType(),
workItem.getCreatedTime(),
workItem.getQueueInsertionTime(),
Expand All @@ -590,6 +593,7 @@ public Response getTasks(

Function<TaskInfo<Task, TaskStatus>, TaskStatusPlus> completeTaskTransformFunc = taskInfo -> new TaskStatusPlus(
taskInfo.getId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
taskInfo.getCreatedTime(),
// Would be nice to include the real queue insertion time, but the
Expand Down Expand Up @@ -626,6 +630,7 @@ public Response getTasks(
allActiveTasks.add(
new AnyTask(
task.getId(),
task.getTask() == null ? null : task.getTask().getGroupId(),
task.getTask() == null ? null : task.getTask().getType(),
SettableFuture.create(),
task.getDataSource(),
Expand Down Expand Up @@ -990,6 +995,7 @@ private List<TaskStatusPlus> securedTaskStatusPlus(

private static class AnyTask extends TaskRunnerWorkItem
{
private final String taskGroupId;
private final String taskType;
private final String dataSource;
private final TaskState taskState;
Expand All @@ -1000,6 +1006,7 @@ private static class AnyTask extends TaskRunnerWorkItem

AnyTask(
String taskId,
String taskGroupId,
String taskType,
ListenableFuture<TaskStatus> result,
String dataSource,
Expand All @@ -1011,6 +1018,7 @@ private static class AnyTask extends TaskRunnerWorkItem
)
{
super(taskId, result, DateTimes.EPOCH, DateTimes.EPOCH);
this.taskGroupId = taskGroupId;
this.taskType = taskType;
this.dataSource = dataSource;
this.taskState = state;
Expand Down Expand Up @@ -1038,6 +1046,11 @@ public String getDataSource()
return dataSource;
}

public String getTaskGroupId()
{
return taskGroupId;
}

public TaskState getTaskState()
{
return taskState;
Expand Down Expand Up @@ -1070,6 +1083,7 @@ public AnyTask withTaskState(
{
return new AnyTask(
getTaskId(),
getTaskGroupId(),
getTaskType(),
getResult(),
getDataSource(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -52,6 +53,7 @@
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
Expand Down Expand Up @@ -138,16 +140,24 @@ class LocalIndexingServiceClient extends NoopIndexingServiceClient
public String runTask(Object taskObject)
{
final Task subTask = (Task) taskObject;
try {
getTaskStorage().insert(subTask, TaskStatus.running(subTask.getId()));
Contributor

It looks like the subtask only starts running after subTask.run() is called below. What cleans up the running status created here if:

  • The task never starts running (the task is not ready, so subTask.run() is not called)
  • The task starts running but unexpectedly fails during execution

If those scenarios need to be considered, then adding tests for them would be useful.

Author

Hmm, good point. I now set the task status to failure if either of those conditions occurs. I'm not sure how or where to unit test those cases, though.

Contributor

Ah, this is a test class, so my earlier comment can be disregarded.

}
catch (EntryExistsException e) {
throw new RuntimeException(e);
}
tasks.put(subTask.getId(), service.submit(() -> {
try {
final TaskToolbox toolbox = createTaskToolbox(subTask);
if (subTask.isReady(toolbox.getTaskActionClient())) {
return subTask.run(toolbox);
} else {
getTaskStorage().setStatus(TaskStatus.failure(subTask.getId()));
throw new ISE("task[%s] is not ready", subTask.getId());
}
}
catch (Exception e) {
getTaskStorage().setStatus(TaskStatus.failure(subTask.getId(), e.getMessage()));
throw new RuntimeException(e);
}
}));
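The pattern discussed in this thread (register the subtask as running up front, then record a failure if it is not ready or throws) can be sketched in isolation as follows. This is a simplified illustration under stated assumptions, not the test class from the diff: `SubTaskStatusSketch`, `StatusStore`, and `State` are hypothetical names, the store is a plain in-memory map, and, unlike the diff, the sketch also writes the final success state back to the store.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration; names do not correspond to Druid classes.
final class SubTaskStatusSketch
{
  enum State { RUNNING, SUCCESS, FAILED }

  // In-memory stand-in for a task status store.
  static final class StatusStore
  {
    private final Map<String, State> statuses = new ConcurrentHashMap<>();

    void insert(String taskId, State state)
    {
      if (statuses.putIfAbsent(taskId, state) != null) {
        throw new IllegalStateException("task[" + taskId + "] already exists");
      }
    }

    void setStatus(String taskId, State state)
    {
      statuses.put(taskId, state);
    }

    State getStatus(String taskId)
    {
      return statuses.get(taskId);
    }
  }

  // Marks the task RUNNING before submission, then records the final state.
  static Future<State> runTask(
      ExecutorService exec,
      StatusStore store,
      String taskId,
      boolean ready,
      Callable<State> body
  )
  {
    store.insert(taskId, State.RUNNING);
    return exec.submit(() -> {
      try {
        if (!ready) {
          throw new IllegalStateException("task[" + taskId + "] is not ready");
        }
        State result = body.call();
        store.setStatus(taskId, result);
        return result;
      }
      catch (Exception e) {
        // Covers both "never became ready" and "failed during execution".
        store.setStatus(taskId, State.FAILED);
        throw new RuntimeException(e);
      }
    });
  }

  public static void main(String[] args) throws Exception
  {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    StatusStore store = new StatusStore();
    try {
      runTask(exec, store, "ok", true, () -> State.SUCCESS).get();
      try {
        runTask(exec, store, "notReady", false, () -> State.SUCCESS).get();
      }
      catch (ExecutionException expected) {
        // The failure is rethrown to the caller after the status is updated.
      }
      System.out.println(store.getStatus("ok"));       // prints SUCCESS
      System.out.println(store.getStatus("notReady")); // prints FAILED
    }
    finally {
      exec.shutdown();
    }
  }
}
```

Routing both failure modes through a single `catch` means no entry is left in the `RUNNING` state once its future has completed.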
Expand All @@ -158,6 +168,8 @@ public String runTask(Object taskObject)
public TaskStatusResponse getTaskStatus(String taskId)
{
final Future<TaskStatus> taskStatusFuture = tasks.get(taskId);
final Optional<Task> task = getTaskStorage().getTask(taskId);
final String groupId = task.isPresent() ? task.get().getGroupId() : null;
if (taskStatusFuture != null) {
try {
if (taskStatusFuture.isDone()) {
Expand All @@ -166,6 +178,7 @@ public TaskStatusResponse getTaskStatus(String taskId)
taskId,
new TaskStatusPlus(
taskId,
groupId,
SinglePhaseSubTask.TYPE,
DateTimes.EPOCH,
DateTimes.EPOCH,
Expand All @@ -182,6 +195,7 @@ public TaskStatusResponse getTaskStatus(String taskId)
taskId,
new TaskStatusPlus(
taskId,
groupId,
SinglePhaseSubTask.TYPE,
DateTimes.EPOCH,
DateTimes.EPOCH,
Expand All @@ -203,6 +217,7 @@ public TaskStatusResponse getTaskStatus(String taskId)
taskId,
new TaskStatusPlus(
taskId,
groupId,
SinglePhaseSubTask.TYPE,
DateTimes.EPOCH,
DateTimes.EPOCH,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ public SinglePhaseSubTask newSubTask(int numAttempts)
getId(),
new TaskStatusPlus(
subTask.getId(),
subTask.getGroupId(),
subTask.getType(),
DateTimes.EPOCH,
DateTimes.EPOCH,
Expand Down Expand Up @@ -708,6 +709,7 @@ void setState(TaskState state)
taskHistories.computeIfAbsent(specId, k -> new ArrayList<>()).add(
new TaskStatusPlus(
getId(),
getGroupId(),
getType(),
DateTimes.EPOCH,
DateTimes.EPOCH,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ public TaskStatusResponse getTaskStatus(String taskId)
taskId,
new TaskStatusPlus(
taskId,
Contributor

Why is taskId passed in as the groupId argument? Would using "groupId" or a GROUP_ID named constant work here?

If groupId is changed to parentId, then a named constant NO_PARENT_ID = "" would work well here.

Contributor

I guess it's fine to use the taskId as the groupId, since the groupId defaults to the taskId. But since TaskMonitor is used to monitor subtasks of the parallel indexing task, it would be more realistic to use the taskId of the supervisor task as the groupId.

Author

I used taskId because getTaskStatus only takes a taskId string rather than a Task object, so there was no easy way to get the real groupId without changing the IndexingServiceClient interface. Also, as Jihoon mentioned, the default is taskId. This particular test works with any value; for example, groupId can be any string or null.

Contributor

In this particular test, the groupId is always "groupId" as seen here. You can use it if you want.

"groupId",
"testTask",
DateTimes.EPOCH,
DateTimes.EPOCH,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,13 @@ public void testGetTaskStatus() throws Exception
final TaskStatus status = TaskStatus.running("mytask");

EasyMock.expect(taskStorageQueryAdapter.getTaskInfo("mytask"))
-.andReturn(new TaskInfo<>(task.getId(), DateTimes.of("2018-01-01"), status, task.getDataSource(), task));
+.andReturn(new TaskInfo(
+task.getId(),
+DateTimes.of("2018-01-01"),
+status,
+task.getDataSource(),
+task
+));

EasyMock.expect(taskStorageQueryAdapter.getTaskInfo("othertask"))
.andReturn(null);
Expand Down Expand Up @@ -1029,6 +1035,7 @@ public void testGetTaskStatus() throws Exception
new TaskStatusResponse(
"mytask",
new TaskStatusPlus(
"mytask",
Contributor

Similar comment here about id and groupId having the same value, and about changing it if groupId is renamed to parentId.

Author

Here it does matter. The value of groupId is null in NoopTask, so it gets set to taskId in AbstractTask. This test checks the TaskStatusResponse later, so it will fail if groupId is not set to taskId.

"mytask",
"noop",
DateTimes.of("2018-01-01"),
Expand Down
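The defaulting behavior the author describes in the thread above can be sketched as follows. This is a minimal, self-contained illustration and not Druid's `AbstractTask`: `GroupIdDefaultDemo` and `SketchTask` are hypothetical names, and the only assumption carried over from the comment is that a task created with a null groupId reports its own id as its groupId.

```java
import java.util.Objects;

// Hypothetical illustration; not Druid's AbstractTask or NoopTask.
final class GroupIdDefaultDemo
{
  static final class SketchTask
  {
    private final String id;
    private final String groupId;

    SketchTask(String id, String groupId)
    {
      this.id = Objects.requireNonNull(id, "id");
      // Assumption taken from the review comment: a null groupId falls back to the task id.
      this.groupId = groupId == null ? id : groupId;
    }

    String getId()
    {
      return id;
    }

    String getGroupId()
    {
      return groupId;
    }
  }

  public static void main(String[] args)
  {
    SketchTask standalone = new SketchTask("mytask", null);
    SketchTask subTask = new SketchTask("subtask_0", "supervisor_task");
    System.out.println(standalone.getGroupId()); // prints mytask
    System.out.println(subTask.getGroupId());    // prints supervisor_task
  }
}
```

Under this assumption, a test that builds its expected status from a task constructed without a groupId has to pass the task id in the groupId position, which matches the repeated `"mytask"` argument in the hunk above.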
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{
"task_id": "index_auth_test_2030-04-30T01:13:31.893Z",
"type": null,
"group_id": null,
"datasource": "auth_test",
"created_time": "2030-04-30T01:13:31.893Z",
"queue_insertion_time": "1970-01-01T00:00:00.000Z",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ public class SystemSchema extends AbstractSchema
static final RowSignature TASKS_SIGNATURE = RowSignature
.builder()
.add("task_id", ValueType.STRING)
.add("group_id", ValueType.STRING)
.add("type", ValueType.STRING)
.add("datasource", ValueType.STRING)
.add("created_time", ValueType.STRING)
Expand Down Expand Up @@ -649,6 +650,7 @@ public Object[] current()
}
return new Object[]{
task.getId(),
task.getGroupId(),
task.getType(),
task.getDataSource(),
toStringOrNull(task.getCreatedTime()),
Expand Down