Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.error.InvalidInput;
Expand Down Expand Up @@ -171,8 +172,6 @@ public Set<ResourceAction> getInputSourceResources()
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
final NavigableMap<DateTime, List<TaskLock>> taskLockMap = getTaskLockMap(toolbox.getTaskActionClient());

// Track stats for reporting
int numSegmentsKilled = 0;
int numBatchesProcessed = 0;
Expand All @@ -196,21 +195,40 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
limit,
numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "."
);

RetrieveUsedSegmentsAction retrieveUsedSegmentsAction = new RetrieveUsedSegmentsAction(
getDataSource(),
null,
ImmutableList.of(getInterval()),
Segments.INCLUDING_OVERSHADOWED
);
// Fetch the load specs of all segments overlapping with the unused segment intervals
final Set<Map<String, Object>> usedSegmentLoadSpecs =
new HashSet<>(toolbox.getTaskActionClient().submit(retrieveUsedSegmentsAction)
.stream()
.map(DataSegment::getLoadSpec)
.collect(Collectors.toSet())
);

do {
if (nextBatchSize <= 0) {
break;
}

unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize));
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize));

// Fetch locks each time as a revokal could have occurred in between batches
final NavigableMap<DateTime, List<TaskLock>> taskLockMap
= getNonRevokedTaskLockMap(toolbox.getTaskActionClient());

if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
throw new ISE(
"Locks[%s] for task[%s] can't cover segments[%s]",
taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
getId(),
unusedSegments
"Locks[%s] for task[%s] can't cover segments[%s]",
taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
getId(),
unusedSegments
);
}

Expand All @@ -222,24 +240,6 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception

toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));

final Set<Interval> unusedSegmentIntervals = unusedSegments.stream()
.map(DataSegment::getInterval)
.collect(Collectors.toSet());
final Set<Map<String, Object>> usedSegmentLoadSpecs = new HashSet<>();
if (!unusedSegmentIntervals.isEmpty()) {
RetrieveUsedSegmentsAction retrieveUsedSegmentsAction = new RetrieveUsedSegmentsAction(
getDataSource(),
null,
unusedSegmentIntervals,
Segments.INCLUDING_OVERSHADOWED
);
// Fetch the load specs of all segments overlapping with the unused segment intervals
usedSegmentLoadSpecs.addAll(toolbox.getTaskActionClient().submit(retrieveUsedSegmentsAction)
.stream()
.map(DataSegment::getLoadSpec)
.collect(Collectors.toSet())
);
}

// Kill segments from the deep storage only if their load specs are not being used by any used segments
final List<DataSegment> segmentsToBeKilled = unusedSegments
Expand Down Expand Up @@ -289,11 +289,15 @@ int computeNextBatchSize(int numSegmentsKilled)
return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) : batchSize;
}

private NavigableMap<DateTime, List<TaskLock>> getTaskLockMap(TaskActionClient client) throws IOException
private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActionClient client) throws IOException
{
final NavigableMap<DateTime, List<TaskLock>> taskLockMap = new TreeMap<>();
getTaskLocks(client).forEach(
taskLock -> taskLockMap.computeIfAbsent(taskLock.getInterval().getStart(), k -> new ArrayList<>()).add(taskLock)
taskLock -> {
if (!taskLock.isRevoked()) {
taskLockMap.computeIfAbsent(taskLock.getInterval().getStart(), k -> new ArrayList<>()).add(taskLock);
}
}
);
return taskLockMap;
}
Expand Down