Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.joda.time.format.DateTimeFormatter;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
Expand All @@ -40,6 +41,30 @@

public abstract class Granularity implements Cacheable
{

public static Comparator<Granularity> IS_FINER_THAN = new Comparator<Granularity>()
{
@Override
/**
* Decide whether this granularity is finer than the other granularity
*
* @param left The left granularity
* @param right The right granularity
* @return -1 if left granularity is finer, 0 if it is the same, 1 if it is greater
*/
public int compare(Granularity left, Granularity right)
{
long leftDuration = left.bucket(DateTimes.EPOCH).toDurationMillis();
long rightDuration = right.bucket(DateTimes.EPOCH).toDurationMillis();
if (leftDuration < rightDuration) {
return -1;
} else if (leftDuration == rightDuration) {
return 0;
} else {
return 1;
}
}
};
/**
* Default patterns for parsing paths.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
public class GranularityTest
{

final Granularity NONE = Granularities.NONE;
final Granularity SECOND = Granularities.SECOND;
final Granularity MINUTE = Granularities.MINUTE;
final Granularity HOUR = Granularities.HOUR;
Expand All @@ -50,6 +51,7 @@ public class GranularityTest
final Granularity WEEK = Granularities.WEEK;
final Granularity MONTH = Granularities.MONTH;
final Granularity YEAR = Granularities.YEAR;
final Granularity ALL = Granularities.ALL;

@Test
public void testHiveFormat()
Expand Down Expand Up @@ -809,6 +811,26 @@ public void testIncrementOverSpringForward()
);
}

@Test
public void testIsFinerComparator()
{
Assert.assertTrue(Granularity.IS_FINER_THAN.compare(NONE, SECOND) < 0);
Assert.assertTrue(Granularity.IS_FINER_THAN.compare(SECOND, NONE) > 0);
Assert.assertTrue(Granularity.IS_FINER_THAN.compare(NONE, MINUTE) < 0);
Assert.assertTrue(Granularity.IS_FINER_THAN.compare(MINUTE, NONE) > 0);
Assert.assertTrue(Granularity.IS_FINER_THAN.compare(DAY, MONTH) < 0);
Assert.assertTrue(Granularity.IS_FINER_THAN.compare(Granularities.YEAR, ALL) < 0);
Assert.assertTrue(Granularity.IS_FINER_THAN.compare(Granularities.ALL, YEAR) > 0);
// Distinct references are needed to avoid intelli-j complain about compare being called on itself
// thus the variables
Granularity day = DAY;
Granularity none = NONE;
Granularity all = ALL;
Assert.assertTrue(Granularity.IS_FINER_THAN.compare(DAY, day) == 0);
Assert.assertTrue(Granularity.IS_FINER_THAN.compare(NONE, none) == 0);
Assert.assertTrue(Granularity.IS_FINER_THAN.compare(ALL, all) == 0);
}

private static class PathDate
{
public final String path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.curator.shaded.com.google.common.base.Verify;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.common.guava.SettableSupplier;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
Expand Down Expand Up @@ -65,7 +66,6 @@
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.guava.Comparators;
Expand Down Expand Up @@ -642,30 +642,22 @@ private static DataSchema createDataSchema(
Granularity segmentGranularity
)
{
// check index metadata
for (NonnullPair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
final QueryableIndex index = pair.lhs;
if (index.getMetadata() == null) {
throw new RE("Index metadata doesn't exist for segment[%s]", pair.rhs.getId());
}
}
// check index metadata &
// Decide which values to propagate (i.e. carry over) for rollup & queryGranularity
final SettableSupplier<Boolean> rollup = new SettableSupplier<>();
final SettableSupplier<Granularity> queryGranularity = new SettableSupplier<>();
decideRollupAndQueryGranularityCarryOver(rollup, queryGranularity, queryableIndexAndSegments);

// find granularity spec
// set rollup only if rollup is set for all segments
final boolean rollup = queryableIndexAndSegments.stream().allMatch(pair -> {
// We have already checked getMetadata() doesn't return null
final Boolean isRollup = pair.lhs.getMetadata().isRollup();
return isRollup != null && isRollup;
});

final Interval totalInterval = JodaUtils.umbrellaInterval(
queryableIndexAndSegments.stream().map(p -> p.rhs.getInterval()).collect(Collectors.toList())
);

final GranularitySpec granularitySpec = new UniformGranularitySpec(
Preconditions.checkNotNull(segmentGranularity),
Granularities.NONE,
rollup,
queryGranularity.get(),
rollup.get(),
Collections.singletonList(totalInterval)
);

Expand All @@ -677,7 +669,8 @@ private static DataSchema createDataSchema(
? createMetricsSpec(queryableIndexAndSegments)
: convertToCombiningFactories(metricsSpec);

return new DataSchema(
return new
DataSchema(
dataSource,
new TimestampSpec(null, null, null),
finalDimensionsSpec,
Expand All @@ -687,6 +680,64 @@ private static DataSchema createDataSchema(
);
}


/**
* Decide which rollup & queryCardinalities to propage for the compacted segment based on
* the data segments given
*
* @param rollup Reference to update with the rollup value
* @param queryGranularity Reference to update with the queryGranularity value
* @param queryableIndexAndSegments The segments to compact
*/
private static void decideRollupAndQueryGranularityCarryOver(
SettableSupplier<Boolean> rollup,
SettableSupplier<Granularity> queryGranularity,
List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments
)
{
final SettableSupplier<Boolean> rollupIsValid = new SettableSupplier<>(true);
for (NonnullPair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
final QueryableIndex index = pair.lhs;
if (index.getMetadata() == null) {
throw new RE("Index metadata doesn't exist for segment[%s]", pair.rhs.getId());
}
// carry-overs (i.e. query granularity & rollup) are valid iff they are the same in every segment:

// Pick rollup value if all segments being compacted have the same, non-null, value otherwise set it to false
if (rollupIsValid.get()) {
Boolean isRollup = index.getMetadata().isRollup();
if (isRollup == null) {
rollupIsValid.set(false);
rollup.set(false);
} else if (rollup.get() == null) {
rollup.set(isRollup);
} else if (!rollup.get().equals(isRollup.booleanValue())) {
rollupIsValid.set(false);
rollup.set(false);
}
}

// Pick the finer, non-null, of the query granularities of the segments being compacted
Granularity current = index.getMetadata().getQueryGranularity();
queryGranularity.set(compareWithCurrent(queryGranularity.get(), current));
}
}

@VisibleForTesting
static Granularity compareWithCurrent(Granularity queryGranularity, Granularity current)
{
if (queryGranularity == null && current != null) {
queryGranularity = current;
} else if (queryGranularity != null
&& current != null
&& Granularity.IS_FINER_THAN.compare(current, queryGranularity) < 0) {
queryGranularity = current;
}
// we never propagate nulls when there is at least one non-null granularity thus
// do nothing for the case queryGranularity != null && current == null
return queryGranularity;
}

private static AggregatorFactory[] createMetricsSpec(
List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments
)
Expand Down Expand Up @@ -881,8 +932,8 @@ static class PartitionConfigurationManager
ParallelIndexTuningConfig computeTuningConfig()
{
ParallelIndexTuningConfig newTuningConfig = tuningConfig == null
? ParallelIndexTuningConfig.defaultConfig()
: tuningConfig;
? ParallelIndexTuningConfig.defaultConfig()
: tuningConfig;
PartitionsSpec partitionsSpec = newTuningConfig.getGivenOrDefaultPartitionsSpec();
if (partitionsSpec instanceof DynamicPartitionsSpec) {
final DynamicPartitionsSpec dynamicPartitionsSpec = (DynamicPartitionsSpec) partitionsSpec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public void testRunParallelWithDynamicPartitioningMatchCompactionState() throws
getObjectMapper().writeValueAsString(
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.NONE,
Granularities.MINUTE,
true,
ImmutableList.of(segment.getInterval())
)
Expand Down Expand Up @@ -197,7 +197,7 @@ public void testRunParallelWithHashPartitioningMatchCompactionState() throws Exc
getObjectMapper().writeValueAsString(
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.NONE,
Granularities.MINUTE,
true,
ImmutableList.of(segment.getInterval())
)
Expand Down Expand Up @@ -237,7 +237,7 @@ public void testRunParallelWithRangePartitioning() throws Exception
getObjectMapper().writeValueAsString(
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.NONE,
Granularities.MINUTE,
true,
ImmutableList.of(segment.getInterval())
)
Expand Down Expand Up @@ -277,7 +277,7 @@ public void testRunParallelWithRangePartitioningWithSingleTask() throws Exceptio
getObjectMapper().writeValueAsString(
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.NONE,
Granularities.MINUTE,
true,
ImmutableList.of(segment.getInterval())
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public void testRunWithDynamicPartitioning() throws Exception
segments.get(i).getInterval()
);
Assert.assertEquals(
getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))),
getDefaultCompactionState(Granularities.HOUR, Granularities.MINUTE, ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))),
segments.get(i).getLastCompactionState()
);
if (lockGranularity == LockGranularity.SEGMENT) {
Expand Down Expand Up @@ -339,7 +339,7 @@ public void testRunWithHashPartitioning() throws Exception
getObjectMapper().writeValueAsString(
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.NONE,
Granularities.MINUTE,
true,
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))
)
Expand Down Expand Up @@ -385,7 +385,7 @@ public void testRunCompactionTwice() throws Exception
segments.get(i).getInterval()
);
Assert.assertEquals(
getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))),
getDefaultCompactionState(Granularities.HOUR, Granularities.MINUTE, ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))),
segments.get(i).getLastCompactionState()
);
if (lockGranularity == LockGranularity.SEGMENT) {
Expand Down Expand Up @@ -415,7 +415,7 @@ public void testRunCompactionTwice() throws Exception
segments.get(i).getInterval()
);
Assert.assertEquals(
getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))),
getDefaultCompactionState(Granularities.HOUR, Granularities.MINUTE, ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))),
segments.get(i).getLastCompactionState()
);
if (lockGranularity == LockGranularity.SEGMENT) {
Expand Down Expand Up @@ -517,7 +517,7 @@ public void testRunIndexAndCompactAtTheSameTimeForDifferentInterval() throws Exc
segments.get(i).getInterval()
);
Assert.assertEquals(
getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))),
getDefaultCompactionState(Granularities.HOUR, Granularities.MINUTE, ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1))),
segments.get(i).getLastCompactionState()
);
if (lockGranularity == LockGranularity.SEGMENT) {
Expand Down Expand Up @@ -559,7 +559,7 @@ public void testWithSegmentGranularity() throws Exception
Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), segments.get(0).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(0).getShardSpec());
Assert.assertEquals(
getDefaultCompactionState(Granularities.DAY, Granularities.NONE, ImmutableList.of(Intervals.of("2014-01-01T00:00:00/2014-01-01T03:00:00"))),
getDefaultCompactionState(Granularities.DAY, Granularities.MINUTE, ImmutableList.of(Intervals.of("2014-01-01T00:00:00/2014-01-01T03:00:00"))),
segments.get(0).getLastCompactionState()
);

