File: IntervalsByGranularity.java

@@ -23,16 +23,12 @@
 import org.apache.druid.common.guava.SettableSupplier;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.JodaUtils;
-import org.apache.druid.java.util.common.guava.Comparators;
 import org.joda.time.Interval;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
 
 /**
  * Produce a stream of intervals generated by a given set of intervals as input and a given
@@ -51,19 +47,7 @@ public class IntervalsByGranularity
    */
   public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
   {
-    // eliminate dups, sort intervals:
-    Set<Interval> intervalSet = new HashSet<>(intervals);
-    List<Interval> inputIntervals = new ArrayList<>(intervals.size());
-    inputIntervals.addAll(intervalSet);
-    inputIntervals.sort(Comparators.intervalsByStartThenEnd());
-
-    // sanity check
-    if (JodaUtils.containOverlappingIntervals(inputIntervals)) {
-      throw new IAE("Intervals contain overlapping intervals [%s]", intervals);
-    }
-
-    // all good:
-    sortedNonOverlappingIntervals = inputIntervals;
+    this.sortedNonOverlappingIntervals = JodaUtils.condenseIntervals(intervals);
     this.granularity = granularity;
   }
 
@@ -73,9 +57,8 @@ public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
    */
   public Iterator<Interval> granularityIntervalsIterator()
   {
-    Iterator<Interval> ite;
     if (sortedNonOverlappingIntervals.isEmpty()) {
-      ite = Collections.emptyIterator();
+      return Collections.emptyIterator();
     } else {
       // The filter after transform & concat is to remove duplicates.
      // This can happen when condensing leaves intervals that did not overlap but
@@ -85,7 +68,7 @@ public Iterator<Interval> granularityIntervalsIterator()
      // intervals will be returned, both with the same value 2013-01-01T00:00:00.000Z/2013-02-01T00:00:00.000Z.
      // Thus, duplicates can be created given the right conditions.
       final SettableSupplier<Interval> previous = new SettableSupplier<>();
-      ite = FluentIterable.from(sortedNonOverlappingIntervals).transformAndConcat(granularity::getIterable)
+      return FluentIterable.from(sortedNonOverlappingIntervals).transformAndConcat(granularity::getIterable)
           .filter(interval -> {
             if (previous.get() != null && previous.get().equals(interval)) {
               return false;
@@ -94,7 +77,5 @@ public Iterator<Interval> granularityIntervalsIterator()
             return true;
           }).iterator();
     }
-    return ite;
   }
-
 }
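
The filter above guards against a subtle effect: condensing removes overlaps, but two non-overlapping inputs can still map to the same coarser-granularity bucket, exactly as the inline comment's 2013-01-01/2013-02-01 example describes. Below is a minimal sketch of that effect in plain Joda-Time; the monthBucket helper is a hypothetical stand-in for Druid's MONTH granularity, not Druid code.

import org.joda.time.DateTime;
import org.joda.time.Interval;

public class DuplicateBucketDemo
{
  // Hypothetical stand-in for what a MONTH granularity does to an instant:
  // map it to the calendar-month interval that contains it.
  private static Interval monthBucket(DateTime t)
  {
    DateTime start = t.withDayOfMonth(1).withTimeAtStartOfDay();
    return new Interval(start, start.plusMonths(1));
  }

  public static void main(String[] args)
  {
    // Two condensed (non-overlapping) input intervals...
    Interval first = Interval.parse("2013-01-01T00:00:00Z/2013-01-10T00:00:00Z");
    Interval second = Interval.parse("2013-01-15T00:00:00Z/2013-01-20T00:00:00Z");

    // ...land in the same MONTH bucket, so a plain transformAndConcat would
    // emit the month interval twice; the filter suppresses the repeat, which
    // is safe because the buckets come out in sorted order.
    System.out.println(monthBucket(first.getStart()));  // 2013-01-01/2013-02-01
    System.out.println(monthBucket(second.getStart())); // same interval again
  }
}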
File: IntervalsByGranularityTest.java

@@ -21,11 +21,13 @@
 
 import com.google.common.collect.ImmutableList;
-import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
 import org.joda.time.Interval;
 import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.util.Collections;
 import java.util.Iterator;
@@ -34,8 +36,8 @@
 
 public class IntervalsByGranularityTest
 {
-  private static final long SECONDS_IN_YEAR = 31536000;
-
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
 
   @Test
   public void testTrivialIntervalExplosion()
@@ -46,17 +48,17 @@ public void testTrivialIntervalExplosion()
 
     IntervalsByGranularity intervals = new IntervalsByGranularity(
         ImmutableList.of(first, second, third),
-        Granularity.fromString("DAY")
+        Granularities.DAY
     );
 
     // get count:
     Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
-    long count = getCount(granularityIntervals);
-    Assert.assertTrue(count == 62 + 365);
+    long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+    Assert.assertEquals(62 + 365, count);
 
     granularityIntervals = intervals.granularityIntervalsIterator();
     count = getCountWithNoHasNext(granularityIntervals);
-    Assert.assertTrue(count == 62 + 365);
+    Assert.assertEquals(62 + 365, count);
   }
 
 
@@ -69,13 +71,13 @@ public void testDups()
 
     IntervalsByGranularity intervals = new IntervalsByGranularity(
         ImmutableList.of(first, second, third),
-        Granularity.fromString("DAY")
+        Granularities.DAY
     );
 
     // get count:
     Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
-    long count = getCount(granularityIntervals);
-    Assert.assertTrue(count == 61);
+    long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+    Assert.assertEquals(61, count);
   }
 
 
@@ -88,30 +90,37 @@ public void testCondenseForManyIntervals()
     Interval first = Intervals.of("2012-01-01T00Z/P1Y");
     IntervalsByGranularity intervals = new IntervalsByGranularity(
         ImmutableList.of(first),
-        Granularity.fromString("SECOND")
+        Granularities.SECOND
     );
     Assert.assertEquals(
         ImmutableList.of(Intervals.of("2012-01-01T00Z/2013-01-01T00Z")),
        ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.granularityIntervalsIterator()))
     );
   }
 
+  /**
+   * This test iterates huge intervals (2.5 years) with the SECOND granularity.
+   * The motivation behind this test is ensuring that IntervalsByGranularity can handle
+   * these huge intervals with a tiny granularity. However, this test takes a long time
+   * to populate all intervals based on the SECOND granularity (more than 1 min), so
+   * it is ignored by default. We should make this test not a unit test, but a load test.
+   */
+  @Ignore
   @Test
-  public void testIntervalExplosion()
+  public void testIterateHugeIntervalsWithTinyGranularity()
   {
     Interval first = Intervals.of("2012-01-01T00Z/2012-12-31T00Z");
     Interval second = Intervals.of("2002-01-01T00Z/2002-12-31T00Z");
     Interval third = Intervals.of("2021-01-01T00Z/2021-06-30T00Z");
     IntervalsByGranularity intervals = new IntervalsByGranularity(
         ImmutableList.of(first, second, third),
-        Granularity.fromString("SECOND")
+        Granularities.SECOND
     );
 
     // get count:
     Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
-    long count = getCount(granularityIntervals);
-    Assert.assertTrue(count == 78537600);
-
+    long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+    Assert.assertEquals(78537600, count);
   }
 
   @Test
@@ -132,7 +141,6 @@ public void testSimpleEliminateRepeated()
         ImmutableList.of(Intervals.of("2012-01-01T00Z/2012-02-01T00Z")),
         ImmutableList.copyOf(intervals.granularityIntervalsIterator())
     );
-
   }
 
   @Test
@@ -160,10 +168,9 @@ public void testALittleMoreComplexEliminateRepeated()
         ),
         ImmutableList.copyOf(intervals.granularityIntervalsIterator())
     );
-
   }
 
-  @Test(expected = IAE.class)
+  @Test
   public void testOverlappingShouldThrow()
   {
     List<Interval> inputIntervals = ImmutableList.of(
@@ -174,10 +181,13 @@ public void testOverlappingShouldThrow()
 
     IntervalsByGranularity intervals = new IntervalsByGranularity(
         inputIntervals,
-        Granularity.fromString("DAY")
+        Granularities.DAY
     );
-  }
 
+    Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
+    long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+    Assert.assertEquals(14, count);
+  }
 
   @Test
   public void testWithGranularity()
@@ -190,13 +200,13 @@ public void testWithGranularity()
 
     IntervalsByGranularity intervals = new IntervalsByGranularity(
         inputIntervals,
-        Granularity.fromString("MONTH")
+        Granularities.MONTH
    );
 
     // get count:
     Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
-    long count = getCount(granularityIntervals);
-    Assert.assertTrue(count == 2);
+    long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+    Assert.assertEquals(2, count);
   }
 
   @Test(expected = UnsupportedOperationException.class)
@@ -223,7 +233,7 @@ public void testEmptyInput()
     Assert.assertFalse(intervals.granularityIntervalsIterator().hasNext());
   }
 
-  private long getCount(Iterator<Interval> granularityIntervalIterator)
+  private long verifyIteratorAndReturnIntervalCount(Iterator<Interval> granularityIntervalIterator)
   {
     long count = 0;
     Interval previous = null;
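
The rest of this helper is truncated in this capture. Based on its name and the visible locals, a plausible completion would count the intervals while asserting they arrive sorted and non-overlapping; the reconstruction below is hypothetical, not the PR's exact code.

private long verifyIteratorAndReturnIntervalCount(Iterator<Interval> granularityIntervalIterator)
{
  long count = 0;
  Interval previous = null;
  while (granularityIntervalIterator.hasNext()) {
    Interval current = granularityIntervalIterator.next();
    // Each interval must start at or after the previous one ended,
    // i.e. the stream is sorted and non-overlapping.
    Assert.assertTrue(previous == null || previous.getEndMillis() <= current.getStartMillis());
    previous = current;
    count++;
  }
  return count;
}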
File: CompactionTask.java

@@ -22,6 +22,8 @@
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -142,8 +144,6 @@ public class CompactionTask extends AbstractBatchIndexTask
   @Nullable
   private final AggregatorFactory[] metricsSpec;
   @Nullable
-  private final Granularity segmentGranularity;
-  @Nullable
   private final ClientCompactionTaskGranularitySpec granularitySpec;
   @Nullable
   private final ParallelIndexTuningConfig tuningConfig;
@@ -207,7 +207,6 @@ public CompactionTask(
 
     this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
     this.metricsSpec = metricsSpec;
-    this.segmentGranularity = segmentGranularity;
     // Prior to apache/druid#10843 users could specify segmentGranularity using `segmentGranularity`
     // Now users should prefer to use `granularitySpec`
     // In case users accidentally specify both, and they are conflicting, warn the user instead of proceeding
@@ -308,6 +307,7 @@ public AggregatorFactory[] getMetricsSpec()
     return metricsSpec;
   }
 
+  @JsonInclude(Include.NON_NULL)
   @JsonProperty
   @Nullable
   @Override
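
For context on the one-line `@JsonInclude(Include.NON_NULL)` addition: placed on a getter, it makes Jackson omit the property entirely when the value is null, rather than writing an explicit null into the serialized task spec. A minimal, self-contained sketch follows; the Task class here is hypothetical, not the real CompactionTask.

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonIncludeDemo
{
  // Stand-in for a task getter: with NON_NULL, Jackson drops the property
  // when the getter returns null instead of writing "segmentGranularity": null.
  static class Task
  {
    @JsonInclude(Include.NON_NULL)
    @JsonProperty
    public String getSegmentGranularity()
    {
      return null;
    }

    @JsonProperty
    public String getDataSource()
    {
      return "wikipedia";
    }
  }

  public static void main(String[] args) throws Exception
  {
    System.out.println(new ObjectMapper().writeValueAsString(new Task()));
    // {"dataSource":"wikipedia"} -- no segmentGranularity key at all
  }
}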
File: CompactionTaskRunTest.java

@@ -56,6 +56,7 @@
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
@@ -798,6 +799,79 @@ public void testCompactThenAppend() throws Exception
     Assert.assertEquals(expectedSegments, usedSegments);
   }
 
+  @Test
+  public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompact() throws Exception
+  {
+    // This test fails with segment lock because of the bug reported in https://github.com/apache/druid/issues/10911.
+    if (lockGranularity == LockGranularity.SEGMENT) {
+      return;
+    }
+
+    runIndexTask();
+
+    final Set<DataSegment> expectedSegments = new HashSet<>(
+        getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+            DATA_SOURCE,
+            Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+            Segments.ONLY_VISIBLE
+        )
+    );
+
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentLoaderFactory,
+        RETRY_POLICY_FACTORY
+    );
+
+    final Interval partialInterval = Intervals.of("2014-01-01T01:00:00/2014-01-01T02:00:00");
+    final CompactionTask partialCompactionTask = builder
+        .interval(partialInterval)
+        .segmentGranularity(Granularities.MINUTE)
+        .build();
+
+    final Pair<TaskStatus, List<DataSegment>> partialCompactionResult = runTask(partialCompactionTask);
+    Assert.assertTrue(partialCompactionResult.lhs.isSuccess());
+    // All segments in the previous expectedSegments should still appear as they have larger segment granularity.
+    expectedSegments.addAll(partialCompactionResult.rhs);
+
+    final Set<DataSegment> segmentsAfterPartialCompaction = new HashSet<>(
+        getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+            DATA_SOURCE,
+            Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+            Segments.ONLY_VISIBLE
+        )
+    );
+
+    Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction);
+
+    final CompactionTask fullCompactionTask = builder
+        .interval(Intervals.of("2014-01-01/2014-01-02"))
+        .segmentGranularity(null)
+        .build();
+
+    final Pair<TaskStatus, List<DataSegment>> fullCompactionResult = runTask(fullCompactionTask);
+    Assert.assertTrue(fullCompactionResult.lhs.isSuccess());
+
+    final List<DataSegment> segmentsAfterFullCompaction = new ArrayList<>(
+        getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+            DATA_SOURCE,
+            Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+            Segments.ONLY_VISIBLE
+        )
+    );
+    segmentsAfterFullCompaction.sort(
+        (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval())
+    );
+
+    Assert.assertEquals(3, segmentsAfterFullCompaction.size());
+    for (int i = 0; i < segmentsAfterFullCompaction.size(); i++) {
+      Assert.assertEquals(
+          Intervals.of(StringUtils.format("2014-01-01T%02d/2014-01-01T%02d", i, i + 1)),
+          segmentsAfterFullCompaction.get(i).getInterval()
+      );
+    }
+  }
+
   @Test
   public void testRunIndexAndCompactForSameSegmentAtTheSameTime() throws Exception
   {
File: BaseGranularitySpec.java

@@ -23,7 +23,6 @@
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -44,16 +43,12 @@ public abstract class BaseGranularitySpec implements GranularitySpec
   public static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.DAY;
   public static final Granularity DEFAULT_QUERY_GRANULARITY = Granularities.NONE;
 
-  protected List<Interval> inputIntervals;
+  protected final List<Interval> inputIntervals;
   protected final Boolean rollup;
 
   public BaseGranularitySpec(List<Interval> inputIntervals, Boolean rollup)
   {
-    if (inputIntervals != null) {
-      this.inputIntervals = ImmutableList.copyOf(inputIntervals);
-    } else {
-      this.inputIntervals = Collections.emptyList();
-    }
+    this.inputIntervals = inputIntervals == null ? Collections.emptyList() : inputIntervals;
     this.rollup = rollup == null ? DEFAULT_ROLLUP : rollup;
   }
 
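
One nuance of the BaseGranularitySpec change: the old constructor took a defensive ImmutableList.copyOf of the caller's list, while the new one stores the reference as-is. Below is a minimal sketch of the aliasing this allows; the store helper is hypothetical, and the change assumes callers do not mutate the list after handing it over.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class DefensiveCopyDemo
{
  // Mirrors the new null-coalescing assignment: null becomes an empty list,
  // anything else is stored without copying.
  static List<String> store(List<String> input)
  {
    return input == null ? Collections.emptyList() : input;
  }

  public static void main(String[] args)
  {
    List<String> caller = new ArrayList<>(Arrays.asList("a"));
    List<String> stored = store(caller);
    caller.add("b");
    // Without ImmutableList.copyOf, the caller's later mutation is visible
    // through the stored reference; the old code would still print [a].
    System.out.println(stored); // [a, b]
  }
}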