diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java index f32b11d26515..b4bbd1a04cd7 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java @@ -22,15 +22,18 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; import javax.annotation.Nullable; import java.util.List; import java.util.Map; +import java.util.Objects; public class ClientCompactQuery implements ClientQuery { private final String dataSource; private final List segments; + private final Interval interval; private final boolean keepSegmentGranularity; @Nullable private final Long targetCompactionSizeBytes; @@ -40,7 +43,8 @@ public class ClientCompactQuery implements ClientQuery @JsonCreator public ClientCompactQuery( @JsonProperty("dataSource") String dataSource, - @JsonProperty("segments") List segments, + @Nullable @JsonProperty("interval") final Interval interval, + @Nullable @JsonProperty("segments") final List segments, @JsonProperty("keepSegmentGranularity") boolean keepSegmentGranularity, @JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes, @JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig, @@ -49,6 +53,7 @@ public ClientCompactQuery( { this.dataSource = dataSource; this.segments = segments; + this.interval = interval; this.keepSegmentGranularity = keepSegmentGranularity; this.targetCompactionSizeBytes = targetCompactionSizeBytes; this.tuningConfig = tuningConfig; @@ -75,6 +80,12 @@ public List getSegments() return segments; } + @JsonProperty + public Interval getInterval() + { + return interval; + } + @JsonProperty public boolean isKeepSegmentGranularity() { @@ -100,12 +111,46 @@ public Map getContext() return context; } + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ClientCompactQuery that = (ClientCompactQuery) o; + return keepSegmentGranularity == that.keepSegmentGranularity && + Objects.equals(dataSource, that.dataSource) && + Objects.equals(segments, that.segments) && + Objects.equals(interval, that.interval) && + Objects.equals(targetCompactionSizeBytes, that.targetCompactionSizeBytes) && + Objects.equals(tuningConfig, that.tuningConfig) && + Objects.equals(context, that.context); + } + + @Override + public int hashCode() + { + return Objects.hash( + dataSource, + segments, + interval, + keepSegmentGranularity, + targetCompactionSizeBytes, + tuningConfig, + context + ); + } + @Override public String toString() { return "ClientCompactQuery{" + "dataSource='" + dataSource + '\'' + ", segments=" + segments + + ", interval=" + interval + ", keepSegmentGranularity=" + keepSegmentGranularity + ", targetCompactionSizeBytes=" + targetCompactionSizeBytes + ", tuningConfig=" + tuningConfig + diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java index c5c9531ffd89..054b0102bed0 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.Iterables; import com.google.inject.Inject; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.indexer.TaskStatusPlus; @@ -40,10 +41,13 @@ import javax.annotation.Nullable; import javax.ws.rs.core.MediaType; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; public class HttpIndexingServiceClient implements IndexingServiceClient { @@ -90,6 +94,7 @@ public String compactSegments( return runTask( new ClientCompactQuery( dataSource, + null, segments, keepSegmentGranularity, targetCompactionSizeBytes, @@ -195,21 +200,30 @@ public int getTotalWorkerCapacity() } @Override - public List getRunningTasks() + public List getActiveTasks() { - return getTasks("runningTasks"); - } + // Must retrieve waiting, then pending, then running, so if tasks move from one state to the next between + // calls then we still catch them. (Tasks always go waiting -> pending -> running.) + // + // Consider switching to new-style /druid/indexer/v1/tasks API in the future. + final List tasks = new ArrayList<>(); + final Set taskIdsSeen = new HashSet<>(); + + final Iterable activeTasks = Iterables.concat( + getTasks("waitingTasks"), + getTasks("pendingTasks"), + getTasks("runningTasks") + ); - @Override - public List getPendingTasks() - { - return getTasks("pendingTasks"); - } + for (TaskStatusPlus task : activeTasks) { + // Use taskIdsSeen to prevent returning the same task ID more than once (if it hops from 'pending' to 'running', + // for example, and we see it twice.) + if (taskIdsSeen.add(task.getId())) { + tasks.add(task); + } + } - @Override - public List getWaitingTasks() - { - return getTasks("waitingTasks"); + return tasks; } private List getTasks(String endpointSuffix) diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java index df2baf6b4f3a..5196bc0456b8 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java @@ -49,11 +49,10 @@ String compactSegments( String killTask(String taskId); - List getRunningTasks(); - - List getPendingTasks(); - - List getWaitingTasks(); + /** + * Gets all tasks that are waiting, pending, or running. + */ + List getActiveTasks(); TaskStatusResponse getTaskStatus(String taskId); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorCleanupPendingSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorCleanupPendingSegments.java index a25ea07c523f..bf4ffdee80e5 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorCleanupPendingSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorCleanupPendingSegments.java @@ -52,27 +52,11 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) final List createdTimes = new ArrayList<>(); createdTimes.add( indexingServiceClient - .getRunningTasks() + .getActiveTasks() .stream() .map(TaskStatusPlus::getCreatedTime) .min(Comparators.naturalNullsFirst()) - .orElse(DateTimes.nowUtc()) // If there is no running tasks, this returns the current time. - ); - createdTimes.add( - indexingServiceClient - .getPendingTasks() - .stream() - .map(TaskStatusPlus::getCreatedTime) - .min(Comparators.naturalNullsFirst()) - .orElse(DateTimes.nowUtc()) // If there is no pending tasks, this returns the current time. - ); - createdTimes.add( - indexingServiceClient - .getWaitingTasks() - .stream() - .map(TaskStatusPlus::getCreatedTime) - .min(Comparators.naturalNullsFirst()) - .orElse(DateTimes.nowUtc()) // If there is no waiting tasks, this returns the current time. + .orElse(DateTimes.nowUtc()) // If there are no active tasks, this returns the current time. ); final TaskStatusPlus completeTaskStatus = indexingServiceClient.getLastCompleteTask(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java index 0258cd0069f7..d22f92adb118 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java @@ -41,7 +41,6 @@ import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -84,11 +83,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) Map compactionConfigs = compactionConfigList .stream() .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity())); - final List compactTasks = filterNonCompactTasks( - indexingServiceClient.getRunningTasks(), - indexingServiceClient.getPendingTasks(), - indexingServiceClient.getWaitingTasks() - ); + final List compactTasks = filterNonCompactTasks(indexingServiceClient.getActiveTasks()); // dataSource -> list of intervals of compact tasks final Map> compactTaskIntervals = new HashMap<>(compactionConfigList.size()); for (TaskStatusPlus status : compactTasks) { @@ -98,13 +93,22 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) } if (COMPACT_TASK_TYPE.equals(response.getPayload().getType())) { final ClientCompactQuery compactQuery = (ClientCompactQuery) response.getPayload(); - final Interval interval = JodaUtils.umbrellaInterval( - compactQuery.getSegments() - .stream() - .map(DataSegment::getInterval) - .sorted(Comparators.intervalsByStartThenEnd()) - .collect(Collectors.toList()) - ); + final Interval interval; + + if (compactQuery.getSegments() != null) { + interval = JodaUtils.umbrellaInterval( + compactQuery.getSegments() + .stream() + .map(DataSegment::getInterval) + .sorted(Comparators.intervalsByStartThenEnd()) + .collect(Collectors.toList()) + ); + } else if (compactQuery.getInterval() != null) { + interval = compactQuery.getInterval(); + } else { + throw new ISE("task[%s] has neither 'segments' nor 'interval'", status.getId()); + } + compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval); } else { throw new ISE("WTH? task[%s] is not a compactTask?", status.getId()); @@ -146,13 +150,9 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) .build(); } - @SafeVarargs - private static List filterNonCompactTasks(List...taskStatusStreams) + private static List filterNonCompactTasks(List taskStatuses) { - final List allTaskStatusPlus = new ArrayList<>(); - Arrays.stream(taskStatusStreams).forEach(allTaskStatusPlus::addAll); - - return allTaskStatusPlus + return taskStatuses .stream() .filter(status -> { final String taskType = status.getType(); diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java index 78a91648d785..e092d38c4d84 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java @@ -25,6 +25,7 @@ import org.joda.time.Interval; import javax.annotation.Nullable; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -74,21 +75,9 @@ public String killTask(String taskId) } @Override - public List getRunningTasks() + public List getActiveTasks() { - return null; - } - - @Override - public List getPendingTasks() - { - return null; - } - - @Override - public List getWaitingTasks() - { - return null; + return Collections.emptyList(); } @Override diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java index 52b78e2ff584..66b73a789479 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java @@ -107,19 +107,7 @@ public String compactSegments( } @Override - public List getRunningTasks() - { - return Collections.emptyList(); - } - - @Override - public List getPendingTasks() - { - return Collections.emptyList(); - } - - @Override - public List getWaitingTasks() + public List getActiveTasks() { return Collections.emptyList(); }