From fc0b9749bf3d7cfdae28e4f7655395efe63a69e8 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 14 Aug 2019 16:20:22 -0700 Subject: [PATCH 1/9] Add group_id to overlord tasks API and sys.tasks table --- .../org/apache/druid/indexer/TaskInfo.java | 48 +++++- .../apache/druid/indexer/TaskStatusPlus.java | 14 ++ .../druid/indexer/TaskStatusPlusTest.java | 2 + .../overlord/HeapMemoryTaskStorage.java | 12 +- .../overlord/MetadataTaskStorage.java | 37 ++++- .../overlord/http/OverlordResource.java | 14 ++ ...stractParallelIndexSupervisorTaskTest.java | 13 ++ ...rallelIndexSupervisorTaskResourceTest.java | 2 + .../task/batch/parallel/TaskMonitorTest.java | 1 + .../IndexerMetadataStorageAdapterTest.java | 12 +- .../overlord/http/OverlordResourceTest.java | 148 ++++++++++++------ .../SQLMetadataStorageActionHandler.java | 2 +- .../sql/calcite/schema/SystemSchema.java | 2 + .../sql/calcite/schema/SystemSchemaTest.java | 28 +++- 14 files changed, 266 insertions(+), 69 deletions(-) diff --git a/core/src/main/java/org/apache/druid/indexer/TaskInfo.java b/core/src/main/java/org/apache/druid/indexer/TaskInfo.java index 411d87af6a90..4a03ba089134 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskInfo.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskInfo.java @@ -30,13 +30,14 @@ public class TaskInfo { private final String id; + private String groupId; private final DateTime createdTime; private final StatusType status; private final String dataSource; @Nullable private final EntryType task; - public TaskInfo( + private TaskInfo( String id, DateTime createdTime, StatusType status, @@ -51,11 +52,56 @@ public TaskInfo( this.task = task; } + private TaskInfo( + String id, + String groupId, + DateTime createdTime, + StatusType status, + String dataSource, + @Nullable EntryType task + ) + { + this.id = Preconditions.checkNotNull(id, "id"); + this.groupId = Preconditions.checkNotNull(groupId, "groupId"); + this.createdTime = Preconditions.checkNotNull(createdTime, "createdTime"); + this.status = Preconditions.checkNotNull(status, "status"); + this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); + this.task = task; + } + + public static TaskInfo createTaskInfo( + String id, + DateTime createdTime, + StatusType status, + String dataSource, + @Nullable EntryType task + ) + { + return new TaskInfo(id, createdTime, status, dataSource, task); + } + + public static TaskInfo createTaskInfoWithGroupId( + String id, + String groupId, + DateTime createdTime, + StatusType status, + String dataSource, + @Nullable EntryType task + ) + { + return new TaskInfo(id, groupId, createdTime, status, dataSource, task); + } + public String getId() { return id; } + public String getGroupId() + { + return groupId; + } + public DateTime getCreatedTime() { return createdTime; diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java index 34733af08bb8..789e5b8deb44 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java @@ -31,6 +31,7 @@ public class TaskStatusPlus { private final String id; + private final String groupId; private final String type; private final DateTime createdTime; private final DateTime queueInsertionTime; @@ -45,6 +46,7 @@ public class TaskStatusPlus public TaskStatusPlus( String id, + String groupId, String type, // nullable for backward compatibility DateTime createdTime, DateTime queueInsertionTime, @@ -58,6 +60,7 @@ public TaskStatusPlus( { this( id, + groupId, type, createdTime, queueInsertionTime, @@ -74,6 +77,7 @@ public TaskStatusPlus( @JsonCreator public TaskStatusPlus( @JsonProperty("id") String id, + @JsonProperty("groupId") String groupId, @JsonProperty("type") @Nullable String type, // nullable for backward compatibility @JsonProperty("createdTime") DateTime createdTime, @JsonProperty("queueInsertionTime") DateTime queueInsertionTime, @@ -90,6 +94,7 @@ public TaskStatusPlus( Preconditions.checkNotNull(duration, "duration"); } this.id = Preconditions.checkNotNull(id, "id"); + this.groupId = Preconditions.checkNotNull(groupId, "groupId"); this.type = type; this.createdTime = Preconditions.checkNotNull(createdTime, "createdTime"); this.queueInsertionTime = Preconditions.checkNotNull(queueInsertionTime, "queueInsertionTime"); @@ -117,6 +122,12 @@ public String getId() return id; } + @JsonProperty + public String getGroupId() + { + return groupId; + } + @Nullable @JsonProperty public String getType() @@ -195,6 +206,7 @@ public boolean equals(Object o) } TaskStatusPlus that = (TaskStatusPlus) o; return Objects.equals(getId(), that.getId()) && + Objects.equals(getGroupId(), that.getGroupId()) && Objects.equals(getType(), that.getType()) && Objects.equals(getCreatedTime(), that.getCreatedTime()) && Objects.equals(getQueueInsertionTime(), that.getQueueInsertionTime()) && @@ -210,6 +222,7 @@ public int hashCode() { return Objects.hash( getId(), + getGroupId(), getType(), getCreatedTime(), getQueueInsertionTime(), @@ -226,6 +239,7 @@ public String toString() { return "TaskStatusPlus{" + "id='" + id + '\'' + + "groupId='" + groupId + '\'' + ", type='" + type + '\'' + ", createdTime=" + createdTime + ", queueInsertionTime=" + queueInsertionTime + diff --git a/core/src/test/java/org/apache/druid/indexer/TaskStatusPlusTest.java b/core/src/test/java/org/apache/druid/indexer/TaskStatusPlusTest.java index 9d4817226bdf..03a8c6da7617 100644 --- a/core/src/test/java/org/apache/druid/indexer/TaskStatusPlusTest.java +++ b/core/src/test/java/org/apache/druid/indexer/TaskStatusPlusTest.java @@ -46,6 +46,7 @@ public void testSerde() throws IOException ); final TaskStatusPlus status = new TaskStatusPlus( "testId", + "testGroupId", "testType", DateTimes.nowUtc(), DateTimes.nowUtc(), @@ -71,6 +72,7 @@ public void testJsonAttributes() throws IOException ); final String json = "{\n" + "\"id\": \"testId\",\n" + + "\"groupId\": \"testGroupId\",\n" + "\"type\": \"testType\",\n" + "\"createdTime\": \"2018-09-17T06:35:17.392Z\",\n" + "\"queueInsertionTime\": \"2018-09-17T06:35:17.392Z\",\n" diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index a800a35f2d9f..6ba5b26e5b94 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -159,8 +159,9 @@ public TaskInfo getTaskInfo(String taskId) Preconditions.checkNotNull(taskId, "taskId"); final TaskStuff taskStuff = tasks.get(taskId); if (taskStuff != null) { - return new TaskInfo<>( + return TaskInfo.createTaskInfoWithGroupId( taskStuff.getTask().getId(), + taskStuff.getTask().getGroupId(), taskStuff.getCreatedDate(), taskStuff.getStatus(), taskStuff.getDataSource(), @@ -203,8 +204,9 @@ public List> getActiveTaskInfo(@Nullable String dataS final ImmutableList.Builder> listBuilder = ImmutableList.builder(); for (final TaskStuff taskStuff : tasks.values()) { if (taskStuff.getStatus().isRunnable()) { - TaskInfo t = new TaskInfo( + TaskInfo t = TaskInfo.createTaskInfoWithGroupId( taskStuff.getTask().getId(), + taskStuff.getTask().getGroupId(), taskStuff.getCreatedDate(), taskStuff.getStatus(), taskStuff.getDataSource(), @@ -267,8 +269,9 @@ private List> getRecentlyCreatedAlreadyFinishedTaskIn final ImmutableList.Builder> listBuilder = ImmutableList.builder(); for (final TaskStuff taskStuff : list) { String id = taskStuff.getTask().getId(); - TaskInfo t = new TaskInfo( + TaskInfo t = TaskInfo.createTaskInfoWithGroupId( id, + taskStuff.getTask().getGroupId(), taskStuff.getCreatedDate(), taskStuff.getStatus(), taskStuff.getDataSource(), @@ -297,8 +300,9 @@ private List> getNRecentlyCreatedAlreadyFinishedTaskI final ImmutableList.Builder> listBuilder = ImmutableList.builder(); for (final TaskStuff taskStuff : list) { String id = taskStuff.getTask().getId(); - TaskInfo t = new TaskInfo<>( + TaskInfo t = TaskInfo.createTaskInfoWithGroupId( id, + taskStuff.getTask().getGroupId(), taskStuff.getCreatedDate(), taskStuff.getStatus(), taskStuff.getDataSource(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index 256d62b48e32..21ad346413a1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -183,7 +183,15 @@ public Optional getStatus(final String taskId) @Override public TaskInfo getTaskInfo(String taskId) { - return handler.getTaskInfo(taskId); + final TaskInfo taskInfo = handler.getTaskInfo(taskId); + return TaskInfo.createTaskInfoWithGroupId( + taskInfo.getId(), + taskInfo.getTask().getGroupId(), + taskInfo.getCreatedTime(), + taskInfo.getStatus(), + taskInfo.getDataSource(), + taskInfo.getTask() + ); } @Override @@ -201,9 +209,19 @@ public List getActiveTasks() @Override public List> getActiveTaskInfo(@Nullable String dataSource) { - return ImmutableList.copyOf( + final List> taskInfoImmutableList = ImmutableList.copyOf( handler.getActiveTaskInfo(dataSource) ); + return taskInfoImmutableList.stream() + .map(t -> TaskInfo.createTaskInfoWithGroupId( + t.getId(), + t.getTask().getGroupId(), + t.getCreatedTime(), + t.getStatus(), + t.getDataSource(), + t.getTask() + )) + .collect(Collectors.toList()); } @Override @@ -213,13 +231,24 @@ public List> getRecentlyCreatedAlreadyFinishedTaskInf @Nullable String datasource ) { - return ImmutableList.copyOf( + final List> taskInfoImmutableList = ImmutableList.copyOf( handler.getCompletedTaskInfo( - DateTimes.nowUtc().minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow), + DateTimes.nowUtc() + .minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow), maxTaskStatuses, datasource ) ); + return taskInfoImmutableList.stream() + .map(t -> TaskInfo.createTaskInfoWithGroupId( + t.getId(), + t.getTask().getGroupId(), + t.getCreatedTime(), + t.getStatus(), + t.getDataSource(), + t.getTask() + )) + .collect(Collectors.toList()); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index b1539589ec99..d5c46565ee20 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -264,6 +264,7 @@ public Response getTaskStatus(@PathParam("taskid") String taskid) workItem.getTaskId(), new TaskStatusPlus( taskInfo.getId(), + taskInfo.getGroupId(), taskInfo.getTask() == null ? null : taskInfo.getTask().getType(), taskInfo.getCreatedTime(), // Would be nice to include the real queue insertion time, but the @@ -285,6 +286,7 @@ public Response getTaskStatus(@PathParam("taskid") String taskid) taskid, new TaskStatusPlus( taskInfo.getId(), + taskInfo.getGroupId(), taskInfo.getTask() == null ? null : taskInfo.getTask().getType(), taskInfo.getCreatedTime(), // Would be nice to include the real queue insertion time, but the @@ -577,6 +579,7 @@ public Response getTasks( List finalTaskList = new ArrayList<>(); Function activeTaskTransformFunc = workItem -> new TaskStatusPlus( workItem.getTaskId(), + workItem.getTaskGroupId(), workItem.getTaskType(), workItem.getCreatedTime(), workItem.getQueueInsertionTime(), @@ -590,6 +593,7 @@ public Response getTasks( Function, TaskStatusPlus> completeTaskTransformFunc = taskInfo -> new TaskStatusPlus( taskInfo.getId(), + taskInfo.getGroupId(), taskInfo.getTask() == null ? null : taskInfo.getTask().getType(), taskInfo.getCreatedTime(), // Would be nice to include the real queue insertion time, but the @@ -626,6 +630,7 @@ public Response getTasks( allActiveTasks.add( new AnyTask( task.getId(), + task.getGroupId(), task.getTask() == null ? null : task.getTask().getType(), SettableFuture.create(), task.getDataSource(), @@ -990,6 +995,7 @@ private List securedTaskStatusPlus( private static class AnyTask extends TaskRunnerWorkItem { + private final String taskGroupId; private final String taskType; private final String dataSource; private final TaskState taskState; @@ -1000,6 +1006,7 @@ private static class AnyTask extends TaskRunnerWorkItem AnyTask( String taskId, + String taskGroupId, String taskType, ListenableFuture result, String dataSource, @@ -1011,6 +1018,7 @@ private static class AnyTask extends TaskRunnerWorkItem ) { super(taskId, result, DateTimes.EPOCH, DateTimes.EPOCH); + this.taskGroupId = taskGroupId; this.taskType = taskType; this.dataSource = dataSource; this.taskState = state; @@ -1038,6 +1046,11 @@ public String getDataSource() return dataSource; } + public String getTaskGroupId() + { + return taskGroupId; + } + public TaskState getTaskState() { return taskState; @@ -1070,6 +1083,7 @@ public AnyTask withTaskState( { return new AnyTask( getTaskId(), + getTaskGroupId(), getTaskType(), getResult(), getDataSource(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index f780428f7021..43ab87e73898 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.common.task.batch.parallel; +import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListeningExecutorService; @@ -50,6 +51,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.segment.loading.LocalDataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; import org.apache.druid.segment.loading.NoopDataSegmentKiller; @@ -116,6 +118,12 @@ class LocalIndexingServiceClient extends NoopIndexingServiceClient public String runTask(Object taskObject) { final ParallelIndexSubTask subTask = (ParallelIndexSubTask) taskObject; + try { + getTaskStorage().insert(subTask, TaskStatus.running(subTask.getId())); + } + catch (EntryExistsException e) { + throw new RuntimeException(e); + } tasks.put(subTask.getId(), service.submit(() -> { try { final TaskToolbox toolbox = createTaskToolbox(subTask); @@ -136,6 +144,8 @@ public String runTask(Object taskObject) public TaskStatusResponse getTaskStatus(String taskId) { final Future taskStatusFuture = tasks.get(taskId); + final Optional task = getTaskStorage().getTask(taskId); + final String groupId = task.isPresent() ? task.orNull().getGroupId() : taskId; if (taskStatusFuture != null) { try { if (taskStatusFuture.isDone()) { @@ -144,6 +154,7 @@ public TaskStatusResponse getTaskStatus(String taskId) taskId, new TaskStatusPlus( taskId, + groupId, "index_sub", DateTimes.EPOCH, DateTimes.EPOCH, @@ -160,6 +171,7 @@ public TaskStatusResponse getTaskStatus(String taskId) taskId, new TaskStatusPlus( taskId, + groupId, "index_sub", DateTimes.EPOCH, DateTimes.EPOCH, @@ -181,6 +193,7 @@ public TaskStatusResponse getTaskStatus(String taskId) taskId, new TaskStatusPlus( taskId, + groupId, "index_sub", DateTimes.EPOCH, DateTimes.EPOCH, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index f2b151e2521a..70d1a7d7bf52 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -607,6 +607,7 @@ public ParallelIndexSubTask newSubTask(int numAttempts) getId(), new TaskStatusPlus( subTask.getId(), + subTask.getGroupId(), subTask.getType(), DateTimes.EPOCH, DateTimes.EPOCH, @@ -706,6 +707,7 @@ void setState(TaskState state) taskHistories.computeIfAbsent(specId, k -> new ArrayList<>()).add( new TaskStatusPlus( getId(), + getGroupId(), getType(), DateTimes.EPOCH, DateTimes.EPOCH, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java index e5f85cb5dc99..5eab6aacb0f5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java @@ -195,6 +195,7 @@ public TaskStatusResponse getTaskStatus(String taskId) return new TaskStatusResponse( taskId, new TaskStatusPlus( + taskId, taskId, "testTask", DateTimes.EPOCH, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java index 8cb45ade0f09..8e5699fed2b3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java @@ -61,15 +61,17 @@ public void setup() public void testDeletePendingSegments() { final List> taskInfos = ImmutableList.of( - new TaskInfo<>( + TaskInfo.createTaskInfoWithGroupId( "id1", + "group_id1", DateTimes.of("2017-12-01"), TaskStatus.running("id1"), "dataSource", NoopTask.create("id1", 0) ), - new TaskInfo<>( + TaskInfo.createTaskInfoWithGroupId( "id1", + "group_id1", DateTimes.of("2017-12-02"), TaskStatus.running("id2"), "dataSource", @@ -93,15 +95,17 @@ public void testDeletePendingSegments() public void testDeletePendingSegmentsOfRunningTasks() { final ImmutableList> taskInfos = ImmutableList.of( - new TaskInfo<>( + TaskInfo.createTaskInfoWithGroupId( "id1", + "group_id1", DateTimes.of("2017-11-01"), TaskStatus.running("id1"), "dataSource", NoopTask.create("id1", 0) ), - new TaskInfo<>( + TaskInfo.createTaskInfoWithGroupId( "id1", + "group_id1", DateTimes.of("2017-12-02"), TaskStatus.running("id2"), "dataSource", diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 159192562798..185da469106b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -197,29 +197,33 @@ public void testSecuredGetWaitingTask() expectAuthorizationTokenCheck(); EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( ImmutableList.of( - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_1", + "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", getTaskWithIdAndDatasource("id_1", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_2", + "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_3", + "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), "deny", getTaskWithIdAndDatasource("id_3", "deny") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_4", + "groupId_4", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_4"), "deny", @@ -264,22 +268,25 @@ public void testSecuredGetCompleteTasks() EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_1", + "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "deny", getTaskWithIdAndDatasource("id_1", "deny") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_2", + "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_3", + "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), "allow", @@ -318,15 +325,17 @@ public void testSecuredGetRunningTasks() ); EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( ImmutableList.of( - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_1", + "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "deny", getTaskWithIdAndDatasource("id_1", "deny") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_2", + "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "allow", @@ -358,22 +367,25 @@ public void testGetTasks() //completed tasks EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_5", + "groupId_5", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_5"), "deny", getTaskWithIdAndDatasource("id_5", "deny") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_6", + "groupId_6", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_6"), "allow", getTaskWithIdAndDatasource("id_6", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_7", + "groupId_7", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_7"), "allow", @@ -384,29 +396,33 @@ public void testGetTasks() //active tasks EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( ImmutableList.of( - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_1", + "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", getTaskWithIdAndDatasource("id_1", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_2", + "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_3", + "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), "deny", getTaskWithIdAndDatasource("id_3", "deny") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_4", + "groupId_4", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_4"), "deny", @@ -456,22 +472,25 @@ public void testGetTasksFilterDataSource() EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, "allow")) .andStubReturn( ImmutableList.of( - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_5", + "groupId_5", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_5"), "allow", getTaskWithIdAndDatasource("id_5", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_6", + "groupId_6", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_6"), "allow", getTaskWithIdAndDatasource("id_6", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_7", + "groupId_7", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_7"), "allow", @@ -482,29 +501,33 @@ public void testGetTasksFilterDataSource() //active tasks EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("allow")).andStubReturn( ImmutableList.of( - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_1", + "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", getTaskWithIdAndDatasource("id_1", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_2", + "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_3", + "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", getTaskWithIdAndDatasource("id_3", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_4", + "groupId_4", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_4"), "allow", @@ -553,29 +576,33 @@ public void testGetTasksFilterWaitingState() //active tasks EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( ImmutableList.of( - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_1", + "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", getTaskWithIdAndDatasource("id_1", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_2", + "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_3", + "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), "deny", getTaskWithIdAndDatasource("id_3", "deny") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_4", + "groupId_4", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_4"), "deny", @@ -618,29 +645,33 @@ public void testGetTasksFilterRunningState() expectAuthorizationTokenCheck(); EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("allow")).andStubReturn( ImmutableList.of( - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_1", + "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", getTaskWithIdAndDatasource("id_1", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_2", + "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_3", + "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), "allow", getTaskWithIdAndDatasource("id_3", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_4", + "groupId_4", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_4"), "deny", @@ -691,29 +722,33 @@ public void testGetTasksFilterPendingState() ); EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( ImmutableList.of( - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_1", + "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "deny", getTaskWithIdAndDatasource("id_1", "deny") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_2", + "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_3", + "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), "allow", getTaskWithIdAndDatasource("id_3", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_4", + "groupId_4", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_4"), "deny", @@ -748,22 +783,25 @@ public void testGetTasksFilterCompleteState() expectAuthorizationTokenCheck(); EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_1", + "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", getTaskWithIdAndDatasource("id_1", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_2", + "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "deny", getTaskWithIdAndDatasource("id_2", "deny") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_3", + "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), "allow", @@ -796,22 +834,25 @@ public void testGetTasksFilterCompleteStateWithInterval() EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, duration, null)) .andStubReturn( ImmutableList.of( - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_1", + "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "deny", getTaskWithIdAndDatasource("id_1", "deny") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_2", + "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_3", + "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), "allow", @@ -843,22 +884,25 @@ public void testGetNullCompleteTask() expectAuthorizationTokenCheck(); EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_1", + "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", null ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_2", + "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "deny", getTaskWithIdAndDatasource("id_2", "deny") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_3", + "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), "allow", @@ -1001,7 +1045,14 @@ public void testGetTaskStatus() throws Exception final TaskStatus status = TaskStatus.running("mytask"); EasyMock.expect(taskStorageQueryAdapter.getTaskInfo("mytask")) - .andReturn(new TaskInfo<>(task.getId(), DateTimes.of("2018-01-01"), status, task.getDataSource(), task)); + .andReturn(TaskInfo.createTaskInfoWithGroupId( + task.getId(), + task.getGroupId(), + DateTimes.of("2018-01-01"), + status, + task.getDataSource(), + task + )); EasyMock.expect(taskStorageQueryAdapter.getTaskInfo("othertask")) .andReturn(null); @@ -1029,6 +1080,7 @@ public void testGetTaskStatus() throws Exception new TaskStatusResponse( "mytask", new TaskStatusPlus( + "mytask", "mytask", "noop", DateTimes.of("2018-01-01"), @@ -1100,15 +1152,17 @@ public void testShutdownAllTasks() Optional.of(mockQueue) ).anyTimes(); EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("datasource")).andStubReturn(ImmutableList.of( - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_1", + "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "datasource", getTaskWithIdAndDatasource("id_1", "datasource") ), - new TaskInfo( + TaskInfo.createTaskInfoWithGroupId( "id_2", + "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "datasource", diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index acdbbd0f2c1a..cf4c8f7bed4a 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -370,7 +370,7 @@ public TaskInfo map(int index, ResultSet resultSet, State log.error(e, "Encountered exception while deserializing task status_payload"); throw new SQLException(e); } - taskInfo = new TaskInfo<>( + taskInfo = TaskInfo.createTaskInfo( resultSet.getString("id"), DateTimes.of(resultSet.getString("created_date")), status, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 0b171bdc6feb..08f768da6de2 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -165,6 +165,7 @@ public class SystemSchema extends AbstractSchema static final RowSignature TASKS_SIGNATURE = RowSignature .builder() .add("task_id", ValueType.STRING) + .add("group_id", ValueType.STRING) .add("type", ValueType.STRING) .add("datasource", ValueType.STRING) .add("created_time", ValueType.STRING) @@ -649,6 +650,7 @@ public Object[] current() } return new Object[]{ task.getId(), + task.getGroupId(), task.getType(), task.getDataSource(), toStringOrNull(task.getCreatedTime()), diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 4f0b519a3716..c63c76a4f136 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -466,7 +466,7 @@ public void testGetTableMap() final SystemSchema.TasksTable tasksTable = (SystemSchema.TasksTable) schema.getTableMap().get("tasks"); final RelDataType sysRowType = tasksTable.getRowType(new JavaTypeFactoryImpl()); final List sysFields = sysRowType.getFieldList(); - Assert.assertEquals(13, sysFields.size()); + Assert.assertEquals(14, sysFields.size()); Assert.assertEquals("task_id", sysFields.get(0).getName()); Assert.assertEquals(SqlTypeName.VARCHAR, sysFields.get(0).getType().getSqlTypeName()); @@ -1012,6 +1012,7 @@ public void testTasksTable() throws Exception String json = "[{\n" + "\t\"id\": \"index_wikipedia_2018-09-20T22:33:44.911Z\",\n" + + "\t\"groupId\": \"index_wikipedia_2018-09-20T22:33:44.911Z\",\n" + "\t\"type\": \"index\",\n" + "\t\"createdTime\": \"2018-09-20T22:33:44.922Z\",\n" + "\t\"queueInsertionTime\": \"1970-01-01T00:00:00.000Z\",\n" @@ -1027,6 +1028,7 @@ public void testTasksTable() throws Exception + "\t\"errorMsg\": null\n" + "}, {\n" + "\t\"id\": \"index_wikipedia_2018-09-21T18:38:47.773Z\",\n" + + "\t\"groupId\": \"index_wikipedia_2018-09-21T18:38:47.773Z\",\n" + "\t\"type\": \"index\",\n" + "\t\"createdTime\": \"2018-09-21T18:38:47.873Z\",\n" + "\t\"queueInsertionTime\": \"2018-09-21T18:38:47.910Z\",\n" @@ -1077,17 +1079,27 @@ public Object get(String name) Object[] row0 = rows.get(0); Assert.assertEquals("index_wikipedia_2018-09-20T22:33:44.911Z", row0[0].toString()); - Assert.assertEquals("FAILED", row0[5].toString()); - Assert.assertEquals("NONE", row0[6].toString()); - Assert.assertEquals(-1L, row0[7]); - Assert.assertEquals("testHost:1234", row0[8]); + Assert.assertEquals("index_wikipedia_2018-09-20T22:33:44.911Z", row0[1].toString()); + Assert.assertEquals("index", row0[2].toString()); + Assert.assertEquals("wikipedia", row0[3].toString()); + Assert.assertEquals("2018-09-20T22:33:44.922Z", row0[4].toString()); + Assert.assertEquals("1970-01-01T00:00:00.000Z", row0[5].toString()); + Assert.assertEquals("FAILED", row0[6].toString()); + Assert.assertEquals("NONE", row0[7].toString()); + Assert.assertEquals(-1L, row0[8]); + Assert.assertEquals("testHost:1234", row0[9]); Object[] row1 = rows.get(1); Assert.assertEquals("index_wikipedia_2018-09-21T18:38:47.773Z", row1[0].toString()); - Assert.assertEquals("RUNNING", row1[5].toString()); + Assert.assertEquals("index_wikipedia_2018-09-21T18:38:47.773Z", row1[1].toString()); + Assert.assertEquals("index", row1[2].toString()); + Assert.assertEquals("wikipedia", row1[3].toString()); + Assert.assertEquals("2018-09-21T18:38:47.873Z", row1[4].toString()); + Assert.assertEquals("2018-09-21T18:38:47.910Z", row1[5].toString()); Assert.assertEquals("RUNNING", row1[6].toString()); - Assert.assertEquals(0L, row1[7]); - Assert.assertEquals("192.168.1.6:8100", row1[8]); + Assert.assertEquals("RUNNING", row1[7].toString()); + Assert.assertEquals(0L, row1[8]); + Assert.assertEquals("192.168.1.6:8100", row1[9]); // Verify value types. verifyTypes(rows, SystemSchema.TASKS_SIGNATURE); From f0446109876d7d1d8d5e0aee4d05608b4b8b3132 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 14 Aug 2019 16:45:24 -0700 Subject: [PATCH 2/9] adjust test --- core/src/main/java/org/apache/druid/indexer/TaskInfo.java | 2 +- .../apache/druid/sql/calcite/schema/SystemSchemaTest.java | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/druid/indexer/TaskInfo.java b/core/src/main/java/org/apache/druid/indexer/TaskInfo.java index 4a03ba089134..6957b36a4b62 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskInfo.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskInfo.java @@ -30,12 +30,12 @@ public class TaskInfo { private final String id; - private String groupId; private final DateTime createdTime; private final StatusType status; private final String dataSource; @Nullable private final EntryType task; + private String groupId; private TaskInfo( String id, diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index c63c76a4f136..c8d6f75ec869 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -1088,6 +1088,10 @@ public Object get(String name) Assert.assertEquals("NONE", row0[7].toString()); Assert.assertEquals(-1L, row0[8]); Assert.assertEquals("testHost:1234", row0[9]); + Assert.assertEquals("testHost", row0[10]); + Assert.assertEquals(1234L, row0[11]); + Assert.assertEquals(-1L, row0[12]); + Assert.assertEquals(null, row0[13]); Object[] row1 = rows.get(1); Assert.assertEquals("index_wikipedia_2018-09-21T18:38:47.773Z", row1[0].toString()); @@ -1100,6 +1104,10 @@ public Object get(String name) Assert.assertEquals("RUNNING", row1[7].toString()); Assert.assertEquals(0L, row1[8]); Assert.assertEquals("192.168.1.6:8100", row1[9]); + Assert.assertEquals("192.168.1.6", row1[10]); + Assert.assertEquals(8100L, row1[11]); + Assert.assertEquals(-1L, row1[12]); + Assert.assertEquals(null, row1[13]); // Verify value types. verifyTypes(rows, SystemSchema.TASKS_SIGNATURE); From 777b7847534642e6e7d957642b87142bac625d9d Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 14 Aug 2019 17:12:32 -0700 Subject: [PATCH 3/9] modify docs --- docs/content/querying/sql.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index 21d6edf4f5e8..94fe0c86bc77 100644 --- a/docs/content/querying/sql.md +++ b/docs/content/querying/sql.md @@ -746,6 +746,7 @@ check out [ingestion tasks](#../ingestion/tasks.html) |Column|Type|Notes| |------|-----|-----| |task_id|STRING|Unique task identifier| +|group_id|STRING|Task group ID for this task, for sub tasks, this value is the parent task's ID| |type|STRING|Task type, for example this value is "index" for indexing tasks. See [tasks-overview](../ingestion/tasks.html)| |datasource|STRING|Datasource name being indexed| |created_time|STRING|Timestamp in ISO8601 format corresponding to when the ingestion task was created. Note that this value is populated for completed and waiting tasks. For running and pending tasks this value is set to 1970-01-01T00:00:00Z| From 70aa55b8c5799b9b10a818cda7438cf4a4382964 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 15 Aug 2019 15:49:00 -0700 Subject: [PATCH 4/9] Make groupId nullable --- .../org/apache/druid/indexer/TaskInfo.java | 46 +-------- .../apache/druid/indexer/TaskStatusPlus.java | 8 +- .../overlord/HeapMemoryTaskStorage.java | 8 +- .../overlord/MetadataTaskStorage.java | 12 +-- .../IndexerMetadataStorageAdapterTest.java | 8 +- .../overlord/http/OverlordResourceTest.java | 94 +++++++++---------- .../SQLMetadataStorageActionHandler.java | 3 +- 7 files changed, 73 insertions(+), 106 deletions(-) diff --git a/core/src/main/java/org/apache/druid/indexer/TaskInfo.java b/core/src/main/java/org/apache/druid/indexer/TaskInfo.java index 6957b36a4b62..80222661eeb5 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskInfo.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskInfo.java @@ -35,24 +35,10 @@ public class TaskInfo private final String dataSource; @Nullable private final EntryType task; - private String groupId; - - private TaskInfo( - String id, - DateTime createdTime, - StatusType status, - String dataSource, - @Nullable EntryType task - ) - { - this.id = Preconditions.checkNotNull(id, "id"); - this.createdTime = Preconditions.checkNotNull(createdTime, "createdTime"); - this.status = Preconditions.checkNotNull(status, "status"); - this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); - this.task = task; - } + @Nullable + private final String groupId; - private TaskInfo( + public TaskInfo( String id, String groupId, DateTime createdTime, @@ -62,34 +48,11 @@ private TaskInfo( ) { this.id = Preconditions.checkNotNull(id, "id"); - this.groupId = Preconditions.checkNotNull(groupId, "groupId"); this.createdTime = Preconditions.checkNotNull(createdTime, "createdTime"); this.status = Preconditions.checkNotNull(status, "status"); this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); this.task = task; - } - - public static TaskInfo createTaskInfo( - String id, - DateTime createdTime, - StatusType status, - String dataSource, - @Nullable EntryType task - ) - { - return new TaskInfo(id, createdTime, status, dataSource, task); - } - - public static TaskInfo createTaskInfoWithGroupId( - String id, - String groupId, - DateTime createdTime, - StatusType status, - String dataSource, - @Nullable EntryType task - ) - { - return new TaskInfo(id, groupId, createdTime, status, dataSource, task); + this.groupId = groupId; } public String getId() @@ -97,6 +60,7 @@ public String getId() return id; } + @Nullable public String getGroupId() { return groupId; diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java index 789e5b8deb44..a9d3c12a53e8 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java @@ -31,7 +31,6 @@ public class TaskStatusPlus { private final String id; - private final String groupId; private final String type; private final DateTime createdTime; private final DateTime queueInsertionTime; @@ -43,10 +42,12 @@ public class TaskStatusPlus @Nullable private final String errorMsg; + @Nullable + private final String groupId; public TaskStatusPlus( String id, - String groupId, + @Nullable String groupId, String type, // nullable for backward compatibility DateTime createdTime, DateTime queueInsertionTime, @@ -94,7 +95,7 @@ public TaskStatusPlus( Preconditions.checkNotNull(duration, "duration"); } this.id = Preconditions.checkNotNull(id, "id"); - this.groupId = Preconditions.checkNotNull(groupId, "groupId"); + this.groupId = groupId; this.type = type; this.createdTime = Preconditions.checkNotNull(createdTime, "createdTime"); this.queueInsertionTime = Preconditions.checkNotNull(queueInsertionTime, "queueInsertionTime"); @@ -122,6 +123,7 @@ public String getId() return id; } + @Nullable @JsonProperty public String getGroupId() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index 6ba5b26e5b94..1e4f453073d9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -159,7 +159,7 @@ public TaskInfo getTaskInfo(String taskId) Preconditions.checkNotNull(taskId, "taskId"); final TaskStuff taskStuff = tasks.get(taskId); if (taskStuff != null) { - return TaskInfo.createTaskInfoWithGroupId( + return new TaskInfo<>( taskStuff.getTask().getId(), taskStuff.getTask().getGroupId(), taskStuff.getCreatedDate(), @@ -204,7 +204,7 @@ public List> getActiveTaskInfo(@Nullable String dataS final ImmutableList.Builder> listBuilder = ImmutableList.builder(); for (final TaskStuff taskStuff : tasks.values()) { if (taskStuff.getStatus().isRunnable()) { - TaskInfo t = TaskInfo.createTaskInfoWithGroupId( + TaskInfo t = new TaskInfo<>( taskStuff.getTask().getId(), taskStuff.getTask().getGroupId(), taskStuff.getCreatedDate(), @@ -269,7 +269,7 @@ private List> getRecentlyCreatedAlreadyFinishedTaskIn final ImmutableList.Builder> listBuilder = ImmutableList.builder(); for (final TaskStuff taskStuff : list) { String id = taskStuff.getTask().getId(); - TaskInfo t = TaskInfo.createTaskInfoWithGroupId( + TaskInfo t = new TaskInfo<>( id, taskStuff.getTask().getGroupId(), taskStuff.getCreatedDate(), @@ -300,7 +300,7 @@ private List> getNRecentlyCreatedAlreadyFinishedTaskI final ImmutableList.Builder> listBuilder = ImmutableList.builder(); for (final TaskStuff taskStuff : list) { String id = taskStuff.getTask().getId(); - TaskInfo t = TaskInfo.createTaskInfoWithGroupId( + TaskInfo t = new TaskInfo<>( id, taskStuff.getTask().getGroupId(), taskStuff.getCreatedDate(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index 21ad346413a1..423bd1281d84 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -184,9 +184,9 @@ public Optional getStatus(final String taskId) public TaskInfo getTaskInfo(String taskId) { final TaskInfo taskInfo = handler.getTaskInfo(taskId); - return TaskInfo.createTaskInfoWithGroupId( + return new TaskInfo<>( taskInfo.getId(), - taskInfo.getTask().getGroupId(), + taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(), taskInfo.getCreatedTime(), taskInfo.getStatus(), taskInfo.getDataSource(), @@ -213,9 +213,9 @@ public List> getActiveTaskInfo(@Nullable String dataS handler.getActiveTaskInfo(dataSource) ); return taskInfoImmutableList.stream() - .map(t -> TaskInfo.createTaskInfoWithGroupId( + .map(t -> new TaskInfo<>( t.getId(), - t.getTask().getGroupId(), + t.getTask() == null ? null : t.getTask().getGroupId(), t.getCreatedTime(), t.getStatus(), t.getDataSource(), @@ -240,9 +240,9 @@ public List> getRecentlyCreatedAlreadyFinishedTaskInf ) ); return taskInfoImmutableList.stream() - .map(t -> TaskInfo.createTaskInfoWithGroupId( + .map(t -> new TaskInfo<>( t.getId(), - t.getTask().getGroupId(), + t.getTask() == null ? null : t.getTask().getGroupId(), t.getCreatedTime(), t.getStatus(), t.getDataSource(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java index 8e5699fed2b3..71f4cd175d6e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java @@ -61,7 +61,7 @@ public void setup() public void testDeletePendingSegments() { final List> taskInfos = ImmutableList.of( - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo<>( "id1", "group_id1", DateTimes.of("2017-12-01"), @@ -69,7 +69,7 @@ public void testDeletePendingSegments() "dataSource", NoopTask.create("id1", 0) ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo<>( "id1", "group_id1", DateTimes.of("2017-12-02"), @@ -95,7 +95,7 @@ public void testDeletePendingSegments() public void testDeletePendingSegmentsOfRunningTasks() { final ImmutableList> taskInfos = ImmutableList.of( - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo<>( "id1", "group_id1", DateTimes.of("2017-11-01"), @@ -103,7 +103,7 @@ public void testDeletePendingSegmentsOfRunningTasks() "dataSource", NoopTask.create("id1", 0) ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo<>( "id1", "group_id1", DateTimes.of("2017-12-02"), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 185da469106b..354710a6d88a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -197,7 +197,7 @@ public void testSecuredGetWaitingTask() expectAuthorizationTokenCheck(); EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( ImmutableList.of( - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_1", "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), @@ -205,7 +205,7 @@ public void testSecuredGetWaitingTask() "allow", getTaskWithIdAndDatasource("id_1", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_2", "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), @@ -213,7 +213,7 @@ public void testSecuredGetWaitingTask() "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_3", "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), @@ -221,7 +221,7 @@ public void testSecuredGetWaitingTask() "deny", getTaskWithIdAndDatasource("id_3", "deny") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_4", "groupId_4", DateTime.now(ISOChronology.getInstanceUTC()), @@ -268,7 +268,7 @@ public void testSecuredGetCompleteTasks() EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_1", "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), @@ -276,7 +276,7 @@ public void testSecuredGetCompleteTasks() "deny", getTaskWithIdAndDatasource("id_1", "deny") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_2", "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), @@ -284,7 +284,7 @@ public void testSecuredGetCompleteTasks() "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_3", "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), @@ -325,7 +325,7 @@ public void testSecuredGetRunningTasks() ); EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( ImmutableList.of( - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_1", "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), @@ -333,7 +333,7 @@ public void testSecuredGetRunningTasks() "deny", getTaskWithIdAndDatasource("id_1", "deny") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_2", "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), @@ -367,7 +367,7 @@ public void testGetTasks() //completed tasks EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_5", "groupId_5", DateTime.now(ISOChronology.getInstanceUTC()), @@ -375,7 +375,7 @@ public void testGetTasks() "deny", getTaskWithIdAndDatasource("id_5", "deny") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_6", "groupId_6", DateTime.now(ISOChronology.getInstanceUTC()), @@ -383,7 +383,7 @@ public void testGetTasks() "allow", getTaskWithIdAndDatasource("id_6", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_7", "groupId_7", DateTime.now(ISOChronology.getInstanceUTC()), @@ -396,7 +396,7 @@ public void testGetTasks() //active tasks EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( ImmutableList.of( - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_1", "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), @@ -404,7 +404,7 @@ public void testGetTasks() "allow", getTaskWithIdAndDatasource("id_1", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_2", "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), @@ -412,7 +412,7 @@ public void testGetTasks() "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_3", "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), @@ -420,7 +420,7 @@ public void testGetTasks() "deny", getTaskWithIdAndDatasource("id_3", "deny") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_4", "groupId_4", DateTime.now(ISOChronology.getInstanceUTC()), @@ -472,7 +472,7 @@ public void testGetTasksFilterDataSource() EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, "allow")) .andStubReturn( ImmutableList.of( - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_5", "groupId_5", DateTime.now(ISOChronology.getInstanceUTC()), @@ -480,7 +480,7 @@ public void testGetTasksFilterDataSource() "allow", getTaskWithIdAndDatasource("id_5", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_6", "groupId_6", DateTime.now(ISOChronology.getInstanceUTC()), @@ -488,7 +488,7 @@ public void testGetTasksFilterDataSource() "allow", getTaskWithIdAndDatasource("id_6", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_7", "groupId_7", DateTime.now(ISOChronology.getInstanceUTC()), @@ -501,7 +501,7 @@ public void testGetTasksFilterDataSource() //active tasks EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("allow")).andStubReturn( ImmutableList.of( - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_1", "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), @@ -509,7 +509,7 @@ public void testGetTasksFilterDataSource() "allow", getTaskWithIdAndDatasource("id_1", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_2", "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), @@ -517,7 +517,7 @@ public void testGetTasksFilterDataSource() "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_3", "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), @@ -525,7 +525,7 @@ public void testGetTasksFilterDataSource() "allow", getTaskWithIdAndDatasource("id_3", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_4", "groupId_4", DateTime.now(ISOChronology.getInstanceUTC()), @@ -576,7 +576,7 @@ public void testGetTasksFilterWaitingState() //active tasks EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( ImmutableList.of( - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_1", "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), @@ -584,7 +584,7 @@ public void testGetTasksFilterWaitingState() "allow", getTaskWithIdAndDatasource("id_1", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_2", "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), @@ -592,7 +592,7 @@ public void testGetTasksFilterWaitingState() "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_3", "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), @@ -600,7 +600,7 @@ public void testGetTasksFilterWaitingState() "deny", getTaskWithIdAndDatasource("id_3", "deny") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_4", "groupId_4", DateTime.now(ISOChronology.getInstanceUTC()), @@ -645,7 +645,7 @@ public void testGetTasksFilterRunningState() expectAuthorizationTokenCheck(); EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("allow")).andStubReturn( ImmutableList.of( - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_1", "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), @@ -653,7 +653,7 @@ public void testGetTasksFilterRunningState() "allow", getTaskWithIdAndDatasource("id_1", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_2", "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), @@ -661,7 +661,7 @@ public void testGetTasksFilterRunningState() "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_3", "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), @@ -669,7 +669,7 @@ public void testGetTasksFilterRunningState() "allow", getTaskWithIdAndDatasource("id_3", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_4", "groupId_4", DateTime.now(ISOChronology.getInstanceUTC()), @@ -722,7 +722,7 @@ public void testGetTasksFilterPendingState() ); EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( ImmutableList.of( - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_1", "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), @@ -730,7 +730,7 @@ public void testGetTasksFilterPendingState() "deny", getTaskWithIdAndDatasource("id_1", "deny") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_2", "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), @@ -738,7 +738,7 @@ public void testGetTasksFilterPendingState() "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_3", "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), @@ -746,7 +746,7 @@ public void testGetTasksFilterPendingState() "allow", getTaskWithIdAndDatasource("id_3", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_4", "groupId_4", DateTime.now(ISOChronology.getInstanceUTC()), @@ -783,7 +783,7 @@ public void testGetTasksFilterCompleteState() expectAuthorizationTokenCheck(); EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_1", "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), @@ -791,7 +791,7 @@ public void testGetTasksFilterCompleteState() "allow", getTaskWithIdAndDatasource("id_1", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_2", "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), @@ -799,7 +799,7 @@ public void testGetTasksFilterCompleteState() "deny", getTaskWithIdAndDatasource("id_2", "deny") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_3", "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), @@ -834,7 +834,7 @@ public void testGetTasksFilterCompleteStateWithInterval() EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, duration, null)) .andStubReturn( ImmutableList.of( - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_1", "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), @@ -842,7 +842,7 @@ public void testGetTasksFilterCompleteStateWithInterval() "deny", getTaskWithIdAndDatasource("id_1", "deny") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_2", "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), @@ -850,7 +850,7 @@ public void testGetTasksFilterCompleteStateWithInterval() "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_3", "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), @@ -884,7 +884,7 @@ public void testGetNullCompleteTask() expectAuthorizationTokenCheck(); EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_1", "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), @@ -892,7 +892,7 @@ public void testGetNullCompleteTask() "allow", null ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_2", "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), @@ -900,7 +900,7 @@ public void testGetNullCompleteTask() "deny", getTaskWithIdAndDatasource("id_2", "deny") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_3", "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), @@ -1045,7 +1045,7 @@ public void testGetTaskStatus() throws Exception final TaskStatus status = TaskStatus.running("mytask"); EasyMock.expect(taskStorageQueryAdapter.getTaskInfo("mytask")) - .andReturn(TaskInfo.createTaskInfoWithGroupId( + .andReturn(new TaskInfo( task.getId(), task.getGroupId(), DateTimes.of("2018-01-01"), @@ -1152,7 +1152,7 @@ public void testShutdownAllTasks() Optional.of(mockQueue) ).anyTimes(); EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("datasource")).andStubReturn(ImmutableList.of( - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_1", "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), @@ -1160,7 +1160,7 @@ public void testShutdownAllTasks() "datasource", getTaskWithIdAndDatasource("id_1", "datasource") ), - TaskInfo.createTaskInfoWithGroupId( + new TaskInfo( "id_2", "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index cf4c8f7bed4a..aeb42d82e2c3 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -370,8 +370,9 @@ public TaskInfo map(int index, ResultSet resultSet, State log.error(e, "Encountered exception while deserializing task status_payload"); throw new SQLException(e); } - taskInfo = TaskInfo.createTaskInfo( + taskInfo = new TaskInfo<>( resultSet.getString("id"), + null, DateTimes.of(resultSet.getString("created_date")), status, resultSet.getString("datasource"), From cc1b654ecb3a2e7be463bc293d5655f0b9dbcc78 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 15 Aug 2019 17:13:19 -0700 Subject: [PATCH 5/9] fix integration test --- .../src/test/resources/results/auth_test_sys_schema_tasks.json | 1 + 1 file changed, 1 insertion(+) diff --git a/integration-tests/src/test/resources/results/auth_test_sys_schema_tasks.json b/integration-tests/src/test/resources/results/auth_test_sys_schema_tasks.json index d27d7661eb52..3f0e48fca71e 100644 --- a/integration-tests/src/test/resources/results/auth_test_sys_schema_tasks.json +++ b/integration-tests/src/test/resources/results/auth_test_sys_schema_tasks.json @@ -2,6 +2,7 @@ { "task_id": "index_auth_test_2030-04-30T01:13:31.893Z", "type": null, + "group_id": null, "datasource": "auth_test", "created_time": "2030-04-30T01:13:31.893Z", "queue_insertion_time": "1970-01-01T00:00:00.000Z", From 5228f38f1dcb19c7c3ea923fc4583a86c7c9fc0a Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 19 Aug 2019 10:50:06 -0700 Subject: [PATCH 6/9] fix toString --- core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java index a9d3c12a53e8..bae8ce8bef00 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java @@ -241,7 +241,7 @@ public String toString() { return "TaskStatusPlus{" + "id='" + id + '\'' + - "groupId='" + groupId + '\'' + + ", groupId='" + groupId + '\'' + ", type='" + type + '\'' + ", createdTime=" + createdTime + ", queueInsertionTime=" + queueInsertionTime + From 5238706afef49fb9cf62799d6c5ae9e8192f6ed9 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 20 Aug 2019 23:50:10 -0700 Subject: [PATCH 7/9] Remove groupId from TaskInfo --- .../org/apache/druid/indexer/TaskInfo.java | 10 ---- .../apache/druid/indexer/TaskStatusPlus.java | 2 +- .../overlord/HeapMemoryTaskStorage.java | 4 -- .../overlord/MetadataTaskStorage.java | 34 ++------------ .../overlord/http/OverlordResource.java | 8 ++-- .../IndexerMetadataStorageAdapterTest.java | 4 -- .../overlord/http/OverlordResourceTest.java | 47 ------------------- .../SQLMetadataStorageActionHandler.java | 1 - 8 files changed, 8 insertions(+), 102 deletions(-) diff --git a/core/src/main/java/org/apache/druid/indexer/TaskInfo.java b/core/src/main/java/org/apache/druid/indexer/TaskInfo.java index 80222661eeb5..411d87af6a90 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskInfo.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskInfo.java @@ -35,12 +35,9 @@ public class TaskInfo private final String dataSource; @Nullable private final EntryType task; - @Nullable - private final String groupId; public TaskInfo( String id, - String groupId, DateTime createdTime, StatusType status, String dataSource, @@ -52,7 +49,6 @@ public TaskInfo( this.status = Preconditions.checkNotNull(status, "status"); this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); this.task = task; - this.groupId = groupId; } public String getId() @@ -60,12 +56,6 @@ public String getId() return id; } - @Nullable - public String getGroupId() - { - return groupId; - } - public DateTime getCreatedTime() { return createdTime; diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java index bae8ce8bef00..23d36a688c9d 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java @@ -78,7 +78,7 @@ public TaskStatusPlus( @JsonCreator public TaskStatusPlus( @JsonProperty("id") String id, - @JsonProperty("groupId") String groupId, + @JsonProperty("groupId") @Nullable String groupId, @JsonProperty("type") @Nullable String type, // nullable for backward compatibility @JsonProperty("createdTime") DateTime createdTime, @JsonProperty("queueInsertionTime") DateTime queueInsertionTime, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index 1e4f453073d9..fe22f82be19e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -161,7 +161,6 @@ public TaskInfo getTaskInfo(String taskId) if (taskStuff != null) { return new TaskInfo<>( taskStuff.getTask().getId(), - taskStuff.getTask().getGroupId(), taskStuff.getCreatedDate(), taskStuff.getStatus(), taskStuff.getDataSource(), @@ -206,7 +205,6 @@ public List> getActiveTaskInfo(@Nullable String dataS if (taskStuff.getStatus().isRunnable()) { TaskInfo t = new TaskInfo<>( taskStuff.getTask().getId(), - taskStuff.getTask().getGroupId(), taskStuff.getCreatedDate(), taskStuff.getStatus(), taskStuff.getDataSource(), @@ -271,7 +269,6 @@ private List> getRecentlyCreatedAlreadyFinishedTaskIn String id = taskStuff.getTask().getId(); TaskInfo t = new TaskInfo<>( id, - taskStuff.getTask().getGroupId(), taskStuff.getCreatedDate(), taskStuff.getStatus(), taskStuff.getDataSource(), @@ -302,7 +299,6 @@ private List> getNRecentlyCreatedAlreadyFinishedTaskI String id = taskStuff.getTask().getId(); TaskInfo t = new TaskInfo<>( id, - taskStuff.getTask().getGroupId(), taskStuff.getCreatedDate(), taskStuff.getStatus(), taskStuff.getDataSource(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index 423bd1281d84..186aa18224e3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -183,15 +183,7 @@ public Optional getStatus(final String taskId) @Override public TaskInfo getTaskInfo(String taskId) { - final TaskInfo taskInfo = handler.getTaskInfo(taskId); - return new TaskInfo<>( - taskInfo.getId(), - taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(), - taskInfo.getCreatedTime(), - taskInfo.getStatus(), - taskInfo.getDataSource(), - taskInfo.getTask() - ); + return handler.getTaskInfo(taskId); } @Override @@ -209,19 +201,9 @@ public List getActiveTasks() @Override public List> getActiveTaskInfo(@Nullable String dataSource) { - final List> taskInfoImmutableList = ImmutableList.copyOf( + return ImmutableList.copyOf( handler.getActiveTaskInfo(dataSource) ); - return taskInfoImmutableList.stream() - .map(t -> new TaskInfo<>( - t.getId(), - t.getTask() == null ? null : t.getTask().getGroupId(), - t.getCreatedTime(), - t.getStatus(), - t.getDataSource(), - t.getTask() - )) - .collect(Collectors.toList()); } @Override @@ -231,7 +213,7 @@ public List> getRecentlyCreatedAlreadyFinishedTaskInf @Nullable String datasource ) { - final List> taskInfoImmutableList = ImmutableList.copyOf( + return ImmutableList.copyOf( handler.getCompletedTaskInfo( DateTimes.nowUtc() .minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow), @@ -239,16 +221,6 @@ public List> getRecentlyCreatedAlreadyFinishedTaskInf datasource ) ); - return taskInfoImmutableList.stream() - .map(t -> new TaskInfo<>( - t.getId(), - t.getTask() == null ? null : t.getTask().getGroupId(), - t.getCreatedTime(), - t.getStatus(), - t.getDataSource(), - t.getTask() - )) - .collect(Collectors.toList()); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 51073f4a2566..6226abd68bcb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -264,7 +264,7 @@ public Response getTaskStatus(@PathParam("taskid") String taskid) workItem.getTaskId(), new TaskStatusPlus( taskInfo.getId(), - taskInfo.getGroupId(), + taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(), taskInfo.getTask() == null ? null : taskInfo.getTask().getType(), taskInfo.getCreatedTime(), // Would be nice to include the real queue insertion time, but the @@ -286,7 +286,7 @@ public Response getTaskStatus(@PathParam("taskid") String taskid) taskid, new TaskStatusPlus( taskInfo.getId(), - taskInfo.getGroupId(), + taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(), taskInfo.getTask() == null ? null : taskInfo.getTask().getType(), taskInfo.getCreatedTime(), // Would be nice to include the real queue insertion time, but the @@ -593,7 +593,7 @@ public Response getTasks( Function, TaskStatusPlus> completeTaskTransformFunc = taskInfo -> new TaskStatusPlus( taskInfo.getId(), - taskInfo.getGroupId(), + taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(), taskInfo.getTask() == null ? null : taskInfo.getTask().getType(), taskInfo.getCreatedTime(), // Would be nice to include the real queue insertion time, but the @@ -630,7 +630,7 @@ public Response getTasks( allActiveTasks.add( new AnyTask( task.getId(), - task.getGroupId(), + task.getTask() == null ? null : task.getTask().getGroupId(), task.getTask() == null ? null : task.getTask().getType(), SettableFuture.create(), task.getDataSource(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java index 71f4cd175d6e..8cb45ade0f09 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageAdapterTest.java @@ -63,7 +63,6 @@ public void testDeletePendingSegments() final List> taskInfos = ImmutableList.of( new TaskInfo<>( "id1", - "group_id1", DateTimes.of("2017-12-01"), TaskStatus.running("id1"), "dataSource", @@ -71,7 +70,6 @@ public void testDeletePendingSegments() ), new TaskInfo<>( "id1", - "group_id1", DateTimes.of("2017-12-02"), TaskStatus.running("id2"), "dataSource", @@ -97,7 +95,6 @@ public void testDeletePendingSegmentsOfRunningTasks() final ImmutableList> taskInfos = ImmutableList.of( new TaskInfo<>( "id1", - "group_id1", DateTimes.of("2017-11-01"), TaskStatus.running("id1"), "dataSource", @@ -105,7 +102,6 @@ public void testDeletePendingSegmentsOfRunningTasks() ), new TaskInfo<>( "id1", - "group_id1", DateTimes.of("2017-12-02"), TaskStatus.running("id2"), "dataSource", diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 354710a6d88a..a2971cecb888 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -199,7 +199,6 @@ public void testSecuredGetWaitingTask() ImmutableList.of( new TaskInfo( "id_1", - "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", @@ -207,7 +206,6 @@ public void testSecuredGetWaitingTask() ), new TaskInfo( "id_2", - "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "allow", @@ -215,7 +213,6 @@ public void testSecuredGetWaitingTask() ), new TaskInfo( "id_3", - "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), "deny", @@ -223,7 +220,6 @@ public void testSecuredGetWaitingTask() ), new TaskInfo( "id_4", - "groupId_4", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_4"), "deny", @@ -270,7 +266,6 @@ public void testSecuredGetCompleteTasks() ImmutableList.of( new TaskInfo( "id_1", - "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "deny", @@ -278,7 +273,6 @@ public void testSecuredGetCompleteTasks() ), new TaskInfo( "id_2", - "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "allow", @@ -286,7 +280,6 @@ public void testSecuredGetCompleteTasks() ), new TaskInfo( "id_3", - "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), "allow", @@ -327,7 +320,6 @@ public void testSecuredGetRunningTasks() ImmutableList.of( new TaskInfo( "id_1", - "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "deny", @@ -335,7 +327,6 @@ public void testSecuredGetRunningTasks() ), new TaskInfo( "id_2", - "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "allow", @@ -369,7 +360,6 @@ public void testGetTasks() ImmutableList.of( new TaskInfo( "id_5", - "groupId_5", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_5"), "deny", @@ -377,7 +367,6 @@ public void testGetTasks() ), new TaskInfo( "id_6", - "groupId_6", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_6"), "allow", @@ -385,7 +374,6 @@ public void testGetTasks() ), new TaskInfo( "id_7", - "groupId_7", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_7"), "allow", @@ -398,7 +386,6 @@ public void testGetTasks() ImmutableList.of( new TaskInfo( "id_1", - "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", @@ -406,7 +393,6 @@ public void testGetTasks() ), new TaskInfo( "id_2", - "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "allow", @@ -414,7 +400,6 @@ public void testGetTasks() ), new TaskInfo( "id_3", - "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), "deny", @@ -422,7 +407,6 @@ public void testGetTasks() ), new TaskInfo( "id_4", - "groupId_4", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_4"), "deny", @@ -474,7 +458,6 @@ public void testGetTasksFilterDataSource() ImmutableList.of( new TaskInfo( "id_5", - "groupId_5", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_5"), "allow", @@ -482,7 +465,6 @@ public void testGetTasksFilterDataSource() ), new TaskInfo( "id_6", - "groupId_6", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_6"), "allow", @@ -490,7 +472,6 @@ public void testGetTasksFilterDataSource() ), new TaskInfo( "id_7", - "groupId_7", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_7"), "allow", @@ -503,7 +484,6 @@ public void testGetTasksFilterDataSource() ImmutableList.of( new TaskInfo( "id_1", - "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", @@ -511,7 +491,6 @@ public void testGetTasksFilterDataSource() ), new TaskInfo( "id_2", - "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", @@ -519,7 +498,6 @@ public void testGetTasksFilterDataSource() ), new TaskInfo( "id_3", - "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", @@ -527,7 +505,6 @@ public void testGetTasksFilterDataSource() ), new TaskInfo( "id_4", - "groupId_4", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_4"), "allow", @@ -578,7 +555,6 @@ public void testGetTasksFilterWaitingState() ImmutableList.of( new TaskInfo( "id_1", - "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", @@ -586,7 +562,6 @@ public void testGetTasksFilterWaitingState() ), new TaskInfo( "id_2", - "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "allow", @@ -594,7 +569,6 @@ public void testGetTasksFilterWaitingState() ), new TaskInfo( "id_3", - "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), "deny", @@ -602,7 +576,6 @@ public void testGetTasksFilterWaitingState() ), new TaskInfo( "id_4", - "groupId_4", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_4"), "deny", @@ -647,7 +620,6 @@ public void testGetTasksFilterRunningState() ImmutableList.of( new TaskInfo( "id_1", - "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", @@ -655,7 +627,6 @@ public void testGetTasksFilterRunningState() ), new TaskInfo( "id_2", - "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "allow", @@ -663,7 +634,6 @@ public void testGetTasksFilterRunningState() ), new TaskInfo( "id_3", - "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), "allow", @@ -671,7 +641,6 @@ public void testGetTasksFilterRunningState() ), new TaskInfo( "id_4", - "groupId_4", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_4"), "deny", @@ -724,7 +693,6 @@ public void testGetTasksFilterPendingState() ImmutableList.of( new TaskInfo( "id_1", - "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "deny", @@ -732,7 +700,6 @@ public void testGetTasksFilterPendingState() ), new TaskInfo( "id_2", - "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "allow", @@ -740,7 +707,6 @@ public void testGetTasksFilterPendingState() ), new TaskInfo( "id_3", - "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), "allow", @@ -748,7 +714,6 @@ public void testGetTasksFilterPendingState() ), new TaskInfo( "id_4", - "groupId_4", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_4"), "deny", @@ -785,7 +750,6 @@ public void testGetTasksFilterCompleteState() ImmutableList.of( new TaskInfo( "id_1", - "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", @@ -793,7 +757,6 @@ public void testGetTasksFilterCompleteState() ), new TaskInfo( "id_2", - "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "deny", @@ -801,7 +764,6 @@ public void testGetTasksFilterCompleteState() ), new TaskInfo( "id_3", - "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), "allow", @@ -836,7 +798,6 @@ public void testGetTasksFilterCompleteStateWithInterval() ImmutableList.of( new TaskInfo( "id_1", - "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "deny", @@ -844,7 +805,6 @@ public void testGetTasksFilterCompleteStateWithInterval() ), new TaskInfo( "id_2", - "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "allow", @@ -852,7 +812,6 @@ public void testGetTasksFilterCompleteStateWithInterval() ), new TaskInfo( "id_3", - "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), "allow", @@ -886,7 +845,6 @@ public void testGetNullCompleteTask() ImmutableList.of( new TaskInfo( "id_1", - "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", @@ -894,7 +852,6 @@ public void testGetNullCompleteTask() ), new TaskInfo( "id_2", - "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "deny", @@ -902,7 +859,6 @@ public void testGetNullCompleteTask() ), new TaskInfo( "id_3", - "groupId_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), "allow", @@ -1047,7 +1003,6 @@ public void testGetTaskStatus() throws Exception EasyMock.expect(taskStorageQueryAdapter.getTaskInfo("mytask")) .andReturn(new TaskInfo( task.getId(), - task.getGroupId(), DateTimes.of("2018-01-01"), status, task.getDataSource(), @@ -1154,7 +1109,6 @@ public void testShutdownAllTasks() EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("datasource")).andStubReturn(ImmutableList.of( new TaskInfo( "id_1", - "groupId_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "datasource", @@ -1162,7 +1116,6 @@ public void testShutdownAllTasks() ), new TaskInfo( "id_2", - "groupId_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "datasource", diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index aeb42d82e2c3..acdbbd0f2c1a 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -372,7 +372,6 @@ public TaskInfo map(int index, ResultSet resultSet, State } taskInfo = new TaskInfo<>( resultSet.getString("id"), - null, DateTimes.of(resultSet.getString("created_date")), status, resultSet.getString("datasource"), From 175b1fbbb01774932c67da8e8f85b6ce9d709c93 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 21 Aug 2019 16:04:17 -0700 Subject: [PATCH 8/9] Modify docs and tests --- docs/querying/sql.md | 2 +- .../parallel/AbstractParallelIndexSupervisorTaskTest.java | 4 +++- .../apache/druid/sql/calcite/schema/SystemSchemaTest.java | 8 ++++---- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/docs/querying/sql.md b/docs/querying/sql.md index 01867659a54e..4fdd2575b50d 100644 --- a/docs/querying/sql.md +++ b/docs/querying/sql.md @@ -751,7 +751,7 @@ check out the documentation for [ingestion tasks](../ingestion/tasks.html). |Column|Type|Notes| |------|-----|-----| |task_id|STRING|Unique task identifier| -|group_id|STRING|Task group ID for this task, for sub tasks, this value is the parent task's ID| +|group_id|STRING|Task group ID for this task, the value depends on the task `type`. For example, for native index tasks, it's same as `task_id`, for sub tasks, this value is the parent task's ID| |type|STRING|Task type, for example this value is "index" for indexing tasks. See [tasks-overview](../ingestion/tasks.html)| |datasource|STRING|Datasource name being indexed| |created_time|STRING|Timestamp in ISO8601 format corresponding to when the ingestion task was created. Note that this value is populated for completed and waiting tasks. For running and pending tasks this value is set to 1970-01-01T00:00:00Z| diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 08659e52bab6..62a360d5b2af 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -152,10 +152,12 @@ public String runTask(Object taskObject) if (subTask.isReady(toolbox.getTaskActionClient())) { return subTask.run(toolbox); } else { + getTaskStorage().setStatus(TaskStatus.failure(subTask.getId())); throw new ISE("task[%s] is not ready", subTask.getId()); } } catch (Exception e) { + getTaskStorage().setStatus(TaskStatus.failure(subTask.getId(), e.getMessage())); throw new RuntimeException(e); } })); @@ -167,7 +169,7 @@ public TaskStatusResponse getTaskStatus(String taskId) { final Future taskStatusFuture = tasks.get(taskId); final Optional task = getTaskStorage().getTask(taskId); - final String groupId = task.isPresent() ? task.orNull().getGroupId() : taskId; + final String groupId = task.isPresent() ? task.get().getGroupId() : null; if (taskStatusFuture != null) { try { if (taskStatusFuture.isDone()) { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index c8d6f75ec869..3118af043da9 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -1012,7 +1012,7 @@ public void testTasksTable() throws Exception String json = "[{\n" + "\t\"id\": \"index_wikipedia_2018-09-20T22:33:44.911Z\",\n" - + "\t\"groupId\": \"index_wikipedia_2018-09-20T22:33:44.911Z\",\n" + + "\t\"groupId\": \"group_index_wikipedia_2018-09-20T22:33:44.911Z\",\n" + "\t\"type\": \"index\",\n" + "\t\"createdTime\": \"2018-09-20T22:33:44.922Z\",\n" + "\t\"queueInsertionTime\": \"1970-01-01T00:00:00.000Z\",\n" @@ -1028,7 +1028,7 @@ public void testTasksTable() throws Exception + "\t\"errorMsg\": null\n" + "}, {\n" + "\t\"id\": \"index_wikipedia_2018-09-21T18:38:47.773Z\",\n" - + "\t\"groupId\": \"index_wikipedia_2018-09-21T18:38:47.773Z\",\n" + + "\t\"groupId\": \"group_index_wikipedia_2018-09-21T18:38:47.773Z\",\n" + "\t\"type\": \"index\",\n" + "\t\"createdTime\": \"2018-09-21T18:38:47.873Z\",\n" + "\t\"queueInsertionTime\": \"2018-09-21T18:38:47.910Z\",\n" @@ -1079,7 +1079,7 @@ public Object get(String name) Object[] row0 = rows.get(0); Assert.assertEquals("index_wikipedia_2018-09-20T22:33:44.911Z", row0[0].toString()); - Assert.assertEquals("index_wikipedia_2018-09-20T22:33:44.911Z", row0[1].toString()); + Assert.assertEquals("group_index_wikipedia_2018-09-20T22:33:44.911Z", row0[1].toString()); Assert.assertEquals("index", row0[2].toString()); Assert.assertEquals("wikipedia", row0[3].toString()); Assert.assertEquals("2018-09-20T22:33:44.922Z", row0[4].toString()); @@ -1095,7 +1095,7 @@ public Object get(String name) Object[] row1 = rows.get(1); Assert.assertEquals("index_wikipedia_2018-09-21T18:38:47.773Z", row1[0].toString()); - Assert.assertEquals("index_wikipedia_2018-09-21T18:38:47.773Z", row1[1].toString()); + Assert.assertEquals("group_index_wikipedia_2018-09-21T18:38:47.773Z", row1[1].toString()); Assert.assertEquals("index", row1[2].toString()); Assert.assertEquals("wikipedia", row1[3].toString()); Assert.assertEquals("2018-09-21T18:38:47.873Z", row1[4].toString()); From f369a927dc551ad93bc4e5de16ffd24fa597dd31 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 21 Aug 2019 17:11:54 -0700 Subject: [PATCH 9/9] modify TaskMonitorTest --- .../indexing/common/task/batch/parallel/TaskMonitorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java index 5eab6aacb0f5..604376891cf3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java @@ -196,7 +196,7 @@ public TaskStatusResponse getTaskStatus(String taskId) taskId, new TaskStatusPlus( taskId, - taskId, + "groupId", "testTask", DateTimes.EPOCH, DateTimes.EPOCH,