From 506c6e8f08c12d6d1688d9c915b2104dbab752f5 Mon Sep 17 00:00:00 2001 From: Marat Safin Date: Fri, 2 Nov 2018 12:57:29 +0300 Subject: [PATCH 1/6] 1. added support for unused DateTime start parameter in getRecentlyFinishedTaskInfoSince method: HeapMemoryTaskStorage.getRecentlyFinishedTaskInfoSince return the finished tasks by comparing TaskStuff.createdDate with the start time 2. added filtering by status complete to TaskStuff list stream in HeapMemoryTaskStorage.getNRecentlyFinishedTaskInfo method. 3. changed names of methods and parameters to present that public API method OverlordResource.getTasks return the list of completed tasks, which createdDate, not date of completion, belongs to the interval parameter. --- .../overlord/HeapMemoryTaskStorage.java | 17 +++++++++-------- .../indexing/overlord/MetadataTaskStorage.java | 6 +++--- .../druid/indexing/overlord/TaskStorage.java | 8 ++++---- .../overlord/TaskStorageQueryAdapter.java | 4 ++-- 4 files changed, 18 insertions(+), 17 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index 006286ac8417..0620f912ea5b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -221,9 +221,9 @@ public List> getActiveTaskInfo(@Nullable String dataS } @Override - public List> getRecentlyFinishedTaskInfo( + public List> getFinishedTaskInfoByCreatedTimeDuration( @Nullable Integer maxTaskStatuses, - @Nullable Duration duration, + @Nullable Duration createdTimeDuration, @Nullable String datasource ) { @@ -240,18 +240,18 @@ public int compare(TaskStuff a, TaskStuff b) }.reverse(); return maxTaskStatuses == null ? - getRecentlyFinishedTaskInfoSince( - DateTimes.nowUtc().minus(duration == null ? config.getRecentlyFinishedThreshold() : duration), + getFinishedTaskInfoSince( + DateTimes.nowUtc().minus(createdTimeDuration == null ? config.getRecentlyFinishedThreshold() : createdTimeDuration), createdDateDesc ) : - getNRecentlyFinishedTaskInfo(maxTaskStatuses, createdDateDesc); + getNFinishedTaskInfo(maxTaskStatuses, createdDateDesc); } finally { giant.unlock(); } } - private List> getRecentlyFinishedTaskInfoSince( + private List> getFinishedTaskInfoSince( DateTime start, Ordering createdDateDesc ) @@ -262,7 +262,7 @@ private List> getRecentlyFinishedTaskInfoSince( List list = createdDateDesc .sortedCopy(tasks.values()) .stream() - .filter(taskStuff -> taskStuff.getStatus().isComplete()) + .filter(taskStuff -> taskStuff.getStatus().isComplete() && taskStuff.createdDate.isAfter(start)) .collect(Collectors.toList()); final ImmutableList.Builder> listBuilder = ImmutableList.builder(); for (final TaskStuff taskStuff : list) { @@ -283,7 +283,7 @@ private List> getRecentlyFinishedTaskInfoSince( } } - private List> getNRecentlyFinishedTaskInfo(int n, Ordering createdDateDesc) + private List> getNFinishedTaskInfo(int n, Ordering createdDateDesc) { giant.lock(); @@ -291,6 +291,7 @@ private List> getNRecentlyFinishedTaskInfo(int n, Ord List list = createdDateDesc .sortedCopy(tasks.values()) .stream() + .filter(taskStuff -> taskStuff.getStatus().isComplete()) .limit(n) .collect(Collectors.toList()); final ImmutableList.Builder> listBuilder = ImmutableList.builder(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index 808fdb797295..b4b5eb547016 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -208,15 +208,15 @@ public List> getActiveTaskInfo(@Nullable String dataS } @Override - public List> getRecentlyFinishedTaskInfo( + public List> getFinishedTaskInfoByCreatedTimeDuration( @Nullable Integer maxTaskStatuses, - @Nullable Duration duration, + @Nullable Duration createdTimeDuration, @Nullable String datasource ) { return ImmutableList.copyOf( handler.getCompletedTaskInfo( - DateTimes.nowUtc().minus(duration == null ? config.getRecentlyFinishedThreshold() : duration), + DateTimes.nowUtc().minus(createdTimeDuration == null ? config.getRecentlyFinishedThreshold() : createdTimeDuration), maxTaskStatuses, datasource ) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java index b2f55f0c9d43..49231e93d668 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java @@ -141,14 +141,14 @@ public interface TaskStorage * return nothing. * * @param maxTaskStatuses maxTaskStatuses - * @param duration duration - * @param datasource datasource + * @param createdTimeDuration duration + * @param datasource datasource * * @return list of {@link TaskInfo} */ - List> getRecentlyFinishedTaskInfo( + List> getFinishedTaskInfoByCreatedTimeDuration( @Nullable Integer maxTaskStatuses, - @Nullable Duration duration, + @Nullable Duration createdTimeDuration, @Nullable String datasource ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java index fd61752a7f2a..2e716fb627c9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -58,13 +58,13 @@ public List> getActiveTaskInfo(@Nullable String dataS return storage.getActiveTaskInfo(dataSource); } - public List> getRecentlyCompletedTaskInfo( + public List> getCompletedTaskInfoByCreatedTimeDuration( @Nullable Integer maxTaskStatuses, @Nullable Duration duration, @Nullable String dataSource ) { - return storage.getRecentlyFinishedTaskInfo(maxTaskStatuses, duration, dataSource); + return storage.getFinishedTaskInfoByCreatedTimeDuration(maxTaskStatuses, duration, dataSource); } public Optional getTask(final String taskid) From 9ce63863f54eb74deddc8eed3d326db6748366b3 Mon Sep 17 00:00:00 2001 From: Marat Safin Date: Fri, 2 Nov 2018 15:59:23 +0300 Subject: [PATCH 2/6] Fixed OverlordResourceTest to Support changed methods names --- .../overlord/http/OverlordResourceTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 7964c764069c..e3885f755967 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -233,7 +233,7 @@ public void testSecuredGetCompleteTasks() new MockTaskRunnerWorkItem(tasksIds.get(1), null), new MockTaskRunnerWorkItem(tasksIds.get(2), null))); - EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_1", @@ -259,7 +259,7 @@ public void testSecuredGetCompleteTasks() ) ); EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req); - Assert.assertTrue(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null).size() == 3); + Assert.assertTrue(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null).size() == 3); Assert.assertTrue(taskRunner.getRunningTasks().size() == 3); List responseObjects = (List) overlordResource .getCompleteTasks(null, req).getEntity(); @@ -313,7 +313,7 @@ public void testGetTasks() { expectAuthorizationTokenCheck(); //completed tasks - EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_5", @@ -403,7 +403,7 @@ public void testGetTasksFilterDataSource() { expectAuthorizationTokenCheck(); //completed tasks - EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, "allow")).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, "allow")).andStubReturn( ImmutableList.of( new TaskInfo( "id_5", @@ -667,7 +667,7 @@ public void testGetTasksFilterPendingState() public void testGetTasksFilterCompleteState() { expectAuthorizationTokenCheck(); - EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_1", @@ -707,7 +707,7 @@ public void testGetTasksFilterCompleteStateWithInterval() expectAuthorizationTokenCheck(); List tasksIds = ImmutableList.of("id_1", "id_2", "id_3"); Duration duration = new Period("PT86400S").toStandardDuration(); - EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, duration, null)).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, duration, null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_1", @@ -747,7 +747,7 @@ public void testGetTasksFilterCompleteStateWithInterval() public void testGetNullCompleteTask() { expectAuthorizationTokenCheck(); - EasyMock.expect(taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(null, null, null)).andStubReturn( + EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( ImmutableList.of( new TaskInfo( "id_1", From ec015800b3c3e29060231e7d6ee5f83a348ea813 Mon Sep 17 00:00:00 2001 From: Marat Safin Date: Wed, 14 Nov 2018 13:18:12 +0300 Subject: [PATCH 3/6] Changed methods and parameters names to make them more obvious to understand. --- .../indexing/overlord/HeapMemoryTaskStorage.java | 14 +++++++------- .../indexing/overlord/MetadataTaskStorage.java | 6 +++--- .../druid/indexing/overlord/TaskStorage.java | 4 ++-- .../indexing/overlord/TaskStorageQueryAdapter.java | 2 +- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index 0620f912ea5b..51b38d026095 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -221,9 +221,9 @@ public List> getActiveTaskInfo(@Nullable String dataS } @Override - public List> getFinishedTaskInfoByCreatedTimeDuration( + public List> getRecentlyCreatedAlreadyFinishedTaskInfo( @Nullable Integer maxTaskStatuses, - @Nullable Duration createdTimeDuration, + @Nullable Duration durationBeforeNow, @Nullable String datasource ) { @@ -240,18 +240,18 @@ public int compare(TaskStuff a, TaskStuff b) }.reverse(); return maxTaskStatuses == null ? - getFinishedTaskInfoSince( - DateTimes.nowUtc().minus(createdTimeDuration == null ? config.getRecentlyFinishedThreshold() : createdTimeDuration), + getRecentlyCreatedAlreadyFinishedTaskInfoSince( + DateTimes.nowUtc().minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow), createdDateDesc ) : - getNFinishedTaskInfo(maxTaskStatuses, createdDateDesc); + getNRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, createdDateDesc); } finally { giant.unlock(); } } - private List> getFinishedTaskInfoSince( + private List> getRecentlyCreatedAlreadyFinishedTaskInfoSince( DateTime start, Ordering createdDateDesc ) @@ -283,7 +283,7 @@ private List> getFinishedTaskInfoSince( } } - private List> getNFinishedTaskInfo(int n, Ordering createdDateDesc) + private List> getNRecentlyCreatedAlreadyFinishedTaskInfo(int n, Ordering createdDateDesc) { giant.lock(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index b4b5eb547016..f0d9d37167f9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -208,15 +208,15 @@ public List> getActiveTaskInfo(@Nullable String dataS } @Override - public List> getFinishedTaskInfoByCreatedTimeDuration( + public List> getRecentlyCreatedAlreadyFinishedTaskInfo( @Nullable Integer maxTaskStatuses, - @Nullable Duration createdTimeDuration, + @Nullable Duration durationBeforeNow, @Nullable String datasource ) { return ImmutableList.copyOf( handler.getCompletedTaskInfo( - DateTimes.nowUtc().minus(createdTimeDuration == null ? config.getRecentlyFinishedThreshold() : createdTimeDuration), + DateTimes.nowUtc().minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow), maxTaskStatuses, datasource ) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java index 49231e93d668..e78293beafe4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java @@ -146,9 +146,9 @@ public interface TaskStorage * * @return list of {@link TaskInfo} */ - List> getFinishedTaskInfoByCreatedTimeDuration( + List> getRecentlyCreatedAlreadyFinishedTaskInfo( @Nullable Integer maxTaskStatuses, - @Nullable Duration createdTimeDuration, + @Nullable Duration durationBeforeNow, @Nullable String datasource ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java index 2e716fb627c9..6a12b40bc11b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -64,7 +64,7 @@ public List> getCompletedTaskInfoByCreatedTimeDuratio @Nullable String dataSource ) { - return storage.getFinishedTaskInfoByCreatedTimeDuration(maxTaskStatuses, duration, dataSource); + return storage.getRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, duration, dataSource); } public Optional getTask(final String taskid) From 5f2ffb94509a6e0105792a346533122650959a48 Mon Sep 17 00:00:00 2001 From: Marat Safin Date: Fri, 2 Nov 2018 12:57:29 +0300 Subject: [PATCH 4/6] 1. added support for unused DateTime start parameter in getRecentlyFinishedTaskInfoSince method: HeapMemoryTaskStorage.getRecentlyFinishedTaskInfoSince return the finished tasks by comparing TaskStuff.createdDate with the start time 2. added filtering by status complete to TaskStuff list stream in HeapMemoryTaskStorage.getNRecentlyFinishedTaskInfo method. 3. changed names of methods and parameters to present that public API method OverlordResource.getTasks return the list of completed tasks, which createdDate, not date of completion, belongs to the interval parameter. --- .../apache/druid/indexing/overlord/TaskStorage.java | 4 ++-- .../indexing/overlord/http/OverlordResource.java | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java index e78293beafe4..1edd52c2c385 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java @@ -141,8 +141,8 @@ public interface TaskStorage * return nothing. * * @param maxTaskStatuses maxTaskStatuses - * @param createdTimeDuration duration - * @param datasource datasource + * @param durationBeforeNow duration + * @param datasource datasource * * @return list of {@link TaskInfo} */ diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index f0748f9a3afe..1976cb9721fd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -627,7 +627,7 @@ public Response getCompleteTasks( public Response getTasks( @QueryParam("state") final String state, @QueryParam("datasource") final String dataSource, - @QueryParam("interval") final String interval, + @PathParam("createdTimeInterval") final String createdTimeInterval, @QueryParam("max") final Integer maxCompletedTasks, @QueryParam("type") final String type, @Context final HttpServletRequest req @@ -692,13 +692,13 @@ public Response getTasks( //checking for complete tasks first to avoid querying active tasks if user only wants complete tasks if (state == null || "complete".equals(StringUtils.toLowerCase(state))) { - Duration duration = null; - if (interval != null) { - final Interval theInterval = Intervals.of(interval.replace('_', '/')); - duration = theInterval.toDuration(); + Duration createdTimeDuration = null; + if (createdTimeInterval != null) { + final Interval theInterval = Intervals.of(createdTimeInterval.replace("_", "/")); + createdTimeDuration = theInterval.toDuration(); } final List> taskInfoList = - taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(maxCompletedTasks, duration, dataSource); + taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(maxCompletedTasks, createdTimeDuration, dataSource); final List completedTasks = taskInfoList.stream() .map(completeTaskTransformFunc::apply) .collect(Collectors.toList()); From 53e0c11bbde71eb708ca738a36c9407136cb6632 Mon Sep 17 00:00:00 2001 From: Marat Safin Date: Tue, 20 Nov 2018 10:47:24 +0300 Subject: [PATCH 5/6] Changed String.replace() for the StringUtils.replace()(#6607) --- .../apache/druid/indexing/overlord/http/OverlordResource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 1976cb9721fd..e60d29d6064c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -694,7 +694,7 @@ public Response getTasks( if (state == null || "complete".equals(StringUtils.toLowerCase(state))) { Duration createdTimeDuration = null; if (createdTimeInterval != null) { - final Interval theInterval = Intervals.of(createdTimeInterval.replace("_", "/")); + final Interval theInterval = Intervals.of(StringUtils.replace(createdTimeInterval,"_", "/")); createdTimeDuration = theInterval.toDuration(); } final List> taskInfoList = From b97e703ee8212fa6d906e72b5fd281b083e51ee6 Mon Sep 17 00:00:00 2001 From: Marat Safin Date: Tue, 20 Nov 2018 14:49:51 +0300 Subject: [PATCH 6/6] Fixed checkstyle error --- .../apache/druid/indexing/overlord/http/OverlordResource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index e60d29d6064c..92f01ac5a6f0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -694,7 +694,7 @@ public Response getTasks( if (state == null || "complete".equals(StringUtils.toLowerCase(state))) { Duration createdTimeDuration = null; if (createdTimeInterval != null) { - final Interval theInterval = Intervals.of(StringUtils.replace(createdTimeInterval,"_", "/")); + final Interval theInterval = Intervals.of(StringUtils.replace(createdTimeInterval, "_", "/")); createdTimeDuration = theInterval.toDuration(); } final List> taskInfoList =