Expand All @@ -580,7 +580,7 @@ public void testWithSegmentGranularity() throws Exception
Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1), segments.get(i).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 1), segments.get(i).getShardSpec());
Assert.assertEquals(
getDefaultCompactionState(Granularities.HOUR, Granularities.NONE, ImmutableList.of(Intervals.of("2014-01-01/2014-01-02"))),
getDefaultCompactionState(Granularities.HOUR, Granularities.MINUTE, ImmutableList.of(Intervals.of("2014-01-01/2014-01-02"))),
segments.get(i).getLastCompactionState()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.common.guava.SettableSupplier;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
Expand Down Expand Up @@ -1116,6 +1117,60 @@ public void testNullSegmentGranularityAnd() throws IOException, SegmentLoadingEx
);
}

@Test
public void testChooseFinestGranularityWithNulls()
{
List<Granularity> input = Arrays.asList(
Granularities.DAY,
Granularities.SECOND,
Granularities.MINUTE,
Granularities.SIX_HOUR,
Granularities.DAY,
null,
Granularities.ALL,
Granularities.MINUTE
);
Assert.assertTrue(Granularities.SECOND.equals(chooseFinestGranularityHelper(input)));
}

@Test
public void testChooseFinestGranularityNone()
{
List<Granularity> input = ImmutableList.of(
Granularities.DAY,
Granularities.SECOND,
Granularities.MINUTE,
Granularities.SIX_HOUR,
Granularities.NONE,
Granularities.DAY,
Granularities.NONE,
Granularities.MINUTE
);
Assert.assertTrue(Granularities.NONE.equals(chooseFinestGranularityHelper(input)));
}

@Test
public void testChooseFinestGranularityAllNulls()
{
List<Granularity> input = Arrays.asList(
null,
null,
null,
null
);
Assert.assertNull(chooseFinestGranularityHelper(input));
}

private Granularity chooseFinestGranularityHelper(List<Granularity> granularities)
{
SettableSupplier<Granularity> queryGranularity = new SettableSupplier<>();
for (Granularity current : granularities) {
queryGranularity.set(CompactionTask.compareWithCurrent(queryGranularity.get(), current));
}
return queryGranularity.get();
}


private static List<DimensionsSpec> getExpectedDimensionsSpecForAutoGeneration()
{
return ImmutableList.of(
Expand Down
Loading