From 53cc3b94a95ed51b9022568a237b287ffe8edeb2 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 20 Feb 2021 12:43:05 -0800 Subject: [PATCH 1/3] Allow overlapping intervals for the compaction task --- .../granularity/IntervalsByGranularity.java | 25 +------ .../common/IntervalsByGranularityTest.java | 60 ++++++++------- .../indexing/common/task/CompactionTask.java | 6 +- .../common/task/CompactionTaskRunTest.java | 75 +++++++++++++++++++ .../granularity/BaseGranularitySpec.java | 9 +-- 5 files changed, 118 insertions(+), 57 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java b/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java index 7065535eeb4f..ff076d46752e 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java +++ b/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java @@ -23,16 +23,12 @@ import org.apache.druid.common.guava.SettableSupplier; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.JodaUtils; -import org.apache.druid.java.util.common.guava.Comparators; import org.joda.time.Interval; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Set; /** * Produce a stream of intervals generated by a given set of intervals as input and a given @@ -51,19 +47,7 @@ public class IntervalsByGranularity */ public IntervalsByGranularity(Collection intervals, Granularity granularity) { - // eliminate dups, sort intervals: - Set intervalSet = new HashSet<>(intervals); - List inputIntervals = new ArrayList<>(intervals.size()); - inputIntervals.addAll(intervalSet); - inputIntervals.sort(Comparators.intervalsByStartThenEnd()); - - // sanity check - if (JodaUtils.containOverlappingIntervals(inputIntervals)) { - throw new IAE("Intervals contain overlapping intervals [%s]", intervals); - } - - // all good: - sortedNonOverlappingIntervals = inputIntervals; + this.sortedNonOverlappingIntervals = JodaUtils.condenseIntervals(intervals); this.granularity = granularity; } @@ -73,9 +57,8 @@ public IntervalsByGranularity(Collection intervals, Granularity granul */ public Iterator granularityIntervalsIterator() { - Iterator ite; if (sortedNonOverlappingIntervals.isEmpty()) { - ite = Collections.emptyIterator(); + return Collections.emptyIterator(); } else { // The filter after transform & concat is to remove duplicats. // This can happen when condense left intervals that did not overlap but @@ -85,7 +68,7 @@ public Iterator granularityIntervalsIterator() // intervals will be returned, both with the same value 2013-01-01T00:00:00.000Z/2013-02-01T00:00:00.000Z. // Thus dups can be created given the right conditions.... final SettableSupplier previous = new SettableSupplier<>(); - ite = FluentIterable.from(sortedNonOverlappingIntervals).transformAndConcat(granularity::getIterable) + return FluentIterable.from(sortedNonOverlappingIntervals).transformAndConcat(granularity::getIterable) .filter(interval -> { if (previous.get() != null && previous.get().equals(interval)) { return false; @@ -94,7 +77,5 @@ public Iterator granularityIntervalsIterator() return true; }).iterator(); } - return ite; } - } diff --git a/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java b/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java index a38e6d5dbe8a..ee01aa028eca 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java @@ -21,11 +21,13 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.IntervalsByGranularity; import org.joda.time.Interval; import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.util.Collections; import java.util.Iterator; @@ -34,8 +36,8 @@ public class IntervalsByGranularityTest { - private static final long SECONDS_IN_YEAR = 31536000; - + @Rule + public ExpectedException expectedException = ExpectedException.none(); @Test public void testTrivialIntervalExplosion() @@ -46,17 +48,17 @@ public void testTrivialIntervalExplosion() IntervalsByGranularity intervals = new IntervalsByGranularity( ImmutableList.of(first, second, third), - Granularity.fromString("DAY") + Granularities.DAY ); // get count: Iterator granularityIntervals = intervals.granularityIntervalsIterator(); - long count = getCount(granularityIntervals); - Assert.assertTrue(count == 62 + 365); + long count = verifyIteratorAndReturnIntervalCount(granularityIntervals); + Assert.assertEquals(62 + 365, count); granularityIntervals = intervals.granularityIntervalsIterator(); count = getCountWithNoHasNext(granularityIntervals); - Assert.assertTrue(count == 62 + 365); + Assert.assertEquals(62 + 365, count); } @@ -69,13 +71,13 @@ public void testDups() IntervalsByGranularity intervals = new IntervalsByGranularity( ImmutableList.of(first, second, third), - Granularity.fromString("DAY") + Granularities.DAY ); // get count: Iterator granularityIntervals = intervals.granularityIntervalsIterator(); - long count = getCount(granularityIntervals); - Assert.assertTrue(count == 61); + long count = verifyIteratorAndReturnIntervalCount(granularityIntervals); + Assert.assertEquals(61, count); } @@ -88,7 +90,7 @@ public void testCondenseForManyIntervals() Interval first = Intervals.of("2012-01-01T00Z/P1Y"); IntervalsByGranularity intervals = new IntervalsByGranularity( ImmutableList.of(first), - Granularity.fromString("SECOND") + Granularities.SECOND ); Assert.assertEquals( ImmutableList.of(Intervals.of("2012-01-01T00Z/2013-01-01T00Z")), @@ -96,22 +98,29 @@ public void testCondenseForManyIntervals() ); } + /** + * This test iterates huge intervals (2.5 years) with the SECOND granularity. + * The motivation behind this test is ensuring that IntervalsByGranularity can handle + * these huge intervals with a tiny granularity. However, this test takes a long time + * to populate all intervals based on the SECOND granularity (more than 1 min), so + * is ignored by default. We should make this test not a unit test, but a load test. + */ + @Ignore @Test - public void testIntervalExplosion() + public void testIterateHugeIntervalsWithTinyGranularity() { Interval first = Intervals.of("2012-01-01T00Z/2012-12-31T00Z"); Interval second = Intervals.of("2002-01-01T00Z/2002-12-31T00Z"); Interval third = Intervals.of("2021-01-01T00Z/2021-06-30T00Z"); IntervalsByGranularity intervals = new IntervalsByGranularity( ImmutableList.of(first, second, third), - Granularity.fromString("SECOND") + Granularities.SECOND ); // get count: Iterator granularityIntervals = intervals.granularityIntervalsIterator(); - long count = getCount(granularityIntervals); - Assert.assertTrue(count == 78537600); - + long count = verifyIteratorAndReturnIntervalCount(granularityIntervals); + Assert.assertEquals(78537600, count); } @Test @@ -132,7 +141,6 @@ public void testSimpleEliminateRepeated() ImmutableList.of(Intervals.of("2012-01-01T00Z/2012-02-01T00Z")), ImmutableList.copyOf(intervals.granularityIntervalsIterator()) ); - } @Test @@ -160,10 +168,9 @@ public void testALittleMoreComplexEliminateRepeated() ), ImmutableList.copyOf(intervals.granularityIntervalsIterator()) ); - } - @Test(expected = IAE.class) + @Test public void testOverlappingShouldThrow() { List inputIntervals = ImmutableList.of( @@ -174,10 +181,13 @@ public void testOverlappingShouldThrow() IntervalsByGranularity intervals = new IntervalsByGranularity( inputIntervals, - Granularity.fromString("DAY") + Granularities.DAY ); - } + Iterator granularityIntervals = intervals.granularityIntervalsIterator(); + long count = verifyIteratorAndReturnIntervalCount(granularityIntervals); + Assert.assertEquals(14, count); + } @Test public void testWithGranularity() @@ -190,13 +200,13 @@ public void testWithGranularity() IntervalsByGranularity intervals = new IntervalsByGranularity( inputIntervals, - Granularity.fromString("MONTH") + Granularities.MONTH ); // get count: Iterator granularityIntervals = intervals.granularityIntervalsIterator(); - long count = getCount(granularityIntervals); - Assert.assertTrue(count == 2); + long count = verifyIteratorAndReturnIntervalCount(granularityIntervals); + Assert.assertEquals(2, count); } @Test(expected = UnsupportedOperationException.class) @@ -223,7 +233,7 @@ public void testEmptyInput() Assert.assertFalse(intervals.granularityIntervalsIterator().hasNext()); } - private long getCount(Iterator granularityIntervalIterator) + private long verifyIteratorAndReturnIntervalCount(Iterator granularityIntervalIterator) { long count = 0; Interval previous = null; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 276b1761281a..c5737362ba96 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -139,8 +141,6 @@ public class CompactionTask extends AbstractBatchIndexTask @Nullable private final AggregatorFactory[] metricsSpec; @Nullable - private final Granularity segmentGranularity; - @Nullable private final GranularitySpec granularitySpec; @Nullable private final ParallelIndexTuningConfig tuningConfig; @@ -204,7 +204,6 @@ public CompactionTask( this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec; this.metricsSpec = metricsSpec; - this.segmentGranularity = segmentGranularity; if (granularitySpec == null && segmentGranularity != null) { this.granularitySpec = new UniformGranularitySpec( segmentGranularity, @@ -296,6 +295,7 @@ public AggregatorFactory[] getMetricsSpec() return metricsSpec; } + @JsonInclude(Include.NON_NULL) @JsonProperty @Nullable @Override diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 0e287721c440..26170ddcf784 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -55,6 +55,7 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; @@ -106,6 +107,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -620,6 +622,79 @@ public void testCompactThenAppend() throws Exception Assert.assertEquals(expectedSegments, usedSegments); } + @Test + public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompact() throws Exception + { + // This test fails with segment lock because of the bug reported in https://github.com/apache/druid/issues/10911. + if (lockGranularity == LockGranularity.SEGMENT) { + return; + } + + runIndexTask(); + + final Set expectedSegments = new HashSet<>( + getStorageCoordinator().retrieveUsedSegmentsForIntervals( + DATA_SOURCE, + Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")), + Segments.ONLY_VISIBLE + ) + ); + + final Builder builder = new Builder( + DATA_SOURCE, + segmentLoaderFactory, + RETRY_POLICY_FACTORY + ); + + final Interval partialInterval = Intervals.of("2014-01-01T01:00:00/2014-01-01T02:00:00"); + final CompactionTask partialCompactionTask = builder + .interval(partialInterval) + .segmentGranularity(Granularities.MINUTE) + .build(); + + final Pair> partialCompactionResult = runTask(partialCompactionTask); + Assert.assertTrue(partialCompactionResult.lhs.isSuccess()); + // All segments in the previous expectedSegments should still appear as they have larger segment granularity. + expectedSegments.addAll(partialCompactionResult.rhs); + + final Set segmentsAfterPartialCompaction = new HashSet<>( + getStorageCoordinator().retrieveUsedSegmentsForIntervals( + DATA_SOURCE, + Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")), + Segments.ONLY_VISIBLE + ) + ); + + Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction); + + final CompactionTask fullCompactionTask = builder + .interval(Intervals.of("2014-01-01/2014-01-02")) + .segmentGranularity(null) + .build(); + + final Pair> fullCompactionResult = runTask(fullCompactionTask); + Assert.assertTrue(fullCompactionResult.lhs.isSuccess()); + + final List segmentsAfterFullCompaction = new ArrayList<>( + getStorageCoordinator().retrieveUsedSegmentsForIntervals( + DATA_SOURCE, + Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")), + Segments.ONLY_VISIBLE + ) + ); + segmentsAfterFullCompaction.sort(( + s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval()) + ); + + Assert.assertEquals(3, segmentsAfterFullCompaction.size()); + for (int i = 0; i < segmentsAfterFullCompaction.size(); i++) { + Assert.assertEquals( + Intervals.of(StringUtils.format("2014-01-01T%02d/2014-01-01T%02d", i, i + 1)), + segmentsAfterFullCompaction.get(i).getInterval() + ); + } + } + @Test public void testRunIndexAndCompactForSameSegmentAtTheSameTime() throws Exception { diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java index 779952b57793..ac9fdcae7e16 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.granularity.Granularities; @@ -44,16 +43,12 @@ public abstract class BaseGranularitySpec implements GranularitySpec public static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.DAY; public static final Granularity DEFAULT_QUERY_GRANULARITY = Granularities.NONE; - protected List inputIntervals; + protected final List inputIntervals; protected final Boolean rollup; public BaseGranularitySpec(List inputIntervals, Boolean rollup) { - if (inputIntervals != null) { - this.inputIntervals = ImmutableList.copyOf(inputIntervals); - } else { - this.inputIntervals = Collections.emptyList(); - } + this.inputIntervals = inputIntervals == null ? Collections.emptyList() : inputIntervals; this.rollup = rollup == null ? DEFAULT_ROLLUP : rollup; } From 487b84329c31ac9ea6e32f4bea34dbab9024392e Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 22 Feb 2021 09:37:49 -0800 Subject: [PATCH 2/3] unused import --- .../apache/druid/indexing/common/task/CompactionTaskRunTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 26170ddcf784..bf3a37bd0115 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -107,7 +107,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; From 1671cdb233b650b7ba64fbcb4c615ec9b3ace9eb Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 22 Feb 2021 10:22:07 -0800 Subject: [PATCH 3/3] line indentation --- .../druid/indexing/common/task/CompactionTaskRunTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index bf3a37bd0115..73a5bae2f6f6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -681,8 +681,8 @@ public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullInterva Segments.ONLY_VISIBLE ) ); - segmentsAfterFullCompaction.sort(( - s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval()) + segmentsAfterFullCompaction.sort( + (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval()) ); Assert.assertEquals(3, segmentsAfterFullCompaction.size());