Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class ClientCompactQuery implements ClientQuery
{
private final String dataSource;
private final List<DataSegment> segments;
private final Interval interval;
private final boolean keepSegmentGranularity;
@Nullable
private final Long targetCompactionSizeBytes;
Expand All @@ -40,7 +43,8 @@ public class ClientCompactQuery implements ClientQuery
@JsonCreator
public ClientCompactQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@Nullable @JsonProperty("interval") final Interval interval,
@Nullable @JsonProperty("segments") final List<DataSegment> segments,
@JsonProperty("keepSegmentGranularity") boolean keepSegmentGranularity,
@JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes,
@JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig,
Expand All @@ -49,6 +53,7 @@ public ClientCompactQuery(
{
this.dataSource = dataSource;
this.segments = segments;
this.interval = interval;
this.keepSegmentGranularity = keepSegmentGranularity;
this.targetCompactionSizeBytes = targetCompactionSizeBytes;
this.tuningConfig = tuningConfig;
Expand All @@ -75,6 +80,12 @@ public List<DataSegment> getSegments()
return segments;
}

@JsonProperty
public Interval getInterval()
{
return interval;
}

@JsonProperty
public boolean isKeepSegmentGranularity()
{
Expand All @@ -100,12 +111,46 @@ public Map<String, Object> getContext()
return context;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ClientCompactQuery that = (ClientCompactQuery) o;
return keepSegmentGranularity == that.keepSegmentGranularity &&
Objects.equals(dataSource, that.dataSource) &&
Objects.equals(segments, that.segments) &&
Objects.equals(interval, that.interval) &&
Objects.equals(targetCompactionSizeBytes, that.targetCompactionSizeBytes) &&
Objects.equals(tuningConfig, that.tuningConfig) &&
Objects.equals(context, that.context);
}

@Override
public int hashCode()
{
return Objects.hash(
dataSource,
segments,
interval,
keepSegmentGranularity,
targetCompactionSizeBytes,
tuningConfig,
context
);
}

@Override
public String toString()
{
return "ClientCompactQuery{" +
"dataSource='" + dataSource + '\'' +
", segments=" + segments +
", interval=" + interval +
", keepSegmentGranularity=" + keepSegmentGranularity +
", targetCompactionSizeBytes=" + targetCompactionSizeBytes +
", tuningConfig=" + tuningConfig +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskStatusPlus;
Expand All @@ -40,10 +41,13 @@
import javax.annotation.Nullable;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class HttpIndexingServiceClient implements IndexingServiceClient
{
Expand Down Expand Up @@ -90,6 +94,7 @@ public String compactSegments(
return runTask(
new ClientCompactQuery(
dataSource,
null,
segments,
keepSegmentGranularity,
targetCompactionSizeBytes,
Expand Down Expand Up @@ -195,21 +200,30 @@ public int getTotalWorkerCapacity()
}

@Override
public List<TaskStatusPlus> getRunningTasks()
public List<TaskStatusPlus> getActiveTasks()
{
return getTasks("runningTasks");
}
// Must retrieve waiting, then pending, then running, so if tasks move from one state to the next between
// calls then we still catch them. (Tasks always go waiting -> pending -> running.)
//
// Consider switching to new-style /druid/indexer/v1/tasks API in the future.
final List<TaskStatusPlus> tasks = new ArrayList<>();
final Set<String> taskIdsSeen = new HashSet<>();

final Iterable<TaskStatusPlus> activeTasks = Iterables.concat(
getTasks("waitingTasks"),
getTasks("pendingTasks"),
getTasks("runningTasks")
);

@Override
public List<TaskStatusPlus> getPendingTasks()
{
return getTasks("pendingTasks");
}
for (TaskStatusPlus task : activeTasks) {
// Use taskIdsSeen to prevent returning the same task ID more than once (if it hops from 'pending' to 'running',
// for example, and we see it twice.)
if (taskIdsSeen.add(task.getId())) {
tasks.add(task);
}
}

@Override
public List<TaskStatusPlus> getWaitingTasks()
{
return getTasks("waitingTasks");
return tasks;
}

private List<TaskStatusPlus> getTasks(String endpointSuffix)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,10 @@ String compactSegments(

String killTask(String taskId);

List<TaskStatusPlus> getRunningTasks();

List<TaskStatusPlus> getPendingTasks();

List<TaskStatusPlus> getWaitingTasks();
/**
* Gets all tasks that are waiting, pending, or running.
*/
List<TaskStatusPlus> getActiveTasks();

TaskStatusResponse getTaskStatus(String taskId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,11 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
final List<DateTime> createdTimes = new ArrayList<>();
createdTimes.add(
indexingServiceClient
.getRunningTasks()
.getActiveTasks()
.stream()
.map(TaskStatusPlus::getCreatedTime)
.min(Comparators.naturalNullsFirst())
.orElse(DateTimes.nowUtc()) // If there is no running tasks, this returns the current time.
);
createdTimes.add(
indexingServiceClient
.getPendingTasks()
.stream()
.map(TaskStatusPlus::getCreatedTime)
.min(Comparators.naturalNullsFirst())
.orElse(DateTimes.nowUtc()) // If there is no pending tasks, this returns the current time.
);
createdTimes.add(
indexingServiceClient
.getWaitingTasks()
.stream()
.map(TaskStatusPlus::getCreatedTime)
.min(Comparators.naturalNullsFirst())
.orElse(DateTimes.nowUtc()) // If there is no waiting tasks, this returns the current time.
.orElse(DateTimes.nowUtc()) // If there are no active tasks, this returns the current time.
);

final TaskStatusPlus completeTaskStatus = indexingServiceClient.getLastCompleteTask();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -84,11 +83,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList
.stream()
.collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
final List<TaskStatusPlus> compactTasks = filterNonCompactTasks(
indexingServiceClient.getRunningTasks(),
indexingServiceClient.getPendingTasks(),
indexingServiceClient.getWaitingTasks()
);
final List<TaskStatusPlus> compactTasks = filterNonCompactTasks(indexingServiceClient.getActiveTasks());
// dataSource -> list of intervals of compact tasks
final Map<String, List<Interval>> compactTaskIntervals = new HashMap<>(compactionConfigList.size());
for (TaskStatusPlus status : compactTasks) {
Expand All @@ -98,13 +93,22 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
}
if (COMPACT_TASK_TYPE.equals(response.getPayload().getType())) {
final ClientCompactQuery compactQuery = (ClientCompactQuery) response.getPayload();
final Interval interval = JodaUtils.umbrellaInterval(
compactQuery.getSegments()
.stream()
.map(DataSegment::getInterval)
.sorted(Comparators.intervalsByStartThenEnd())
.collect(Collectors.toList())
);
final Interval interval;

if (compactQuery.getSegments() != null) {
interval = JodaUtils.umbrellaInterval(
compactQuery.getSegments()
.stream()
.map(DataSegment::getInterval)
.sorted(Comparators.intervalsByStartThenEnd())
.collect(Collectors.toList())
);
} else if (compactQuery.getInterval() != null) {
interval = compactQuery.getInterval();
} else {
throw new ISE("task[%s] has neither 'segments' nor 'interval'", status.getId());
}

compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
} else {
throw new ISE("WTH? task[%s] is not a compactTask?", status.getId());
Expand Down Expand Up @@ -146,13 +150,9 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
.build();
}

@SafeVarargs
private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus>...taskStatusStreams)
private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus> taskStatuses)
{
final List<TaskStatusPlus> allTaskStatusPlus = new ArrayList<>();
Arrays.stream(taskStatusStreams).forEach(allTaskStatusPlus::addAll);

return allTaskStatusPlus
return taskStatuses
.stream()
.filter(status -> {
final String taskType = status.getType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -74,21 +75,9 @@ public String killTask(String taskId)
}

@Override
public List<TaskStatusPlus> getRunningTasks()
public List<TaskStatusPlus> getActiveTasks()
{
return null;
}

@Override
public List<TaskStatusPlus> getPendingTasks()
{
return null;
}

@Override
public List<TaskStatusPlus> getWaitingTasks()
{
return null;
return Collections.emptyList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,7 @@ public String compactSegments(
}

@Override
public List<TaskStatusPlus> getRunningTasks()
{
return Collections.emptyList();
}

@Override
public List<TaskStatusPlus> getPendingTasks()
{
return Collections.emptyList();
}

@Override
public List<TaskStatusPlus> getWaitingTasks()
public List<TaskStatusPlus> getActiveTasks()
{
return Collections.emptyList();
}
Expand Down