diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java index 34733af08bb8..23d36a688c9d 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java @@ -42,9 +42,12 @@ public class TaskStatusPlus @Nullable private final String errorMsg; + @Nullable + private final String groupId; public TaskStatusPlus( String id, + @Nullable String groupId, String type, // nullable for backward compatibility DateTime createdTime, DateTime queueInsertionTime, @@ -58,6 +61,7 @@ public TaskStatusPlus( { this( id, + groupId, type, createdTime, queueInsertionTime, @@ -74,6 +78,7 @@ public TaskStatusPlus( @JsonCreator public TaskStatusPlus( @JsonProperty("id") String id, + @JsonProperty("groupId") @Nullable String groupId, @JsonProperty("type") @Nullable String type, // nullable for backward compatibility @JsonProperty("createdTime") DateTime createdTime, @JsonProperty("queueInsertionTime") DateTime queueInsertionTime, @@ -90,6 +95,7 @@ public TaskStatusPlus( Preconditions.checkNotNull(duration, "duration"); } this.id = Preconditions.checkNotNull(id, "id"); + this.groupId = groupId; this.type = type; this.createdTime = Preconditions.checkNotNull(createdTime, "createdTime"); this.queueInsertionTime = Preconditions.checkNotNull(queueInsertionTime, "queueInsertionTime"); @@ -117,6 +123,13 @@ public String getId() return id; } + @Nullable + @JsonProperty + public String getGroupId() + { + return groupId; + } + @Nullable @JsonProperty public String getType() @@ -195,6 +208,7 @@ public boolean equals(Object o) } TaskStatusPlus that = (TaskStatusPlus) o; return Objects.equals(getId(), that.getId()) && + Objects.equals(getGroupId(), that.getGroupId()) && Objects.equals(getType(), that.getType()) && Objects.equals(getCreatedTime(), that.getCreatedTime()) && Objects.equals(getQueueInsertionTime(), that.getQueueInsertionTime()) 
&& @@ -210,6 +224,7 @@ public int hashCode() { return Objects.hash( getId(), + getGroupId(), getType(), getCreatedTime(), getQueueInsertionTime(), @@ -226,6 +241,7 @@ public String toString() { return "TaskStatusPlus{" + "id='" + id + '\'' + + ", groupId='" + groupId + '\'' + ", type='" + type + '\'' + ", createdTime=" + createdTime + ", queueInsertionTime=" + queueInsertionTime + diff --git a/core/src/test/java/org/apache/druid/indexer/TaskStatusPlusTest.java b/core/src/test/java/org/apache/druid/indexer/TaskStatusPlusTest.java index 9d4817226bdf..03a8c6da7617 100644 --- a/core/src/test/java/org/apache/druid/indexer/TaskStatusPlusTest.java +++ b/core/src/test/java/org/apache/druid/indexer/TaskStatusPlusTest.java @@ -46,6 +46,7 @@ public void testSerde() throws IOException ); final TaskStatusPlus status = new TaskStatusPlus( "testId", + "testGroupId", "testType", DateTimes.nowUtc(), DateTimes.nowUtc(), @@ -71,6 +72,7 @@ public void testJsonAttributes() throws IOException ); final String json = "{\n" + "\"id\": \"testId\",\n" + + "\"groupId\": \"testGroupId\",\n" + "\"type\": \"testType\",\n" + "\"createdTime\": \"2018-09-17T06:35:17.392Z\",\n" + "\"queueInsertionTime\": \"2018-09-17T06:35:17.392Z\",\n" diff --git a/docs/querying/sql.md b/docs/querying/sql.md index 7a59390cef0c..4fdd2575b50d 100644 --- a/docs/querying/sql.md +++ b/docs/querying/sql.md @@ -751,7 +751,8 @@ check out the documentation for [ingestion tasks](../ingestion/tasks.html). |Column|Type|Notes| |------|-----|-----| |task_id|STRING|Unique task identifier| -|type|STRING|Task type, for example this value is "index" for indexing tasks. See [tasks-overview](../ingestion/tasks.md)| +|group_id|STRING|Task group ID for this task; the value depends on the task `type`. For example, for native index tasks it is the same as `task_id`; for sub tasks it is the parent task's ID| +|type|STRING|Task type, for example this value is "index" for indexing tasks. 
See [tasks-overview](../ingestion/tasks.html)| |datasource|STRING|Datasource name being indexed| |created_time|STRING|Timestamp in ISO8601 format corresponding to when the ingestion task was created. Note that this value is populated for completed and waiting tasks. For running and pending tasks this value is set to 1970-01-01T00:00:00Z| |queue_insertion_time|STRING|Timestamp in ISO8601 format corresponding to when this task was added to the queue on the Overlord| diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index a800a35f2d9f..fe22f82be19e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -203,7 +203,7 @@ public List> getActiveTaskInfo(@Nullable String dataS final ImmutableList.Builder> listBuilder = ImmutableList.builder(); for (final TaskStuff taskStuff : tasks.values()) { if (taskStuff.getStatus().isRunnable()) { - TaskInfo t = new TaskInfo( + TaskInfo t = new TaskInfo<>( taskStuff.getTask().getId(), taskStuff.getCreatedDate(), taskStuff.getStatus(), @@ -267,7 +267,7 @@ private List> getRecentlyCreatedAlreadyFinishedTaskIn final ImmutableList.Builder> listBuilder = ImmutableList.builder(); for (final TaskStuff taskStuff : list) { String id = taskStuff.getTask().getId(); - TaskInfo t = new TaskInfo( + TaskInfo t = new TaskInfo<>( id, taskStuff.getCreatedDate(), taskStuff.getStatus(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index 256d62b48e32..186aa18224e3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -215,7 +215,8 @@ public List> getRecentlyCreatedAlreadyFinishedTaskInf { return ImmutableList.copyOf( handler.getCompletedTaskInfo( - DateTimes.nowUtc().minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow), + DateTimes.nowUtc() + .minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow), maxTaskStatuses, datasource ) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 2dc68394eb6f..6226abd68bcb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -264,6 +264,7 @@ public Response getTaskStatus(@PathParam("taskid") String taskid) workItem.getTaskId(), new TaskStatusPlus( taskInfo.getId(), + taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(), taskInfo.getTask() == null ? null : taskInfo.getTask().getType(), taskInfo.getCreatedTime(), // Would be nice to include the real queue insertion time, but the @@ -285,6 +286,7 @@ public Response getTaskStatus(@PathParam("taskid") String taskid) taskid, new TaskStatusPlus( taskInfo.getId(), + taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(), taskInfo.getTask() == null ? 
null : taskInfo.getTask().getType(), taskInfo.getCreatedTime(), // Would be nice to include the real queue insertion time, but the @@ -577,6 +579,7 @@ public Response getTasks( List finalTaskList = new ArrayList<>(); Function activeTaskTransformFunc = workItem -> new TaskStatusPlus( workItem.getTaskId(), + workItem.getTaskGroupId(), workItem.getTaskType(), workItem.getCreatedTime(), workItem.getQueueInsertionTime(), @@ -590,6 +593,7 @@ public Response getTasks( Function, TaskStatusPlus> completeTaskTransformFunc = taskInfo -> new TaskStatusPlus( taskInfo.getId(), + taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(), taskInfo.getTask() == null ? null : taskInfo.getTask().getType(), taskInfo.getCreatedTime(), // Would be nice to include the real queue insertion time, but the @@ -626,6 +630,7 @@ public Response getTasks( allActiveTasks.add( new AnyTask( task.getId(), + task.getTask() == null ? null : task.getTask().getGroupId(), task.getTask() == null ? null : task.getTask().getType(), SettableFuture.create(), task.getDataSource(), @@ -990,6 +995,7 @@ private List securedTaskStatusPlus( private static class AnyTask extends TaskRunnerWorkItem { + private final String taskGroupId; private final String taskType; private final String dataSource; private final TaskState taskState; @@ -1000,6 +1006,7 @@ private static class AnyTask extends TaskRunnerWorkItem AnyTask( String taskId, + String taskGroupId, String taskType, ListenableFuture result, String dataSource, @@ -1011,6 +1018,7 @@ private static class AnyTask extends TaskRunnerWorkItem ) { super(taskId, result, DateTimes.EPOCH, DateTimes.EPOCH); + this.taskGroupId = taskGroupId; this.taskType = taskType; this.dataSource = dataSource; this.taskState = state; @@ -1038,6 +1046,11 @@ public String getDataSource() return dataSource; } + public String getTaskGroupId() + { + return taskGroupId; + } + public TaskState getTaskState() { return taskState; @@ -1070,6 +1083,7 @@ public AnyTask withTaskState( { 
return new AnyTask( getTaskId(), + getTaskGroupId(), getTaskType(), getResult(), getDataSource(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 945316bb1769..62a360d5b2af 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.common.task.batch.parallel; +import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -52,6 +53,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.segment.loading.LocalDataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; import org.apache.druid.segment.loading.NoopDataSegmentKiller; @@ -138,16 +140,24 @@ class LocalIndexingServiceClient extends NoopIndexingServiceClient public String runTask(Object taskObject) { final Task subTask = (Task) taskObject; + try { + getTaskStorage().insert(subTask, TaskStatus.running(subTask.getId())); + } + catch (EntryExistsException e) { + throw new RuntimeException(e); + } tasks.put(subTask.getId(), service.submit(() -> { try { final TaskToolbox toolbox = createTaskToolbox(subTask); if (subTask.isReady(toolbox.getTaskActionClient())) { return subTask.run(toolbox); } else { + getTaskStorage().setStatus(TaskStatus.failure(subTask.getId())); throw new ISE("task[%s] is not 
ready", subTask.getId()); } } catch (Exception e) { + getTaskStorage().setStatus(TaskStatus.failure(subTask.getId(), e.getMessage())); throw new RuntimeException(e); } })); @@ -158,6 +168,8 @@ public String runTask(Object taskObject) public TaskStatusResponse getTaskStatus(String taskId) { final Future taskStatusFuture = tasks.get(taskId); + final Optional task = getTaskStorage().getTask(taskId); + final String groupId = task.isPresent() ? task.get().getGroupId() : null; if (taskStatusFuture != null) { try { if (taskStatusFuture.isDone()) { @@ -166,6 +178,7 @@ public TaskStatusResponse getTaskStatus(String taskId) taskId, new TaskStatusPlus( taskId, + groupId, SinglePhaseSubTask.TYPE, DateTimes.EPOCH, DateTimes.EPOCH, @@ -182,6 +195,7 @@ public TaskStatusResponse getTaskStatus(String taskId) taskId, new TaskStatusPlus( taskId, + groupId, SinglePhaseSubTask.TYPE, DateTimes.EPOCH, DateTimes.EPOCH, @@ -203,6 +217,7 @@ public TaskStatusResponse getTaskStatus(String taskId) taskId, new TaskStatusPlus( taskId, + groupId, SinglePhaseSubTask.TYPE, DateTimes.EPOCH, DateTimes.EPOCH, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index 90bf132a921f..31465234cdaa 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -606,6 +606,7 @@ public SinglePhaseSubTask newSubTask(int numAttempts) getId(), new TaskStatusPlus( subTask.getId(), + subTask.getGroupId(), subTask.getType(), DateTimes.EPOCH, DateTimes.EPOCH, @@ -708,6 +709,7 @@ void setState(TaskState state) taskHistories.computeIfAbsent(specId, k -> new ArrayList<>()).add( new 
TaskStatusPlus( getId(), + getGroupId(), getType(), DateTimes.EPOCH, DateTimes.EPOCH, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java index e5f85cb5dc99..604376891cf3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java @@ -196,6 +196,7 @@ public TaskStatusResponse getTaskStatus(String taskId) taskId, new TaskStatusPlus( taskId, + "groupId", "testTask", DateTimes.EPOCH, DateTimes.EPOCH, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 159192562798..a2971cecb888 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -1001,7 +1001,13 @@ public void testGetTaskStatus() throws Exception final TaskStatus status = TaskStatus.running("mytask"); EasyMock.expect(taskStorageQueryAdapter.getTaskInfo("mytask")) - .andReturn(new TaskInfo<>(task.getId(), DateTimes.of("2018-01-01"), status, task.getDataSource(), task)); + .andReturn(new TaskInfo( + task.getId(), + DateTimes.of("2018-01-01"), + status, + task.getDataSource(), + task + )); EasyMock.expect(taskStorageQueryAdapter.getTaskInfo("othertask")) .andReturn(null); @@ -1029,6 +1035,7 @@ public void testGetTaskStatus() throws Exception new TaskStatusResponse( "mytask", new TaskStatusPlus( + "mytask", "mytask", "noop", DateTimes.of("2018-01-01"), diff --git a/integration-tests/src/test/resources/results/auth_test_sys_schema_tasks.json 
b/integration-tests/src/test/resources/results/auth_test_sys_schema_tasks.json index d27d7661eb52..3f0e48fca71e 100644 --- a/integration-tests/src/test/resources/results/auth_test_sys_schema_tasks.json +++ b/integration-tests/src/test/resources/results/auth_test_sys_schema_tasks.json @@ -2,6 +2,7 @@ { "task_id": "index_auth_test_2030-04-30T01:13:31.893Z", "type": null, + "group_id": null, "datasource": "auth_test", "created_time": "2030-04-30T01:13:31.893Z", "queue_insertion_time": "1970-01-01T00:00:00.000Z", diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 0b171bdc6feb..08f768da6de2 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -165,6 +165,7 @@ public class SystemSchema extends AbstractSchema static final RowSignature TASKS_SIGNATURE = RowSignature .builder() .add("task_id", ValueType.STRING) + .add("group_id", ValueType.STRING) .add("type", ValueType.STRING) .add("datasource", ValueType.STRING) .add("created_time", ValueType.STRING) @@ -649,6 +650,7 @@ public Object[] current() } return new Object[]{ task.getId(), + task.getGroupId(), task.getType(), task.getDataSource(), toStringOrNull(task.getCreatedTime()), diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 4f0b519a3716..3118af043da9 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -466,7 +466,7 @@ public void testGetTableMap() final SystemSchema.TasksTable tasksTable = (SystemSchema.TasksTable) schema.getTableMap().get("tasks"); final RelDataType sysRowType = tasksTable.getRowType(new JavaTypeFactoryImpl()); final List sysFields = 
sysRowType.getFieldList(); - Assert.assertEquals(13, sysFields.size()); + Assert.assertEquals(14, sysFields.size()); Assert.assertEquals("task_id", sysFields.get(0).getName()); Assert.assertEquals(SqlTypeName.VARCHAR, sysFields.get(0).getType().getSqlTypeName()); @@ -1012,6 +1012,7 @@ public void testTasksTable() throws Exception String json = "[{\n" + "\t\"id\": \"index_wikipedia_2018-09-20T22:33:44.911Z\",\n" + + "\t\"groupId\": \"group_index_wikipedia_2018-09-20T22:33:44.911Z\",\n" + "\t\"type\": \"index\",\n" + "\t\"createdTime\": \"2018-09-20T22:33:44.922Z\",\n" + "\t\"queueInsertionTime\": \"1970-01-01T00:00:00.000Z\",\n" @@ -1027,6 +1028,7 @@ public void testTasksTable() throws Exception + "\t\"errorMsg\": null\n" + "}, {\n" + "\t\"id\": \"index_wikipedia_2018-09-21T18:38:47.773Z\",\n" + + "\t\"groupId\": \"group_index_wikipedia_2018-09-21T18:38:47.773Z\",\n" + "\t\"type\": \"index\",\n" + "\t\"createdTime\": \"2018-09-21T18:38:47.873Z\",\n" + "\t\"queueInsertionTime\": \"2018-09-21T18:38:47.910Z\",\n" @@ -1077,17 +1079,35 @@ public Object get(String name) Object[] row0 = rows.get(0); Assert.assertEquals("index_wikipedia_2018-09-20T22:33:44.911Z", row0[0].toString()); - Assert.assertEquals("FAILED", row0[5].toString()); - Assert.assertEquals("NONE", row0[6].toString()); - Assert.assertEquals(-1L, row0[7]); - Assert.assertEquals("testHost:1234", row0[8]); + Assert.assertEquals("group_index_wikipedia_2018-09-20T22:33:44.911Z", row0[1].toString()); + Assert.assertEquals("index", row0[2].toString()); + Assert.assertEquals("wikipedia", row0[3].toString()); + Assert.assertEquals("2018-09-20T22:33:44.922Z", row0[4].toString()); + Assert.assertEquals("1970-01-01T00:00:00.000Z", row0[5].toString()); + Assert.assertEquals("FAILED", row0[6].toString()); + Assert.assertEquals("NONE", row0[7].toString()); + Assert.assertEquals(-1L, row0[8]); + Assert.assertEquals("testHost:1234", row0[9]); + Assert.assertEquals("testHost", row0[10]); + Assert.assertEquals(1234L, 
row0[11]); + Assert.assertEquals(-1L, row0[12]); + Assert.assertEquals(null, row0[13]); Object[] row1 = rows.get(1); Assert.assertEquals("index_wikipedia_2018-09-21T18:38:47.773Z", row1[0].toString()); - Assert.assertEquals("RUNNING", row1[5].toString()); + Assert.assertEquals("group_index_wikipedia_2018-09-21T18:38:47.773Z", row1[1].toString()); + Assert.assertEquals("index", row1[2].toString()); + Assert.assertEquals("wikipedia", row1[3].toString()); + Assert.assertEquals("2018-09-21T18:38:47.873Z", row1[4].toString()); + Assert.assertEquals("2018-09-21T18:38:47.910Z", row1[5].toString()); Assert.assertEquals("RUNNING", row1[6].toString()); - Assert.assertEquals(0L, row1[7]); - Assert.assertEquals("192.168.1.6:8100", row1[8]); + Assert.assertEquals("RUNNING", row1[7].toString()); + Assert.assertEquals(0L, row1[8]); + Assert.assertEquals("192.168.1.6:8100", row1[9]); + Assert.assertEquals("192.168.1.6", row1[10]); + Assert.assertEquals(8100L, row1[11]); + Assert.assertEquals(-1L, row1[12]); + Assert.assertEquals(null, row1[13]); // Verify value types. verifyTypes(rows, SystemSchema.TASKS_SIGNATURE);