From cec68f2665559fdc9f6111c539c587403e75fff0 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Sat, 29 Sep 2018 17:29:37 -0700
Subject: [PATCH 1/6] Add support for keepSegmentGranularity in automatic
 compaction

---
 .../NewestSegmentFirstPolicyBenchmark.java    |   2 +
 .../druid/java/util/common/Intervals.java     |   5 +
 .../client/indexing/ClientCompactQuery.java   |  17 +-
 .../indexing/HttpIndexingServiceClient.java   |  12 +-
 .../indexing/IndexingServiceClient.java       |   1 +
 .../DataSourceCompactionConfig.java           |  76 ++--
 .../DruidCoordinatorSegmentCompactor.java     |   3 +-
 .../helper/NewestSegmentFirstIterator.java    | 348 ++++++++----------
 .../helper/SegmentCompactorUtil.java          |  26 +-
 .../indexing/NoopIndexingServiceClient.java   |   1 +
 .../DruidCoordinatorSegmentCompactorTest.java | 202 +++++++---
 .../helper/NewestSegmentFirstPolicyTest.java  |  88 +++--
 12 files changed, 458 insertions(+), 323 deletions(-)

diff --git a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index 2b511adc259d..760c0a523a32 100644
--- a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -89,8 +89,10 @@ public void setup()
           dataSource,
           new DataSourceCompactionConfig(
               dataSource,
+              false,
               0,
               targetCompactionSizeBytes,
+              targetCompactionSizeBytes,
               null,
               null,
               null,
diff --git a/java-util/src/main/java/org/apache/druid/java/util/common/Intervals.java b/java-util/src/main/java/org/apache/druid/java/util/common/Intervals.java
index 5502ff7b6eca..ed152f555751 100644
--- a/java-util/src/main/java/org/apache/druid/java/util/common/Intervals.java
+++ b/java-util/src/main/java/org/apache/druid/java/util/common/Intervals.java
@@ -38,6 +38,11 @@ public static Interval of(String interval)
     return new Interval(interval, ISOChronology.getInstanceUTC());
   }
 
+  public static Interval of(String format, Object... formatArgs)
+  {
+    return of(StringUtils.format(format, formatArgs));
+  }
+
   public static boolean isEmpty(Interval interval)
   {
     return interval.getStart().equals(interval.getEnd());
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
index 8cf4c791ee91..1fe2b0417ea1 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
@@ -31,6 +31,7 @@ public class ClientCompactQuery
   private final String dataSource;
   private final List<DataSegment> segments;
   private final boolean keepSegmentGranularity;
+  private final Long targetCompactionSizeBytes;
   private final ClientCompactQueryTuningConfig tuningConfig;
   private final Map<String, Object> context;
 
@@ -39,6 +40,7 @@ public ClientCompactQuery(
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("segments") List<DataSegment> segments,
       @JsonProperty("keepSegmentGranularity") boolean keepSegmentGranularity,
+      @JsonProperty("targetCompactionSizeBytes") Long targetCompactionSizeBytes,
       @JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig,
       @JsonProperty("context") Map<String, Object> context
   )
@@ -46,6 +48,7 @@ public ClientCompactQuery(
     this.dataSource = dataSource;
     this.segments = segments;
     this.keepSegmentGranularity = keepSegmentGranularity;
+    this.targetCompactionSizeBytes = targetCompactionSizeBytes;
     this.tuningConfig = tuningConfig;
     this.context = context;
   }
@@ -74,6 +77,12 @@ public boolean isKeepSegmentGranularity()
     return keepSegmentGranularity;
   }
 
+  @JsonProperty
+  public Long getTargetCompactionSizeBytes()
+  {
+    return targetCompactionSizeBytes;
+  }
+
   @JsonProperty
   public ClientCompactQueryTuningConfig getTuningConfig()
   {
@@ -90,10 +99,12 @@ public Map<String, Object> getContext()
   public String toString()
   {
     return "ClientCompactQuery{" +
-           "dataSource=" + dataSource + "'" +
+           "dataSource='" + dataSource + '\'' +
            ", segments=" + segments +
+           ", keepSegmentGranularity=" + keepSegmentGranularity +
+           ", targetCompactionSizeBytes=" + targetCompactionSizeBytes +
            ", tuningConfig=" + tuningConfig +
-           ", contexts=" + context +
-           "}";
+           ", context=" + context +
+           '}';
   }
 }
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index a3cf5c608c92..735d608060e7 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -97,6 +97,7 @@ public void upgradeSegment(DataSegment dataSegment)
   public String compactSegments(
       List<DataSegment> segments,
       boolean keepSegmentGranularity,
+      long targetCompactionSizeBytes,
       int compactionTaskPriority,
       @Nullable ClientCompactQueryTuningConfig tuningConfig,
       @Nullable Map<String, Object> context
   )
@@ -113,7 +114,16 @@ public String compactSegments(
     context = context == null ?
new HashMap<>() : context; context.put("priority", compactionTaskPriority); - return runTask(new ClientCompactQuery(dataSource, segments, keepSegmentGranularity, tuningConfig, context)); + return runTask( + new ClientCompactQuery( + dataSource, + segments, + keepSegmentGranularity, + targetCompactionSizeBytes, + tuningConfig, + context + ) + ); } @Override diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java index a15cd54af4db..ac8813826887 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java @@ -41,6 +41,7 @@ public interface IndexingServiceClient String compactSegments( List segments, boolean keepSegmentGranularity, + long targetCompactionSizeBytes, int compactionTaskPriority, @Nullable ClientCompactQueryTuningConfig tuningConfig, @Nullable Map context diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java index 6bc13365e6b7..8a56e319b8f0 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java @@ -35,15 +35,19 @@ public class DataSourceCompactionConfig // should be synchronized with Tasks.DEFAULT_MERGE_TASK_PRIORITY private static final int DEFAULT_COMPACTION_TASK_PRIORITY = 25; - private static final int DEFAULT_NUM_TARGET_COMPACTION_SEGMENTS = 150; + private static final boolean DEFAULT_KEEP_SEGMENT_GRANULARITY = true; + private static final long DEFAULT_INPUT_SEGMENT_SIZE_BYTES = 400 * 1024 * 1024; + private static final int DEFAULT_NUM_INPUT_SEGMENTS = 150; private static final Period DEFAULT_SKIP_OFFSET_FROM_LATEST = new Period("P1D"); private final String dataSource; + private final boolean keepSegmentGranularity; private final int taskPriority; + private final long inputSegmentSizeBytes; private final long targetCompactionSizeBytes; - // The number of compaction segments is limited because the byte size of a serialized task spec is limited by + // The number of input segments is limited because the byte size of a serialized task spec is limited by // RemoteTaskRunnerConfig.maxZnodeBytes. 
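To make the wire change above concrete, here is a rough sketch of how a caller could assemble the extended query after this patch. It mirrors what `HttpIndexingServiceClient.compactSegments()` now does; the dataSource name, byte size, and priority value are illustrative, not taken from the diff.

```java
import org.apache.druid.client.indexing.ClientCompactQuery;
import org.apache.druid.timeline.DataSegment;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ClientCompactQueryExample
{
  // Sketch only: builds the payload that runTask() would POST to the overlord.
  static ClientCompactQuery buildQuery(List<DataSegment> segmentsToCompact)
  {
    Map<String, Object> context = new HashMap<>();
    context.put("priority", 25);  // compactionTaskPriority is injected into the task context

    return new ClientCompactQuery(
        "wikipedia",            // dataSource (hypothetical)
        segmentsToCompact,
        true,                   // keepSegmentGranularity
        400L * 1024 * 1024,     // targetCompactionSizeBytes (hypothetical)
        null,                   // tuningConfig: fall back to the task's defaults
        context
    );
  }
}
```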
- private final int numTargetCompactionSegments; + private final int maxNumSegmentsToCompact; private final Period skipOffsetFromLatest; private final ClientCompactQueryTuningConfig tuningConfig; private final Map taskContext; @@ -51,30 +55,38 @@ public class DataSourceCompactionConfig @JsonCreator public DataSourceCompactionConfig( @JsonProperty("dataSource") String dataSource, + @JsonProperty("keepSegmentGranularity") Boolean keepSegmentGranularity, @JsonProperty("taskPriority") @Nullable Integer taskPriority, + @JsonProperty("inputSegmentSizeBytes") @Nullable Long inputSegmentSizeBytes, @JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes, - @JsonProperty("numTargetCompactionSegments") @Nullable Integer numTargetCompactionSegments, + @JsonProperty("maxNumSegmentsToCompact") @Nullable Integer maxNumSegmentsToCompact, @JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest, @JsonProperty("tuningConfig") @Nullable ClientCompactQueryTuningConfig tuningConfig, @JsonProperty("taskContext") @Nullable Map taskContext ) { this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); - this.taskPriority = taskPriority == null ? - DEFAULT_COMPACTION_TASK_PRIORITY : - taskPriority; - this.targetCompactionSizeBytes = targetCompactionSizeBytes == null ? - DEFAULT_TARGET_COMPACTION_SIZE_BYTES : - targetCompactionSizeBytes; - this.numTargetCompactionSegments = numTargetCompactionSegments == null ? - DEFAULT_NUM_TARGET_COMPACTION_SEGMENTS : - numTargetCompactionSegments; + this.keepSegmentGranularity = keepSegmentGranularity == null + ? DEFAULT_KEEP_SEGMENT_GRANULARITY + : keepSegmentGranularity; + this.taskPriority = taskPriority == null + ? DEFAULT_COMPACTION_TASK_PRIORITY + : taskPriority; + this.inputSegmentSizeBytes = inputSegmentSizeBytes == null + ? DEFAULT_INPUT_SEGMENT_SIZE_BYTES + : inputSegmentSizeBytes; + this.targetCompactionSizeBytes = targetCompactionSizeBytes == null + ? DEFAULT_TARGET_COMPACTION_SIZE_BYTES + : targetCompactionSizeBytes; + this.maxNumSegmentsToCompact = maxNumSegmentsToCompact == null + ? DEFAULT_NUM_INPUT_SEGMENTS + : maxNumSegmentsToCompact; this.skipOffsetFromLatest = skipOffsetFromLatest == null ? 
DEFAULT_SKIP_OFFSET_FROM_LATEST : skipOffsetFromLatest; this.tuningConfig = tuningConfig; this.taskContext = taskContext; Preconditions.checkArgument( - this.numTargetCompactionSegments > 1, + this.maxNumSegmentsToCompact > 1, "numTargetCompactionSegments should be larger than 1" ); } @@ -85,6 +97,12 @@ public String getDataSource() return dataSource; } + @JsonProperty + public boolean isKeepSegmentGranularity() + { + return keepSegmentGranularity; + } + @JsonProperty public int getTaskPriority() { @@ -92,15 +110,21 @@ public int getTaskPriority() } @JsonProperty - public long getTargetCompactionSizeBytes() + public long getInputSegmentSizeBytes() { - return targetCompactionSizeBytes; + return inputSegmentSizeBytes; + } + + @JsonProperty + public int getMaxNumSegmentsToCompact() + { + return maxNumSegmentsToCompact; } @JsonProperty - public int getNumTargetCompactionSegments() + public long getTargetCompactionSizeBytes() { - return numTargetCompactionSegments; + return targetCompactionSizeBytes; } @JsonProperty @@ -140,15 +164,23 @@ public boolean equals(Object o) return false; } + if (keepSegmentGranularity != that.keepSegmentGranularity) { + return false; + } + if (taskPriority != that.taskPriority) { return false; } - if (targetCompactionSizeBytes != that.targetCompactionSizeBytes) { + if (inputSegmentSizeBytes != that.inputSegmentSizeBytes) { return false; } - if (numTargetCompactionSegments != that.numTargetCompactionSegments) { + if (maxNumSegmentsToCompact != that.maxNumSegmentsToCompact) { + return false; + } + + if (targetCompactionSizeBytes != that.targetCompactionSizeBytes) { return false; } @@ -168,9 +200,11 @@ public int hashCode() { return Objects.hash( dataSource, + keepSegmentGranularity, taskPriority, + inputSegmentSizeBytes, + maxNumSegmentsToCompact, targetCompactionSizeBytes, - numTargetCompactionSegments, skipOffsetFromLatest, tuningConfig, taskContext diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java index 20943b875d35..b9b89229b402 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java @@ -154,7 +154,8 @@ private CoordinatorStats doRun( // find segments to be compacted. 
final String taskId = indexingServiceClient.compactSegments( segmentsToCompact, - false, + config.isKeepSegmentGranularity(), + config.getTargetCompactionSizeBytes(), config.getTaskPriority(), config.getTuningConfig(), config.getTaskContext() diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java index e5a19c5e3818..96d30feebecb 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java @@ -19,13 +19,10 @@ package org.apache.druid.server.coordinator.helper; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; @@ -33,15 +30,13 @@ import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; -import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; -import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -63,15 +58,8 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator // dataSource -> intervalToFind // searchIntervals keeps track of the current state of which interval should be considered to search segments to // compact. - private final Map searchIntervals; - - // dataSource -> end dateTime of the initial searchInterval - // searchEndDates keeps the endDate of the initial searchInterval (the entire searchInterval). It's immutable and not - // changed once it's initialized. - // This is used to determine that we can expect more segments to be added for an interval in the future. If the end of - // the interval is same with searchEndDate, we can expect more segments to be added and discard the found segments for - // compaction in this run to further optimize the size of compact segments. See checkCompactableSizeForLastSegmentOrReturn(). 
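Since every new parameter of `DataSourceCompactionConfig` is nullable with a default, a coordinator config that predates this patch keeps working. A hypothetical construction relying entirely on the fallbacks (the dataSource name is made up; the default values are the constants defined in the class above):

```java
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;

class CompactionConfigDefaultsExample
{
  static DataSourceCompactionConfig withDefaults()
  {
    // Each null below falls back to a default from the class:
    // keepSegmentGranularity -> true, taskPriority -> 25,
    // inputSegmentSizeBytes -> 400 * 1024 * 1024,
    // maxNumSegmentsToCompact -> 150, skipOffsetFromLatest -> "P1D".
    return new DataSourceCompactionConfig(
        "wikipedia",  // dataSource is the only required field; the name is illustrative
        null,         // keepSegmentGranularity
        null,         // taskPriority
        null,         // inputSegmentSizeBytes
        null,         // targetCompactionSizeBytes
        null,         // maxNumSegmentsToCompact
        null,         // skipOffsetFromLatest
        null,         // tuningConfig
        null          // taskContext
    );
  }
}
```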
- private final Map searchEndDates; + private final Map timelineIterators; + private final PriorityQueue queue = new PriorityQueue<>( (o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o2.interval, o1.interval) ); @@ -83,8 +71,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator { this.compactionConfigs = compactionConfigs; this.dataSources = dataSources; - this.searchIntervals = new HashMap<>(dataSources.size()); - this.searchEndDates = new HashMap<>(dataSources.size()); + this.timelineIterators = new HashMap<>(dataSources.size()); for (Entry> entry : dataSources.entrySet()) { final String dataSource = entry.getKey(); @@ -93,8 +80,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator if (config != null && !timeline.isEmpty()) { final Interval searchInterval = findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest()); - searchIntervals.put(dataSource, searchInterval); - searchEndDates.put(dataSource, searchInterval.getEnd()); + timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchInterval)); } } @@ -115,9 +101,7 @@ public Object2LongOpenHashMap remainingSegmentSizeBytes() { final Object2LongOpenHashMap resultMap = new Object2LongOpenHashMap<>(); resultMap.defaultReturnValue(UNKNOWN_REMAINING_SEGMENT_SIZE); - final Iterator iterator = queue.iterator(); - while (iterator.hasNext()) { - final QueueEntry entry = iterator.next(); + for (QueueEntry entry : queue) { final VersionedIntervalTimeline timeline = dataSources.get(entry.getDataSource()); final Interval interval = new Interval(timeline.first().getInterval().getStart(), entry.interval.getEnd()); @@ -165,41 +149,80 @@ public List next() /** * Find the next segments to compact for the given dataSource and add them to the queue. - * {@link #searchIntervals} is updated according to the found segments. That is, the interval of the found segments - * are removed from the searchInterval of the given dataSource. + * {@link #timelineIterators} is updated according to the found segments. That is, the found segments are removed from + * the timeline of the given dataSource. */ private void updateQueue(String dataSourceName, DataSourceCompactionConfig config) { - VersionedIntervalTimeline timeline = dataSources.get(dataSourceName); - - if (timeline == null || timeline.isEmpty()) { - log.warn("Cannot find timeline for dataSource[%s]. 
Continue to the next dataSource", dataSourceName); - return; - } - - final Interval searchInterval = Preconditions.checkNotNull( - searchIntervals.get(dataSourceName), - "Cannot find intervals to find for dataSource[%s]", - dataSourceName - ); - final DateTime searchEnd = Preconditions.checkNotNull( - searchEndDates.get(dataSourceName), - "searchEndDate for dataSource[%s]", + final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor = Preconditions.checkNotNull( + timelineIterators.get(dataSourceName), + "Cannot find timeline for dataSource[%s]", dataSourceName ); - final Pair pair = findSegmentsToCompact( - timeline, - searchInterval, - searchEnd, + final SegmentsToCompact segmentsToCompact = findSegmentsToCompact( + compactibleTimelineObjectHolderCursor, config ); - final List segmentsToCompact = pair.rhs.getSegments(); - final Interval remainingSearchInterval = pair.lhs; - searchIntervals.put(dataSourceName, remainingSearchInterval); if (!segmentsToCompact.isEmpty()) { - queue.add(new QueueEntry(segmentsToCompact)); + queue.add(new QueueEntry(segmentsToCompact.segments)); + } + } + + /** + * Iterates the given {@link VersionedIntervalTimeline}. Only compactible {@link TimelineObjectHolder}s are returned, + * which means the holder always has at least one {@link DataSegment} and the total size of segments is larger than 0. + */ + private static class CompactibleTimelineObjectHolderCursor + { + private final List> holders; + + CompactibleTimelineObjectHolderCursor( + VersionedIntervalTimeline timeline, + Interval totalIntervalToSearch + ) + { + // TODO: lazy?? + this.holders = timeline + .lookup(totalIntervalToSearch) + .stream() + .filter(holder -> { + final List> chunks = Lists.newArrayList(holder.getObject().iterator()); + final long partitionBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum(); + return chunks.size() > 0 + && partitionBytes > 0 + && totalIntervalToSearch.contains(chunks.get(0).getObject().getInterval()); + }) + .collect(Collectors.toList()); + } + + boolean hasNext() + { + return !holders.isEmpty(); + } + + /** + * Returns the latest holder. + */ + @Nullable + TimelineObjectHolder get() + { + if (holders.isEmpty()) { + return null; + } else { + return holders.get(holders.size() - 1); + } + } + + /** + * Removes the latest holder, so that {@link #get()} returns the next one. + */ + void next() + { + if (!holders.isEmpty()) { + holders.remove(holders.size() - 1); + } } } @@ -209,179 +232,95 @@ private void updateQueue(String dataSourceName, DataSourceCompactionConfig confi * looked up for the last one day of the given intervalToSearch, and the next day is searched again if the size of * found segments are not enough to compact. This is repeated until enough amount of segments are found. 
* - * @param timeline timeline of a dataSource - * @param intervalToSearch interval to search - * @param searchEnd the end of the whole searchInterval + * @param compactibleTimelineObjectHolderCursor timeline iterator * @param config compaction config * - * @return a pair of the reduced interval of (intervalToSearch - interval of found segments) and segments to compact + * @return segments to compact */ - @VisibleForTesting - static Pair findSegmentsToCompact( - final VersionedIntervalTimeline timeline, - final Interval intervalToSearch, - final DateTime searchEnd, + private static SegmentsToCompact findSegmentsToCompact( + final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor, final DataSourceCompactionConfig config ) { - final long targetCompactionSize = config.getTargetCompactionSizeBytes(); - final int numTargetSegments = config.getNumTargetCompactionSegments(); + final boolean keepSegmentGranularity = config.isKeepSegmentGranularity(); + final long inputSegmentSize = config.getInputSegmentSizeBytes(); + final int maxNumSegmentsToCompact = config.getMaxNumSegmentsToCompact(); final List segmentsToCompact = new ArrayList<>(); - Interval searchInterval = intervalToSearch; long totalSegmentsToCompactBytes = 0; - // Finds segments to compact together while iterating searchInterval from latest to oldest - while (!Intervals.isEmpty(searchInterval) - && totalSegmentsToCompactBytes < targetCompactionSize - && segmentsToCompact.size() < numTargetSegments) { - final Interval lookupInterval = SegmentCompactorUtil.getNextLoopupInterval(searchInterval); - // holders are sorted by their interval - final List> holders = timeline.lookup(lookupInterval); - - if (holders.isEmpty()) { - // We found nothing. Continue to the next interval. - searchInterval = SegmentCompactorUtil.removeIntervalFromEnd(searchInterval, lookupInterval); - continue; - } - - for (int i = holders.size() - 1; i >= 0; i--) { - final TimelineObjectHolder holder = holders.get(i); - final List> chunks = Lists.newArrayList(holder.getObject().iterator()); - final long partitionBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum(); - if (chunks.size() == 0 || partitionBytes == 0) { - log.warn("Skip empty shard[%s]", holder); - continue; - } - - if (!intervalToSearch.contains(chunks.get(0).getObject().getInterval())) { - searchInterval = SegmentCompactorUtil.removeIntervalFromEnd( - searchInterval, - new Interval(chunks.get(0).getObject().getInterval().getStart(), searchInterval.getEnd()) - ); - continue; - } - - // Addition of the segments of a partition should be atomic. 
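Before the diff of the selection loop itself, it may help to see the core accumulation rule in isolation. This is a simplified, self-contained sketch, not the code under review: byte sizes stand in for whole time chunks, the 10% slack matches the margin used by `SegmentCompactorUtil`, and `maxChunks` loosely stands in for the segment-count limit.

```java
import java.util.ArrayList;
import java.util.List;

class NewestFirstSelectionSketch
{
  /**
   * chunkSizes holds the total byte size of each time chunk, ordered newest to
   * oldest. Chunks are taken whole until the size budget (with 10% slack, as in
   * SegmentCompactorUtil.isCompactibleSize) or the chunk limit is reached.
   */
  static List<Long> select(List<Long> chunkSizes, long inputSegmentSizeBytes, int maxChunks)
  {
    List<Long> selected = new ArrayList<>();
    long total = 0;
    for (long chunkBytes : chunkSizes) {
      boolean fitsSize = inputSegmentSizeBytes * 1.1 >= total + chunkBytes;
      boolean fitsCount = selected.size() < maxChunks;
      if (!fitsSize || !fitsCount) {
        break;  // a chunk is added whole or not at all; the real loop also logs and skips
      }
      selected.add(chunkBytes);
      total += chunkBytes;
    }
    return selected;
  }
}
```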
-        if (SegmentCompactorUtil.isCompactible(targetCompactionSize, totalSegmentsToCompactBytes, partitionBytes) &&
-            segmentsToCompact.size() + chunks.size() <= numTargetSegments) {
-          chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
-          totalSegmentsToCompactBytes += partitionBytes;
+    // Finds segments to compact together while iterating timeline from latest to oldest
+    while (compactibleTimelineObjectHolderCursor.hasNext()
+           && totalSegmentsToCompactBytes < inputSegmentSize
+           && segmentsToCompact.size() < maxNumSegmentsToCompact) {
+      final TimelineObjectHolder<String, DataSegment> timeChunkHolder = Preconditions.checkNotNull(
+          compactibleTimelineObjectHolderCursor.get(),
+          "timelineObjectHolder"
+      );
+      final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(timeChunkHolder.getObject().iterator());
+      final long timeChunkSizeBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
+
+      // The segments in a holder should be added all together or not.
+      if (SegmentCompactorUtil.isCompactibleSize(inputSegmentSize, totalSegmentsToCompactBytes, timeChunkSizeBytes)
+          && SegmentCompactorUtil.isCompactibleNum(maxNumSegmentsToCompact, segmentsToCompact.size(), chunks.size())
+          && (!keepSegmentGranularity || segmentsToCompact.size() == 0)) {
+        chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
+        totalSegmentsToCompactBytes += timeChunkSizeBytes;
+      } else {
+        if (segmentsToCompact.size() > 1) {
+          // We found some segments to compact and cannot add more. End here.
+          return new SegmentsToCompact(segmentsToCompact);
         }
         } else {
-          if (segmentsToCompact.size() > 1) {
-            // We found some segmens to compact and cannot add more. End here.
-            return checkCompactableSizeForLastSegmentOrReturn(
-                segmentsToCompact,
-                totalSegmentsToCompactBytes,
-                timeline,
-                searchInterval,
-                searchEnd,
-                config
+          // (*) Discard segments found so far because we can't compact them anyway.
+          final int numSegmentsToCompact = segmentsToCompact.size();
+          segmentsToCompact.clear();
+
+          if (!SegmentCompactorUtil.isCompactibleSize(inputSegmentSize, 0, timeChunkSizeBytes)) {
+            // TODO: this should be changed to compact many small segments into a few large segments
+            final DataSegment segment = chunks.get(0).getObject();
+            log.warn(
+                "shardSize[%d] for dataSource[%s] and interval[%s] is larger than inputSegmentSize[%d]."
+                + " Contitnue to the next shard.",
+                timeChunkSizeBytes,
+                segment.getDataSource(),
+                segment.getInterval(),
+                inputSegmentSize
+            );
+          } else if (maxNumSegmentsToCompact < chunks.size()) {
+            final DataSegment segment = chunks.get(0).getObject();
+            log.warn(
+                "The number of segments[%d] for dataSource[%s] and interval[%s] is larger than "
+                + "numTargetCompactSegments[%d]. If you see lots of shards are being skipped due to too many "
+                + "segments, consider increasing 'numTargetCompactionSegments' and "
+                + "'druid.indexer.runner.maxZnodeBytes'. Contitnue to the next shard.",
+                chunks.size(),
+                segment.getDataSource(),
+                segment.getInterval(),
+                maxNumSegmentsToCompact
             );
           } else {
-            // (*) Discard segments found so far because we can't compact it anyway.
-            final int numSegmentsToCompact = segmentsToCompact.size();
-            segmentsToCompact.clear();
-
-            if (!SegmentCompactorUtil.isCompactible(targetCompactionSize, 0, partitionBytes)) {
-              // TODO: this should be changed to compact many small segments into a few large segments
-              final DataSegment segment = chunks.get(0).getObject();
-              log.warn(
-                  "shardSize[%d] for dataSource[%s] and interval[%s] is larger than targetCompactionSize[%d]."
- + " Contitnue to the next shard.", - partitionBytes, - segment.getDataSource(), - segment.getInterval(), - targetCompactionSize - ); - } else if (numTargetSegments < chunks.size()) { - final DataSegment segment = chunks.get(0).getObject(); - log.warn( - "The number of segments[%d] for dataSource[%s] and interval[%s] is larger than " - + "numTargetCompactSegments[%d]. If you see lots of shards are being skipped due to too many " - + "segments, consider increasing 'numTargetCompactionSegments' and " - + "'druid.indexer.runner.maxZnodeBytes'. Contitnue to the next shard.", - chunks.size(), - segment.getDataSource(), - segment.getInterval(), - numTargetSegments - ); + if (numSegmentsToCompact == 1) { + // We found a segment which is smaller than targetCompactionSize but too large to compact with other + // segments. Skip this one. + // Note that segmentsToCompact is already cleared at (*). + // TODO: handle this + chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject())); + totalSegmentsToCompactBytes = timeChunkSizeBytes; } else { - if (numSegmentsToCompact == 1) { - // We found a segment which is smaller than targetCompactionSize but too large to compact with other - // segments. Skip this one. - // Note that segmentsToCompact is already cleared at (*). - chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject())); - totalSegmentsToCompactBytes = partitionBytes; - } else { - throw new ISE( - "Cannot compact segments[%s]. shardBytes[%s], numSegments[%s]", - chunks.stream().map(PartitionChunk::getObject).collect(Collectors.toList()), - partitionBytes, - chunks.size() - ); - } + throw new ISE( + "Cannot compact segments[%s]. shardBytes[%s], numSegments[%s]", + chunks.stream().map(PartitionChunk::getObject).collect(Collectors.toList()), + timeChunkSizeBytes, + chunks.size() + ); } } } - - // Update searchInterval - searchInterval = SegmentCompactorUtil.removeIntervalFromEnd( - searchInterval, - new Interval(chunks.get(0).getObject().getInterval().getStart(), searchInterval.getEnd()) - ); } - } - if (segmentsToCompact.size() == 0 || segmentsToCompact.size() == 1) { - if (Intervals.isEmpty(searchInterval)) { - // We found nothing to compact. End here. - return Pair.of(intervalToSearch, new SegmentsToCompact(ImmutableList.of())); - } else { - // We found only 1 segment. Further find segments for the remaining interval. - return findSegmentsToCompact(timeline, searchInterval, searchEnd, config); - } + compactibleTimelineObjectHolderCursor.next(); } - return checkCompactableSizeForLastSegmentOrReturn( - segmentsToCompact, - totalSegmentsToCompactBytes, - timeline, - searchInterval, - searchEnd, - config - ); - } - - /** - * Check the found segments are enough to compact. If it's expected that more data will be added in the future for the - * interval of found segments, the found segments are skipped and remained to be considered again in the next - * coordinator run. Otherwise, simply returns a pair of the given searchInterval and found segments. - */ - private static Pair checkCompactableSizeForLastSegmentOrReturn( - final List segmentsToCompact, - final long totalSegmentsToCompactBytes, - final VersionedIntervalTimeline timeline, - final Interval searchInterval, - final DateTime searchEnd, - final DataSourceCompactionConfig config - ) - { - if (segmentsToCompact.size() > 0) { - // Check we have enough segments to compact. 
For realtime dataSources, we can expect more data to be added in the - // future, so we skip compaction for segments in this run if their size is not sufficiently large. - final DataSegment lastSegment = segmentsToCompact.get(segmentsToCompact.size() - 1); - if (lastSegment.getInterval().getEnd().equals(searchEnd) && - !SegmentCompactorUtil.isProperCompactionSize( - config.getTargetCompactionSizeBytes(), - totalSegmentsToCompactBytes - ) && - config.getNumTargetCompactionSegments() > segmentsToCompact.size()) { - // Ignore found segments and find again for the remaininig searchInterval. - return findSegmentsToCompact(timeline, searchInterval, searchEnd, config); - } - } - - return Pair.of(searchInterval, new SegmentsToCompact(segmentsToCompact)); + return new SegmentsToCompact(segmentsToCompact); } /** @@ -458,9 +397,14 @@ private static class SegmentsToCompact this.segments = segments; } - public List getSegments() + List getSegments() { return segments; } + + boolean isEmpty() + { + return segments.isEmpty(); + } } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java index f25d1f471caa..c6760b7715ae 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java @@ -20,42 +20,30 @@ package org.apache.druid.server.coordinator.helper; import com.google.common.base.Preconditions; -import org.joda.time.Duration; import org.joda.time.Interval; -import org.joda.time.Period; /** * Util class used by {@link DruidCoordinatorSegmentCompactor} and {@link CompactionSegmentSearchPolicy}. */ class SegmentCompactorUtil { - private static final Period LOOKUP_PERIOD = new Period("P1D"); - private static final Duration LOOKUP_DURATION = LOOKUP_PERIOD.toStandardDuration(); // Allow compaction of segments if totalSize(segments) <= remainingBytes * ALLOWED_MARGIN_OF_COMPACTION_SIZE private static final double ALLOWED_MARGIN_OF_COMPACTION_SIZE = .1; - static boolean isCompactible(long remainingBytes, long currentTotalBytes, long additionalBytes) + static boolean isCompactibleSize(long targetBytes, long currentTotalBytes, long additionalBytes) { - return remainingBytes * (1 + ALLOWED_MARGIN_OF_COMPACTION_SIZE) >= currentTotalBytes + additionalBytes; + return targetBytes * (1 + ALLOWED_MARGIN_OF_COMPACTION_SIZE) >= currentTotalBytes + additionalBytes; } - static boolean isProperCompactionSize(long targetCompactionSizeBytes, long totalBytesOfSegmentsToCompact) + static boolean isCompactibleNum(int numTargetSegments, int numCurrentSegments, int numAdditionalSegments) { - return targetCompactionSizeBytes * (1 - ALLOWED_MARGIN_OF_COMPACTION_SIZE) <= totalBytesOfSegmentsToCompact && - targetCompactionSizeBytes * (1 + ALLOWED_MARGIN_OF_COMPACTION_SIZE) >= totalBytesOfSegmentsToCompact; + return numCurrentSegments + numAdditionalSegments <= numTargetSegments; } - /** - * Return an interval for looking up for timeline. - * If {@code totalInterval} is larger than {@link #LOOKUP_PERIOD}, it returns an interval of {@link #LOOKUP_PERIOD} - * and the end of {@code totalInterval}. 
- */ - static Interval getNextLoopupInterval(Interval totalInterval) + static boolean isProperCompactionSize(long targetCompactionSizeBytes, long totalBytesOfSegmentsToCompact) { - final Duration givenDuration = totalInterval.toDuration(); - return givenDuration.isLongerThan(LOOKUP_DURATION) ? - new Interval(LOOKUP_PERIOD, totalInterval.getEnd()) : - totalInterval; + return targetCompactionSizeBytes * (1 - ALLOWED_MARGIN_OF_COMPACTION_SIZE) <= totalBytesOfSegmentsToCompact && + targetCompactionSizeBytes * (1 + ALLOWED_MARGIN_OF_COMPACTION_SIZE) >= totalBytesOfSegmentsToCompact; } /** diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java index 664daf282d30..5e40c9476c7b 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java @@ -58,6 +58,7 @@ public void mergeSegments(List segments) public String compactSegments( List segments, boolean keepSegmentGranularity, + long targetCompactionSizeBytes, int compactionTaskPriority, @Nullable ClientCompactQueryTuningConfig tuningConfig, @Nullable Map context diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java index 57038a422268..42518ea135ed 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java @@ -65,6 +65,7 @@ public class DruidCoordinatorSegmentCompactorTest public String compactSegments( List segments, boolean keepSegmentGranularity, + long targetCompactionSizeBytes, int compactionTaskPriority, ClientCompactQueryTuningConfig tuningConfig, Map context @@ -129,28 +130,11 @@ public int getTotalWorkerCapacity() } }; - private List compactionConfigs; private Map> dataSources; @Before public void setup() { - compactionConfigs = new ArrayList<>(); - for (int i = 0; i < 3; i++) { - final String dataSource = DATA_SOURCE_PREFIX + i; - compactionConfigs.add( - new DataSourceCompactionConfig( - dataSource, - 0, - 50L, - null, - new Period("PT1H"), // smaller than segment interval - null, - null - ) - ); - } - dataSources = new HashMap<>(); for (int i = 0; i < 3; i++) { final String dataSource = DATA_SOURCE_PREFIX + i; @@ -213,8 +197,9 @@ private static DataSegment createSegment(String dataSource, int startDay, boolea } @Test - public void testRun() + public void testRunWithoutKeepSegmentGranularity() { + final boolean keepSegmentGranularity = false; final DruidCoordinatorSegmentCompactor compactor = new DruidCoordinatorSegmentCompactor(indexingServiceClient); final Supplier expectedVersionSupplier = new Supplier() @@ -233,7 +218,8 @@ public String get() // compact for 2017-01-08T12:00:00.000Z/2017-01-09T12:00:00.000Z assertCompactSegments( compactor, - Intervals.of(StringUtils.format("2017-01-%02dT12:00:00/2017-01-%02dT12:00:00", 8, 9)), + keepSegmentGranularity, + Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT12:00:00", 8, 9), expectedRemainingSegments, expectedCompactTaskCount, expectedVersionSupplier @@ -243,7 +229,8 @@ public String get() expectedRemainingSegments -= 40; assertCompactSegments( compactor, - 
Intervals.of(StringUtils.format("2017-01-%02dT12:00:00/2017-01-%02dT12:00:00", 4, 8)), + keepSegmentGranularity, + Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT12:00:00", 4, 8), expectedRemainingSegments, expectedCompactTaskCount, expectedVersionSupplier @@ -253,61 +240,112 @@ public String get() expectedRemainingSegments -= 40; assertCompactSegments( compactor, - Intervals.of(StringUtils.format("2017-01-%02dT12:00:00/2017-01-%02dT12:00:00", endDay - 1, endDay)), + keepSegmentGranularity, + Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT12:00:00", endDay - 1, endDay), expectedRemainingSegments, expectedCompactTaskCount, expectedVersionSupplier ); } - // Segments of the latest interval should not be compacted - for (int i = 0; i < 3; i++) { - final String dataSource = DATA_SOURCE_PREFIX + i; - final Interval interval = Intervals.of(StringUtils.format("2017-01-09T12:00:00/2017-01-10")); - List> holders = dataSources.get(dataSource).lookup(interval); - Assert.assertEquals(1, holders.size()); - for (TimelineObjectHolder holder : holders) { - List> chunks = Lists.newArrayList(holder.getObject()); - Assert.assertEquals(2, chunks.size()); - for (PartitionChunk chunk : chunks) { - DataSegment segment = chunk.getObject(); - Assert.assertEquals(interval, segment.getInterval()); - Assert.assertEquals("version", segment.getVersion()); - } + assertLastSegmentNotCompacted(compactor, keepSegmentGranularity); + } + + @Test + public void testRunWithKeepSegmentGranularity() + { + final boolean keepSegmentGranularity = true; + final DruidCoordinatorSegmentCompactor compactor = new DruidCoordinatorSegmentCompactor(indexingServiceClient); + + final Supplier expectedVersionSupplier = new Supplier() + { + private int i = 0; + + @Override + public String get() + { + return "newVersion_" + i++; } - } + }; + int expectedCompactTaskCount = 1; + int expectedRemainingSegments = 200; - // Emulating realtime dataSource - final String dataSource = DATA_SOURCE_PREFIX + 0; - addMoreData(dataSource, 9); + // compact for 2017-01-08T12:00:00.000Z/2017-01-09T12:00:00.000Z + assertCompactSegments( + compactor, + keepSegmentGranularity, + Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", 9, 9), + expectedRemainingSegments, + expectedCompactTaskCount, + expectedVersionSupplier + ); + expectedRemainingSegments -= 20; + assertCompactSegments( + compactor, + keepSegmentGranularity, + Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", 8, 9), + expectedRemainingSegments, + expectedCompactTaskCount, + expectedVersionSupplier + ); - CoordinatorStats stats = runCompactor(compactor); - Assert.assertEquals( - 1, - stats.getGlobalStat(DruidCoordinatorSegmentCompactor.COMPACT_TASK_COUNT) + // compact for 2017-01-07T12:00:00.000Z/2017-01-08T12:00:00.000Z + expectedRemainingSegments -= 20; + assertCompactSegments( + compactor, + keepSegmentGranularity, + Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", 8, 8), + expectedRemainingSegments, + expectedCompactTaskCount, + expectedVersionSupplier + ); + expectedRemainingSegments -= 20; + assertCompactSegments( + compactor, + keepSegmentGranularity, + Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", 4, 5), + expectedRemainingSegments, + expectedCompactTaskCount, + expectedVersionSupplier ); - addMoreData(dataSource, 10); + for (int endDay = 4; endDay > 1; endDay -= 1) { + expectedRemainingSegments -= 20; + assertCompactSegments( + compactor, + keepSegmentGranularity, + Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", endDay, endDay), + 
expectedRemainingSegments, + expectedCompactTaskCount, + expectedVersionSupplier + ); + expectedRemainingSegments -= 20; + assertCompactSegments( + compactor, + keepSegmentGranularity, + Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", endDay - 1, endDay), + expectedRemainingSegments, + expectedCompactTaskCount, + expectedVersionSupplier + ); + } - stats = runCompactor(compactor); - Assert.assertEquals( - 1, - stats.getGlobalStat(DruidCoordinatorSegmentCompactor.COMPACT_TASK_COUNT) - ); + assertLastSegmentNotCompacted(compactor, keepSegmentGranularity); } - private CoordinatorStats runCompactor(DruidCoordinatorSegmentCompactor compactor) + private CoordinatorStats runCompactor(DruidCoordinatorSegmentCompactor compactor, boolean keepSegmentGranularity) { DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams .newBuilder() .withDataSources(dataSources) - .withCompactionConfig(CoordinatorCompactionConfig.from(compactionConfigs)) + .withCompactionConfig(CoordinatorCompactionConfig.from(createCompactionConfigs(keepSegmentGranularity))) .build(); return compactor.run(params).getCoordinatorStats(); } private void assertCompactSegments( DruidCoordinatorSegmentCompactor compactor, + boolean keepSegmentGranularity, Interval expectedInterval, int expectedRemainingSegments, int expectedCompactTaskCount, @@ -315,7 +353,7 @@ private void assertCompactSegments( ) { for (int i = 0; i < 3; i++) { - final CoordinatorStats stats = runCompactor(compactor); + final CoordinatorStats stats = runCompactor(compactor, keepSegmentGranularity); Assert.assertEquals( expectedCompactTaskCount, stats.getGlobalStat(DruidCoordinatorSegmentCompactor.COMPACT_TASK_COUNT) @@ -356,6 +394,44 @@ private void assertCompactSegments( } } + private void assertLastSegmentNotCompacted(DruidCoordinatorSegmentCompactor compactor, boolean keepSegmentGranularity) + { + // Segments of the latest interval should not be compacted + for (int i = 0; i < 3; i++) { + final String dataSource = DATA_SOURCE_PREFIX + i; + final Interval interval = Intervals.of(StringUtils.format("2017-01-09T12:00:00/2017-01-10")); + List> holders = dataSources.get(dataSource).lookup(interval); + Assert.assertEquals(1, holders.size()); + for (TimelineObjectHolder holder : holders) { + List> chunks = Lists.newArrayList(holder.getObject()); + Assert.assertEquals(2, chunks.size()); + for (PartitionChunk chunk : chunks) { + DataSegment segment = chunk.getObject(); + Assert.assertEquals(interval, segment.getInterval()); + Assert.assertEquals("version", segment.getVersion()); + } + } + } + + // Emulating realtime dataSource + final String dataSource = DATA_SOURCE_PREFIX + 0; + addMoreData(dataSource, 9); + + CoordinatorStats stats = runCompactor(compactor, keepSegmentGranularity); + Assert.assertEquals( + 1, + stats.getGlobalStat(DruidCoordinatorSegmentCompactor.COMPACT_TASK_COUNT) + ); + + addMoreData(dataSource, 10); + + stats = runCompactor(compactor, keepSegmentGranularity); + Assert.assertEquals( + 1, + stats.getGlobalStat(DruidCoordinatorSegmentCompactor.COMPACT_TASK_COUNT) + ); + } + private void addMoreData(String dataSource, int day) { for (int i = 0; i < 2; i++) { @@ -373,4 +449,26 @@ private void addMoreData(String dataSource, int day) ); } } + + private static List createCompactionConfigs(boolean keepSegmentGranularity) + { + final List compactionConfigs = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + final String dataSource = DATA_SOURCE_PREFIX + i; + compactionConfigs.add( + new DataSourceCompactionConfig( + dataSource, + 
keepSegmentGranularity, + 0, + 50L, + 50L, + null, + new Period("PT1H"), // smaller than segment interval + null, + null + ) + ); + } + return compactionConfigs; + } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java index 6530dce778b1..c53823901ab6 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java @@ -34,6 +34,8 @@ import org.joda.time.Period; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.util.ArrayList; import java.util.Arrays; @@ -41,13 +43,26 @@ import java.util.List; import java.util.stream.Collectors; +@RunWith(Parameterized.class) public class NewestSegmentFirstPolicyTest { private static final String DATA_SOURCE = "dataSource"; private static final long DEFAULT_SEGMENT_SIZE = 1000; private static final int DEFAULT_NUM_SEGMENTS_PER_SHARD = 4; + @Parameterized.Parameters(name = "keepSegmentGranularity = {0}") + public static Iterable constructorFeeder() + { + return ImmutableList.of(new Object[]{false}, new Object[]{true}); + } + private final NewestSegmentFirstPolicy policy = new NewestSegmentFirstPolicy(); + private final boolean keepSegmentGranularity; + + public NewestSegmentFirstPolicyTest(boolean keepSegmentGranularity) + { + this.keepSegmentGranularity = keepSegmentGranularity; + } @Test public void testLargeOffsetAndSmallSegmentInterval() @@ -98,29 +113,54 @@ public void testSmallOffsetAndLargeSegmentInterval() final List segments = iterator.next(); Assert.assertNotNull(segments); - Assert.assertEquals(8, segments.size()); - final List expectedIntervals = new ArrayList<>(segments.size()); - for (int i = 0; i < 4; i++) { - expectedIntervals.add(Intervals.of("2017-11-16T06:00:00/2017-11-16T07:00:00")); - } - for (int i = 0; i < 4; i++) { - expectedIntervals.add(Intervals.of("2017-11-16T20:00:00/2017-11-16T21:00:00")); - } - expectedIntervals.sort(Comparators.intervalsByStartThenEnd()); + if (keepSegmentGranularity) { + // If keepSegmentGranularity = true, the iterator returns the segments of only the next time chunk. + Assert.assertEquals(4, segments.size()); - Assert.assertEquals( - expectedIntervals, - segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()) - ); + final List expectedIntervals = new ArrayList<>(segments.size()); + for (int i = 0; i < 4; i++) { + expectedIntervals.add(Intervals.of("2017-11-16T20:00:00/2017-11-16T21:00:00")); + } - assertCompactSegmentIntervals( - iterator, - segmentPeriod, - Intervals.of("2017-11-14T00:00:00/2017-11-14T01:00:00"), - Intervals.of("2017-11-16T05:00:00/2017-11-16T06:00:00"), - true - ); + Assert.assertEquals( + expectedIntervals, + segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()) + ); + + assertCompactSegmentIntervals( + iterator, + segmentPeriod, + Intervals.of("2017-11-16T06:00:00/2017-11-16T07:00:00"), + Intervals.of("2017-11-16T06:00:00/2017-11-16T07:00:00"), + false + ); + } else { + // If keepSegmentGranularity = false, the returned segments can span over multiple time chunks. 
+ Assert.assertEquals(8, segments.size()); + + final List expectedIntervals = new ArrayList<>(segments.size()); + for (int i = 0; i < 4; i++) { + expectedIntervals.add(Intervals.of("2017-11-16T06:00:00/2017-11-16T07:00:00")); + } + for (int i = 0; i < 4; i++) { + expectedIntervals.add(Intervals.of("2017-11-16T20:00:00/2017-11-16T21:00:00")); + } + expectedIntervals.sort(Comparators.intervalsByStartThenEnd()); + + Assert.assertEquals( + expectedIntervals, + segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()) + ); + + assertCompactSegmentIntervals( + iterator, + segmentPeriod, + Intervals.of("2017-11-14T00:00:00/2017-11-14T01:00:00"), + Intervals.of("2017-11-16T05:00:00/2017-11-16T06:00:00"), + true + ); + } } @Test @@ -176,9 +216,7 @@ public void testSmallNumTargetCompactionSegments() iterator, segmentPeriod, Intervals.of("2017-11-16T20:00:00/2017-11-16T21:00:00"), - // The last interval is not "2017-11-17T01:00:00/2017-11-17T02:00:00". This is because more segments are - // expected to be added for that interval. See NewestSegmentFirstIterator.returnIfCompactibleSize(). - Intervals.of("2017-11-17T00:00:00/2017-11-17T01:00:00"), + Intervals.of("2017-11-17T01:00:00/2017-11-17T02:00:00"), false ); @@ -447,7 +485,7 @@ private static VersionedIntervalTimeline createTimeline( return timeline; } - private static DataSourceCompactionConfig createCompactionConfig( + private DataSourceCompactionConfig createCompactionConfig( long targetCompactionSizeBytes, int numTargetCompactionSegments, Period skipOffsetFromLatest @@ -455,8 +493,10 @@ private static DataSourceCompactionConfig createCompactionConfig( { return new DataSourceCompactionConfig( DATA_SOURCE, + keepSegmentGranularity, 0, targetCompactionSizeBytes, + targetCompactionSizeBytes, numTargetCompactionSegments, skipOffsetFromLatest, null, From b8ffa5c1c6ba65a209b1f135ee253ed045e2de79 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sun, 30 Sep 2018 10:23:01 -0700 Subject: [PATCH 2/6] skip unknown dataSource --- .../helper/NewestSegmentFirstIterator.java | 9 ++++-- .../helper/NewestSegmentFirstPolicyTest.java | 30 +++++++++++++++++++ 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java index 96d30feebecb..013bd38922c2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java @@ -154,12 +154,15 @@ public List next() */ private void updateQueue(String dataSourceName, DataSourceCompactionConfig config) { - final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor = Preconditions.checkNotNull( - timelineIterators.get(dataSourceName), - "Cannot find timeline for dataSource[%s]", + final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor = timelineIterators.get( dataSourceName ); + if (compactibleTimelineObjectHolderCursor == null) { + log.warn("Cannot find timeline for dataSource[%s]. 
Skip this dataSource", dataSourceName);
+      return;
+    }
+
     final SegmentsToCompact segmentsToCompact = findSegmentsToCompact(
         compactibleTimelineObjectHolderCursor,
         config
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
index c53823901ab6..2952d20a141a 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
@@ -396,6 +396,36 @@ public void testManySegmentsPerShard2()
     Assert.assertFalse(iterator.hasNext());
   }
 
+  @Test
+  public void testSkipUnknownDataSource()
+  {
+    final String unknownDataSource = "unknown";
+    final Period segmentPeriod = new Period("PT1H");
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(
+            unknownDataSource,
+            createCompactionConfig(10000, 100, new Period("P2D")),
+            DATA_SOURCE,
+            createCompactionConfig(10000, 100, new Period("P2D"))
+        ),
+        ImmutableMap.of(
+            DATA_SOURCE,
+            createTimeline(
+                new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
+                new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod)
+            )
+        )
+    );
+
+    assertCompactSegmentIntervals(
+        iterator,
+        segmentPeriod,
+        Intervals.of("2017-11-14T00:00:00/2017-11-14T01:00:00"),
+        Intervals.of("2017-11-15T03:00:00/2017-11-15T04:00:00"),
+        true
+    );
+  }
+
   private static void assertCompactSegmentIntervals(
       CompactionSegmentIterator iterator,
       Period segmentPeriod,

From 8f49be4aa954dc23fb1a9eae34765890fdddddf0 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Sun, 30 Sep 2018 12:16:53 -0700
Subject: [PATCH 3/6] ignore single segment to compact

---
 .../helper/NewestSegmentFirstIterator.java    | 12 ++++++---
 .../helper/NewestSegmentFirstPolicyTest.java  | 27 +++++++++++++++++++
 2 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
index 013bd38922c2..498c81963037 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
@@ -168,7 +168,7 @@ private void updateQueue(String dataSourceName, DataSourceCompactionConfig confi
         config
     );
 
-    if (!segmentsToCompact.isEmpty()) {
+    if (segmentsToCompact.size() > 1) {
       queue.add(new QueueEntry(segmentsToCompact.segments));
     }
   }
@@ -323,7 +323,11 @@ private static SegmentsToCompact findSegmentsToCompact(
       compactibleTimelineObjectHolderCursor.next();
     }
 
-    return new SegmentsToCompact(segmentsToCompact);
+    if (segmentsToCompact.size() > 1) {
+      return new SegmentsToCompact(segmentsToCompact);
+    } else {
+      return new SegmentsToCompact(Collections.emptyList());
+    }
   }
 
   /**
@@ -405,9 +409,9 @@ List<DataSegment> getSegments()
       return segments;
     }
 
-    boolean isEmpty()
+    int size()
     {
-      return segments.isEmpty();
+      return segments.size();
     }
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
index 2952d20a141a..1185820ccc3e 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
@@ -426,6 +426,33 @@ public void testSkipUnknownDataSource()
     );
   }
 
+  @Test
+  public void testIgnoreSingleSegmentToCompact()
+  {
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, 100, new Period("P1D"))),
+        ImmutableMap.of(
+            DATA_SOURCE,
+            createTimeline(
+                new SegmentGenerateSpec(
+                    Intervals.of("2017-12-02T00:00:00/2017-12-03T00:00:00"),
+                    new Period("P1D"),
+                    200,
+                    1
+                ),
+                new SegmentGenerateSpec(
+                    Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"),
+                    new Period("P1D"),
+                    200,
+                    1
+                )
+            )
+        )
+    );
+
+    Assert.assertFalse(iterator.hasNext());
+  }
+
   private static void assertCompactSegmentIntervals(
       CompactionSegmentIterator iterator,
       Period segmentPeriod,

From ac22ddadfaa0c5c8098b0d24eb37212548085d93 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Thu, 4 Oct 2018 01:25:42 -0700
Subject: [PATCH 4/6] add doc

---
 docs/content/configuration/index.md           |  6 ++++--
 docs/content/ingestion/compaction.md          | 11 +++++++----
 .../helper/NewestSegmentFirstIterator.java    |  3 ---
 3 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md
index 1d3a8e1c44c0..5d1604715fbb 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -782,9 +782,11 @@ A description of the compaction config is:
 |Property|Description|Required|
 |--------|-----------|--------|
 |`dataSource`|dataSource name to be compacted.|yes|
+|`keepSegmentGranularity`|Set [keepSegmentGranularity](../ingestion/compaction.html) to true for compactionTask.|no (default = true)|
 |`taskPriority`|[Priority](../ingestion/tasks.html#task-priorities) of compact task.|no (default = 25)|
-|`targetCompactionSizeBytes`|The target segment size of compaction. The actual size of a compact segment might be slightly larger or smaller than this value.|no (default = 838860800)|
-|`numTargetCompactionSegments`|Max number of segments to compact together.|no (default = 150)|
+|`inputSegmentSizeBytes`|Total input segment size of a compactionTask. The actual input size can be slightly larger than this value.|no (default = 419430400)|
+|`targetCompactionSizeBytes`|The target segment size of compaction. The actual size of a compact segment might be slightly larger or smaller than this value.|no (default = 419430400)|
+|`maxNumSegmentsToCompact`|Max number of segments to compact together.|no (default = 150)|
 |`skipOffsetFromLatest`|The offset for searching segments to be compacted. Strongly recommended to set for realtime dataSources. |no (default = "P1D")|
 |`tuningConfig`|Tuning config for compact tasks. See below [Compact Task TuningConfig](#compact-task-tuningconfig).|no|
 |`taskContext`|[Task context](../ingestion/tasks.html#task-context) for compact tasks.|no|
diff --git a/docs/content/ingestion/compaction.md b/docs/content/ingestion/compaction.md
index 73e5a947c121..640fb56bba7e 100644
--- a/docs/content/ingestion/compaction.md
+++ b/docs/content/ingestion/compaction.md
@@ -13,6 +13,8 @@ Compaction tasks merge all segments of the given interval. The syntax is:
     "dataSource": <dataSource>,
     "interval": <interval to specify segments to be merged>,
     "dimensions" <custom dimensionsSpec>,
+    "keepSegmentGranularity": <true or false>,
+    "targetCompactionSizeBytes": <target size of compacted segments>
     "tuningConfig" <index task tuningConfig>,
     "context": <task context>
 }
@@ -22,9 +24,11 @@ Compaction tasks merge all segments of the given interval. The syntax is:
The syntax is: |-----|-----------|--------| |`type`|Task type. Should be `compact`|Yes| |`id`|Task id|No| -|`dataSource`|dataSource name to be compacted|Yes| -|`interval`|interval of segments to be compacted|Yes| -|`dimensions`|custom dimensionsSpec. compaction task will use this dimensionsSpec if exist instead of generating one. See below for more details.|No| +|`dataSource`|DataSource name to be compacted|Yes| +|`interval`|Interval of segments to be compacted|Yes| +|`dimensions`|Custom dimensionsSpec. The compaction task will use this dimensionsSpec if given, instead of generating one. See below for more details.|No| +|`keepSegmentGranularity`|If set to true, compactionTask will keep the time chunk boundaries and merge segments only if they fall into the same time chunk.|No (default = true)| +|`targetCompactionSizeBytes`|Target segment size after compaction. Cannot be used with `targetPartitionSize`, `maxTotalRows`, and `numShards` in tuningConfig.|No| |`tuningConfig`|[Index task tuningConfig](../ingestion/native_tasks.html#tuningconfig)|No| |`context`|[Task context](../ingestion/locking-and-priority.html#task-context)|No| @@ -62,4 +66,3 @@ your own ordering and types, you can specify a custom `dimensionsSpec` in the co - Roll-up: the output segment is rolled up only when `rollup` is set for all input segments. See [Roll-up](../ingestion/index.html#rollup) for more details. You can check that your segments are rolled up or not by using [Segment Metadata Queries](../querying/segmentmetadataquery.html#analysistypes). -- Partitioning: The compaction task is a special form of native batch indexing task, so it always uses hash-based partitioning on the full set of dimensions. \ No newline at end of file diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java index 498c81963037..7c9ea3b8b949 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java @@ -186,7 +186,6 @@ private static class CompactibleTimelineObjectHolderCursor Interval totalIntervalToSearch ) { - // TODO: lazy?? this.holders = timeline .lookup(totalIntervalToSearch) .stream() @@ -278,7 +277,6 @@ private static SegmentsToCompact findSegmentsToCompact( segmentsToCompact.clear(); if (!SegmentCompactorUtil.isCompactibleSize(inputSegmentSize, 0, timeChunkSizeBytes)) { - // TODO: this should be changed to compact many small segments into a few large segments final DataSegment segment = chunks.get(0).getObject(); log.warn( "shardSize[%d] for dataSource[%s] and interval[%s] is larger than inputSegmentSize[%d]." @@ -305,7 +303,6 @@ private static SegmentsToCompact findSegmentsToCompact( // We found a segment which is smaller than targetCompactionSize but too large to compact with other // segments. Skip this one. // Note that segmentsToCompact is already cleared at (*).
- // TODO: handle this chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject())); totalSegmentsToCompactBytes = timeChunkSizeBytes; } else { From 0a76a284aa1b7989525a6d28dea1a9050bff50a7 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sun, 7 Oct 2018 00:31:08 -0700 Subject: [PATCH 5/6] address comments --- .../helper/NewestSegmentFirstIterator.java | 19 +++++++------------ .../helper/SegmentCompactorUtil.java | 6 ------ 2 files changed, 7 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java index 7c9ea3b8b949..f65647ac3d9d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java @@ -168,7 +168,7 @@ private void updateQueue(String dataSourceName, DataSourceCompactionConfig confi config ); - if (segmentsToCompact.size() > 1) { + if (segmentsToCompact.getSize() > 1) { queue.add(new QueueEntry(segmentsToCompact.segments)); } } @@ -280,7 +280,7 @@ private static SegmentsToCompact findSegmentsToCompact( final DataSegment segment = chunks.get(0).getObject(); log.warn( "shardSize[%d] for dataSource[%s] and interval[%s] is larger than inputSegmentSize[%d]." - + " Contitnue to the next shard.", + + " Continue to the next shard.", timeChunkSizeBytes, segment.getDataSource(), segment.getInterval(), @@ -292,7 +292,7 @@ private static SegmentsToCompact findSegmentsToCompact( "The number of segments[%d] for dataSource[%s] and interval[%s] is larger than " + "numTargetCompactSegments[%d]. If you see lots of shards are being skipped due to too many " + "segments, consider increasing 'numTargetCompactionSegments' and " - + "'druid.indexer.runner.maxZnodeBytes'. Contitnue to the next shard.", + + "'druid.indexer.runner.maxZnodeBytes'. 
Continue to the next shard.", chunks.size(), segment.getDataSource(), segment.getInterval(), @@ -375,7 +375,7 @@ private static class QueueEntry private final Interval interval; // whole interval for all segments private final List segments; - QueueEntry(List segments) + private QueueEntry(List segments) { Preconditions.checkArgument(segments != null && !segments.isEmpty()); Collections.sort(segments); @@ -386,7 +386,7 @@ private static class QueueEntry this.segments = segments; } - String getDataSource() + private String getDataSource() { return segments.get(0).getDataSource(); } @@ -396,17 +396,12 @@ private static class SegmentsToCompact { private final List segments; - SegmentsToCompact(List segments) + private SegmentsToCompact(List segments) { this.segments = segments; } - List getSegments() - { - return segments; - } - - int size() + private int getSize() { return segments.size(); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java index c6760b7715ae..0533affda8b6 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java @@ -40,12 +40,6 @@ static boolean isCompactibleNum(int numTargetSegments, int numCurrentSegments, i return numCurrentSegments + numAdditionalSegments <= numTargetSegments; } - static boolean isProperCompactionSize(long targetCompactionSizeBytes, long totalBytesOfSegmentsToCompact) - { - return targetCompactionSizeBytes * (1 - ALLOWED_MARGIN_OF_COMPACTION_SIZE) <= totalBytesOfSegmentsToCompact && - targetCompactionSizeBytes * (1 + ALLOWED_MARGIN_OF_COMPACTION_SIZE) >= totalBytesOfSegmentsToCompact; - } - /** * Removes {@code smallInterval} from {@code largeInterval}. The end of both intervals should be same. * From 918ded3fbb41a5bf1426cf41a1a0ad881bc625cc Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sun, 7 Oct 2018 00:34:36 -0700 Subject: [PATCH 6/6] address comment --- docs/content/configuration/index.md | 2 +- .../apache/druid/indexing/common/task/CompactionTask.java | 2 +- .../server/coordinator/helper/SegmentCompactorUtil.java | 5 +---- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 5d1604715fbb..5a75b16036c8 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -784,7 +784,7 @@ A description of the compaction config is: |`dataSource`|dataSource name to be compacted.|yes| |`keepSegmentGranularity`|Set [keepSegmentGranularity](../ingestion/compaction.html) to true for compactionTask.|no (default = true)| |`taskPriority`|[Priority](../ingestion/tasks.html#task-priorities) of compact task.|no (default = 25)| -|`inputSegmentSizeBytes`|Total input segment size of a compactionTask. The actual input size can be slightly larger than this value.|no (default = 419430400)| +|`inputSegmentSizeBytes`|Total input segment size of a compactionTask.|no (default = 419430400)| |`targetCompactionSizeBytes`|The target segment size of compaction. The actual size of a compact segment might be slightly larger or smaller than this value.|no (default = 419430400)| |`maxNumSegmentsToCompact`|Max number of segments to compact together.|no (default = 150)| |`skipOffsetFromLatest`|The offset for searching segments to be compacted. 
Strongly recommended to set for realtime dataSources. |no (default = "P1D")| diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index e577161904bd..27df0e720da4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -161,7 +161,7 @@ public CompactionTask( this.tuningConfig = tuningConfig; this.jsonMapper = jsonMapper; this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments); - this.partitionConfigurationManager = new PartitionConfigurationManager(targetCompactionSizeBytes, tuningConfig); + this.partitionConfigurationManager = new PartitionConfigurationManager(this.targetCompactionSizeBytes, tuningConfig); this.authorizerMapper = authorizerMapper; this.chatHandlerProvider = chatHandlerProvider; this.rowIngestionMetersFactory = rowIngestionMetersFactory; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java index 0533affda8b6..5dafe3fec18e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java @@ -27,12 +27,9 @@ */ class SegmentCompactorUtil { - // Allow compaction of segments if totalSize(segments) <= remainingBytes * ALLOWED_MARGIN_OF_COMPACTION_SIZE - private static final double ALLOWED_MARGIN_OF_COMPACTION_SIZE = .1; - static boolean isCompactibleSize(long targetBytes, long currentTotalBytes, long additionalBytes) { - return targetBytes * (1 + ALLOWED_MARGIN_OF_COMPACTION_SIZE) >= currentTotalBytes + additionalBytes; + return currentTotalBytes + additionalBytes <= targetBytes; } static boolean isCompactibleNum(int numTargetSegments, int numCurrentSegments, int numAdditionalSegments)
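For reference, a coordinator compaction config using the properties documented in this patch might look like the following sketch. The `wikipedia` dataSource name is illustrative; every property other than `dataSource` is optional, and the values shown are the defaults from the table above:

```json
{
  "dataSource": "wikipedia",
  "keepSegmentGranularity": true,
  "taskPriority": 25,
  "inputSegmentSizeBytes": 419430400,
  "targetCompactionSizeBytes": 419430400,
  "maxNumSegmentsToCompact": 150,
  "skipOffsetFromLatest": "P1D"
}
```

Since `skipOffsetFromLatest` defaults to "P1D", the coordinator would leave the most recent day of segments alone, which matters for realtime dataSources whose newest segments are still being appended.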