File: NewestSegmentFirstPolicyBenchmark.java
@@ -32,6 +32,7 @@
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.DateTime;
+import org.joda.time.Duration;
import org.joda.time.Interval;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -45,6 +46,7 @@
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;

+import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -61,7 +63,14 @@ public class NewestSegmentFirstPolicyBenchmark
{
private static final String DATA_SOURCE_PREFIX = "dataSource_";

-  private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy(new DefaultObjectMapper());
+  private final CompactionSegmentSearchPolicy policy =
+      new NewestSegmentFirstPolicy(
+          new DefaultObjectMapper(),
+          new TestDruidCoordinatorConfig.Builder()
+              .withCompactionSearchPolicyRefreshPeriod(Duration.standardMinutes(5))
+              .build(),
+          Clock.systemUTC()
+      );

@Param("100")
private int numDataSources;
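
Because the policy now takes a java.time.Clock, a test or benchmark can pin time to make iterator expiry deterministic. A minimal sketch under that assumption (Clock.fixed and Clock.offset are standard java.time APIs, imports of java.time.Instant and java.time.ZoneOffset assumed; this snippet is not part of the PR):

// Pin the clock so resetIfNeeded() behaves deterministically in a test.
Clock fixedClock = Clock.fixed(Instant.parse("2023-02-28T00:00:00Z"), ZoneOffset.UTC);
CompactionSegmentSearchPolicy testPolicy = new NewestSegmentFirstPolicy(
    new DefaultObjectMapper(),
    new TestDruidCoordinatorConfig.Builder()
        .withCompactionSearchPolicyRefreshPeriod(Duration.standardMinutes(5))
        .build(),
    fixedClock
);
// With a frozen clock, every resetIfNeeded() call after the first returns the
// cached iterator; a test crossing the expiry boundary would instead construct
// the policy with Clock.offset(fixedClock, java.time.Duration.ofMinutes(6)).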
File: DruidCoordinatorConfig.java
@@ -137,4 +137,7 @@ public boolean getCompactionSkipLockedIntervals()
return true;
}

+  @Config("druid.coordinator.compaction.searchPolicyRefreshPeriod")
+  @Default("PT5M")
Comment on lines +140 to +141, clintropolis (Member):
curious, why here instead of on the CompactSegments duty? I guess that would make the property druid.coordinator.compaction.duty.searchPolicyRefreshPeriod instead

side note, coordinator config in general seems really complicated 😅 I had to read a bunch of code to understand how custom duties work and get wired up to stuff... and it's kind of strange.

I guess what I am getting at is that having this refresh period be more frequent than the duty period seems like an incorrect configuration (or at least a useless one, since the iterator would always reset), but I'm not entirely sure how such a check could actually be wired up. Maybe if the duty period was added to the properties that get injected, the compaction duty could pick it up or something?

There is also CoordinatorCompactionConfig to add to the confusion... not to mention druid.coordinator.kill.compaction.period, which does live here...

Reply from the PR author (Contributor):
I had considered placing the config in the CoordinatorCompactionConfig instead, but decided against it: that config is dynamic, and changing the value would not necessarily update the expiry time of the iterator, so this seemed like a better UX.

druid.coordinator.compaction.searchPolicyRefreshPeriod could use a better name. Its intent is to give operators the ability to choose when to re-build the view of the segment interval timeline considered for compaction. Today, the view is re-built every time the compact segments duty runs, so the property does not need to be tied to the CompactSegments duty.
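
For illustration, this is how the two periods the thread discusses might be set in the Coordinator's runtime.properties (values are hypothetical; PT5M is the new property's default, and druid.coordinator.period.indexingPeriod is the existing cadence for indexing duties):

# Hypothetical operator setup: run coordinator indexing duties (including
# CompactSegments) every 5 minutes, but rebuild the compaction search view
# only every 30 minutes.
druid.coordinator.period.indexingPeriod=PT5M
druid.coordinator.compaction.searchPolicyRefreshPeriod=PT30M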

Reply from clintropolis (Member), Feb 28, 2023:
ah the name doesn't really bother me, it was more of a general comment about the config of this stuff being all over the place and that I have no idea where something should actually live. Given that, I think here is probably as fine as anywhere else 😛

+  public abstract Duration getCompactionSearchPolicyRefreshPeriod();
}
File: CompactSegments.java
@@ -33,6 +33,7 @@
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.logger.Logger;
@@ -74,9 +75,13 @@ public class CompactSegments implements CoordinatorCustomDuty
static final String TOTAL_COUNT_OF_SEGMENTS_COMPACTED = "segmentCountCompacted";
static final String TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED = "segmentIntervalCompacted";

-  /** Must be synced with org.apache.druid.indexing.common.task.CompactionTask.TYPE. */
+  /**
+   * Must be synced with org.apache.druid.indexing.common.task.CompactionTask.TYPE.
+   */
  public static final String COMPACTION_TASK_TYPE = "compact";
-  /** Must be synced with org.apache.druid.indexing.common.task.Tasks.STORE_COMPACTION_STATE_KEY */
+  /**
+   * Must be synced with org.apache.druid.indexing.common.task.Tasks.STORE_COMPACTION_STATE_KEY
+   */
public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState";

private static final Logger LOG = new Logger(CompactSegments.class);
@@ -147,17 +152,21 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
final ClientCompactionTaskQuery compactionTaskQuery = (ClientCompactionTaskQuery) response.getPayload();
DataSourceCompactionConfig dataSourceCompactionConfig = compactionConfigs.get(status.getDataSource());
if (dataSourceCompactionConfig != null && dataSourceCompactionConfig.getGranularitySpec() != null) {
-        Granularity configuredSegmentGranularity = dataSourceCompactionConfig.getGranularitySpec().getSegmentGranularity();
+        Granularity configuredSegmentGranularity = dataSourceCompactionConfig.getGranularitySpec()
+                                                                             .getSegmentGranularity();
if (configuredSegmentGranularity != null
&& compactionTaskQuery.getGranularitySpec() != null
-            && !configuredSegmentGranularity.equals(compactionTaskQuery.getGranularitySpec().getSegmentGranularity())) {
+            && !configuredSegmentGranularity.equals(compactionTaskQuery.getGranularitySpec()
+                                                                       .getSegmentGranularity())) {
// We will cancel active compaction task if segmentGranularity changes and we will need to
// re-compact the interval
LOG.info("Canceled task[%s] as task segmentGranularity is [%s] but compaction config "
+ "segmentGranularity is [%s]",
status.getId(),
compactionTaskQuery.getGranularitySpec().getSegmentGranularity(),
configuredSegmentGranularity);
LOG.info(
"Canceled task[%s] as task segmentGranularity is [%s] but compaction config "
+ "segmentGranularity is [%s]",
status.getId(),
compactionTaskQuery.getGranularitySpec().getSegmentGranularity(),
configuredSegmentGranularity
);
indexingServiceClient.cancelTask(status.getId());
continue;
}
@@ -184,8 +193,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
.addAll(intervals)
);

-      final CompactionSegmentIterator iterator =
-          policy.reset(compactionConfigs, dataSources, intervalsToSkipCompaction);
+      final Pair<CompactionSegmentIterator, Boolean> iteratorAndReset =
+          policy.resetIfNeeded(compactionConfigs, dataSources, intervalsToSkipCompaction);

int totalCapacity;
if (dynamicConfig.isUseAutoScaleSlots()) {
@@ -231,11 +240,15 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
compactionConfigs,
currentRunAutoCompactionSnapshotBuilders,
numAvailableCompactionTaskSlots,
-                iterator
+                iteratorAndReset.lhs,
+                iteratorAndReset.rhs
)
);
} else {
-        stats.accumulate(makeStats(currentRunAutoCompactionSnapshotBuilders, 0, iterator));
+        stats.addToGlobalStat(COMPACTION_TASK_COUNT, 0);
+        if (iteratorAndReset.rhs) {
+          stats.accumulate(makeStats(currentRunAutoCompactionSnapshotBuilders, iteratorAndReset.lhs));
+        }
}
} else {
LOG.info("compactionConfig is empty. Skip.");
@@ -300,7 +313,8 @@ private Map<String, List<Interval>> getLockedIntervalsToSkip(
static int findMaxNumTaskSlotsUsedByOneCompactionTask(@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig)
{
if (isParallelMode(tuningConfig)) {
-      @Nullable Integer maxNumConcurrentSubTasks = tuningConfig.getMaxNumConcurrentSubTasks();
+      @Nullable
+      Integer maxNumConcurrentSubTasks = tuningConfig.getMaxNumConcurrentSubTasks();
// Max number of task slots used in parallel mode = maxNumConcurrentSubTasks + 1 (supervisor task)
return (maxNumConcurrentSubTasks == null ? 1 : maxNumConcurrentSubTasks) + 1;
} else {
@@ -349,7 +363,8 @@ private CoordinatorStats doRun(
Map<String, DataSourceCompactionConfig> compactionConfigs,
Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
int numAvailableCompactionTaskSlots,
-      CompactionSegmentIterator iterator
+      CompactionSegmentIterator iterator,
+      boolean calculateStats
)
{
int numSubmittedTasks = 0;
@@ -367,7 +382,10 @@ private CoordinatorStats doRun(
k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
);
snapshotBuilder.incrementBytesCompacted(segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum());
-      snapshotBuilder.incrementIntervalCountCompacted(segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count());
+      snapshotBuilder.incrementIntervalCountCompacted(segmentsToCompact.stream()
+                                                                       .map(DataSegment::getInterval)
+                                                                       .distinct()
+                                                                       .count());
snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());

final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
@@ -390,7 +408,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
LOG.warn("Cannot determine segmentGranularity from interval [%s]", interval);
}
} else {
LOG.warn("segmentsToCompact does not have the same interval. Fallback to not setting segmentGranularity for auto compaction task");
LOG.warn(
"segmentsToCompact does not have the same interval. Fallback to not setting segmentGranularity for auto compaction task");
}
} else {
segmentGranularityToUse = config.getGranularitySpec().getSegmentGranularity();
@@ -450,7 +469,11 @@ private CoordinatorStats doRun(
"coordinator-issued",
segmentsToCompact,
config.getTaskPriority(),
-        ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), config.getMetricsSpec() != null),
+        ClientCompactionTaskQueryTuningConfig.from(
+            config.getTuningConfig(),
+            config.getMaxRowsPerSegment(),
+            config.getMetricsSpec() != null
+        ),

[Code scanning notice (CodeQL) on the getMaxRowsPerSegment line: "Deprecated method or constructor invocation. Invoking DataSourceCompactionConfig.getMaxRowsPerSegment should be avoided because it has been deprecated."]
granularitySpec,
dimensionsSpec,
config.getMetricsSpec(),
@@ -473,7 +496,9 @@ private CoordinatorStats doRun(
}
}

-    return makeStats(currentRunAutoCompactionSnapshotBuilders, numSubmittedTasks, iterator);
+    final CoordinatorStats stats = new CoordinatorStats();
+    stats.addToGlobalStat(COMPACTION_TASK_COUNT, numSubmittedTasks);
+    return calculateStats ? stats.accumulate(makeStats(currentRunAutoCompactionSnapshotBuilders, iterator)) : stats;
}

private Map<String, Object> newAutoCompactionContext(@Nullable Map<String, Object> configuredContext)
@@ -487,19 +512,17 @@ private Map<String, Object> newAutoCompactionContext(@Nullable Map<String, Objec

private CoordinatorStats makeStats(
Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders,
-      int numCompactionTasks,
CompactionSegmentIterator iterator
)
{
final Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = new HashMap<>();
final CoordinatorStats stats = new CoordinatorStats();
-    stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks);

    // Iterate through all the remaining segments in the iterator.
    // As these segments could be compacted but were not compacted due to lack of task slots, we will aggregate
    // the statistics into the AwaitingCompaction statistics
-    while (iterator.hasNext()) {
-      final List<DataSegment> segmentsToCompact = iterator.next();
-
+    iterator.forEachRemainingSegmentsToCompact(segmentsToCompact -> {
if (!segmentsToCompact.isEmpty()) {
final String dataSourceName = segmentsToCompact.get(0).getDataSource();
AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
Expand All @@ -519,7 +542,7 @@ private CoordinatorStats makeStats(
);
snapshotBuilder.incrementSegmentCountAwaitingCompaction(segmentsToCompact.size());
}
-    }
+    });

// Statistics of all segments considered compacted after this run
Map<String, CompactionStatistics> allCompactedStatistics = iterator.totalCompactedStatistics();
File: CompactionSegmentIterator.java
@@ -25,6 +25,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;

/**
* Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
@@ -48,4 +49,9 @@ public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>
*/
Map<String, CompactionStatistics> totalSkippedStatistics();

+  /**
+   * Performs the given action over the remaining segments to compact in the iterator. Similar to
+   * {@link #forEachRemaining}; however, this iteration does not advance the iterator.
+   */
+  void forEachRemainingSegmentsToCompact(Consumer<List<DataSegment>> action);
}
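
A minimal usage sketch (not part of this change) contrasting the new method with Iterator.forEachRemaining; the byte-count accumulation is illustrative:

// Total the bytes still awaiting compaction without consuming the iterator.
long[] awaitingBytes = {0};
iterator.forEachRemainingSegmentsToCompact(
    segments -> awaitingBytes[0] += segments.stream().mapToLong(DataSegment::getSize).sum()
);
// Unlike Iterator.forEachRemaining, the cursor has not advanced: hasNext() and
// next() still yield every candidate list, so the same iterator can go on to
// drive task submission.
while (iterator.hasNext()) {
  List<DataSegment> segmentsToCompact = iterator.next();
  // submit a compaction task for segmentsToCompact...
}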
File: CompactionSegmentSearchPolicy.java
@@ -19,6 +19,7 @@

package org.apache.druid.server.coordinator.duty;

+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;
@@ -31,8 +32,22 @@
*/
public interface CompactionSegmentSearchPolicy
{
+  /**
+   * @return An iterator over the intervals that need to be compacted. The iterator may not be reset on every call.
+   *         Callers who want to guarantee that the iterator will be reset should call {@link #reset} instead.
+   *
+   *         Implementations SHOULD ensure that calls to this method and {@link #reset} are thread safe.
+   */
+  Pair<CompactionSegmentIterator, Boolean> resetIfNeeded(
+      Map<String, DataSourceCompactionConfig> compactionConfigs,
+      Map<String, SegmentTimeline> dataSources,
+      Map<String, List<Interval>> skipIntervals
+  );

  /**
   * Reset the current states of this policy. This method should be called whenever iterating starts.
+   *
+   * Implementations SHOULD ensure that calls to this method and {@link #resetIfNeeded} are thread safe.
   */
CompactionSegmentIterator reset(
Map<String, DataSourceCompactionConfig> compactionConfigs,
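
A sketch of the intended calling pattern, mirroring how CompactSegments.run() consumes the result (the statistics step is illustrative):

Pair<CompactionSegmentIterator, Boolean> iteratorAndReset =
    policy.resetIfNeeded(compactionConfigs, dataSources, skipIntervals);
CompactionSegmentIterator iterator = iteratorAndReset.lhs;
if (iteratorAndReset.rhs) {
  // rhs == true: the iterator was freshly rebuilt on this call, so statistics
  // derived from it (e.g. segments awaiting compaction) are worth recomputing.
} else {
  // rhs == false: a cached iterator built within the refresh period was returned.
}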
File: NewestSegmentFirstIterator.java
@@ -73,6 +73,7 @@
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
+import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
@@ -195,6 +196,17 @@ public Map<String, CompactionStatistics> totalSkippedStatistics()
return skippedSegments;
}

+  @Override
+  public void forEachRemainingSegmentsToCompact(Consumer<List<DataSegment>> action)
+  {
+    for (CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor : timelineIterators.values()) {
+      for (TimelineObjectHolder<String, DataSegment> holder : compactibleTimelineObjectHolderCursor.holders) {
+        List<DataSegment> candidates = compactibleTimelineObjectHolderCursor.getCandidates(holder);
+        action.accept(candidates);
+      }
+    }
+  }

@Override
public boolean hasNext()
{
@@ -310,6 +322,11 @@ public List<DataSegment> next()
throw new NoSuchElementException();
}
TimelineObjectHolder<String, DataSegment> timelineObjectHolder = holders.remove(holders.size() - 1);
+    return getCandidates(timelineObjectHolder);
+  }
+
+  private List<DataSegment> getCandidates(TimelineObjectHolder<String, DataSegment> timelineObjectHolder)
+  {
List<DataSegment> candidates = Streams.sequentialStreamFrom(timelineObjectHolder.getObject())
.map(PartitionChunk::getObject)
.collect(Collectors.toList());
File: NewestSegmentFirstPolicy.java
@@ -20,34 +20,83 @@
package org.apache.druid.server.coordinator.duty;

import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;

+import java.time.Clock;
import java.util.List;
import java.util.Map;

 /**
  * This policy searches segments for compaction from the newest one to oldest one.
+ * The {@link #resetIfNeeded} functionality is inspired by {@link com.google.common.base.Suppliers.ExpiringMemoizingSupplier}.
  */
public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
{
private final ObjectMapper objectMapper;
+  private final long durationMillis;
+  @GuardedBy("this")
+  private volatile NewestSegmentFirstIterator iterator;
+  // The special value 0 means "not yet initialized".
+  @GuardedBy("this")
+  private volatile long expirationMillis;
+  private final Clock clock;

@Inject
-  public NewestSegmentFirstPolicy(ObjectMapper objectMapper)
+  public NewestSegmentFirstPolicy(ObjectMapper objectMapper, DruidCoordinatorConfig config, Clock clock)
{
this.objectMapper = objectMapper;
+    this.durationMillis = config.getCompactionSearchPolicyRefreshPeriod().getMillis();
+    this.clock = clock;
+    Preconditions.checkArgument(durationMillis > 0);
}

@Override
-  public CompactionSegmentIterator reset(
+  public Pair<CompactionSegmentIterator, Boolean> resetIfNeeded(
Map<String, DataSourceCompactionConfig> compactionConfigs,
Map<String, SegmentTimeline> dataSources,
Map<String, List<Interval>> skipIntervals
)
{
-    return new NewestSegmentFirstIterator(objectMapper, compactionConfigs, dataSources, skipIntervals);
+    // This implementation was inspired by Suppliers.memoizeWithExpiration.
+    // resetIfNeeded and reset can be called from different threads.
+    long millis = expirationMillis;
+    long now = clock.millis();
+    if (millis == 0 || now - millis >= 0) {
+      synchronized (this) {
+        if (millis == expirationMillis) {
+          NewestSegmentFirstIterator t = reset(compactionConfigs, dataSources, skipIntervals);
+          iterator = t;
+          // reset can be slow, so use the current time to set the new expiry
+          expirationMillis = clock.millis() + durationMillis;
+          return Pair.of(t, true);
+        }
+      }
+    }
+    return Pair.of(iterator, false);

+  @Override
+  public synchronized NewestSegmentFirstIterator reset(
+      Map<String, DataSourceCompactionConfig> compactionConfigs,
+      Map<String, SegmentTimeline> dataSources,
+      Map<String, List<Interval>> skipIntervals
+  )
+  {
+    NewestSegmentFirstIterator t = new NewestSegmentFirstIterator(
+        objectMapper,
+        compactionConfigs,
+        dataSources,
+        skipIntervals
+    );
+    iterator = t;
+    expirationMillis = clock.millis() + durationMillis;
+    return t;
+  }
}
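
For comparison, a sketch of the Guava utility that inspired this pattern. It cannot be used directly here because resetIfNeeded receives fresh arguments on every call and its caller needs to know whether a rebuild happened; buildIterator below is a hypothetical stand-in for the expensive reset:

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.util.concurrent.TimeUnit;

Supplier<CompactionSegmentIterator> cached = Suppliers.memoizeWithExpiration(
    () -> buildIterator(),  // hypothetical expensive computation
    5, TimeUnit.MINUTES
);
cached.get();  // computes once, then serves the cached value until expiry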