Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.JodaUtils;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.granularity.NoneGranularity;
import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.common.jackson.JacksonUtils;
Expand Down Expand Up @@ -79,6 +80,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -195,28 +197,37 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
jsonMapper
);

indexTaskSpec = new IndexTask(
getId(),
getGroupId(),
getTaskResource(),
getDataSource(),
ingestionSpec,
getContext(),
authorizerMapper,
null
);
}

if (indexTaskSpec.getIngestionSchema() == null) {
log.info("Cannot find segments for interval");
if (ingestionSpec != null) {
indexTaskSpec = new IndexTask(
getId(),
getGroupId(),
getTaskResource(),
getDataSource(),
ingestionSpec,
getContext(),
authorizerMapper,
null
);
}
}

final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(indexTaskSpec);
log.info("Generated compaction task details: " + json);
if (indexTaskSpec == null) {
log.warn("Failed to generate compaction spec");
return TaskStatus.failure(getId());
} else {
final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(indexTaskSpec);
log.info("Generated compaction task details: " + json);

return indexTaskSpec.run(toolbox);
return indexTaskSpec.run(toolbox);
}
}

/**
* Generate {@link IndexIngestionSpec} from input segments.
*
* @return null if input segments don't exist. Otherwise, a generated ingestionSpec.
*/
@Nullable
@VisibleForTesting
static IndexIngestionSpec createIngestionSchema(
TaskToolbox toolbox,
Expand Down Expand Up @@ -289,12 +300,22 @@ private static DataSchema createDataSchema(
throws IOException
{
// find metadata for interval
final List<QueryableIndex> queryableIndices = loadSegments(timelineSegments, segmentFileMap, indexIO);
final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
timelineSegments,
segmentFileMap,
indexIO
);

// find merged aggregators
final List<AggregatorFactory[]> aggregatorFactories = queryableIndices
for (Pair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
final QueryableIndex index = pair.lhs;
if (index.getMetadata() == null) {
throw new RE("Index metadata doesn't exist for segment[%s]", pair.rhs.getIdentifier());
}
}
final List<AggregatorFactory[]> aggregatorFactories = queryableIndexAndSegments
.stream()
.map(index -> index.getMetadata().getAggregators())
.map(pair -> pair.lhs.getMetadata().getAggregators()) // We have already done null check on index.getMetadata()
.collect(Collectors.toList());
final AggregatorFactory[] mergedAggregators = AggregatorFactory.mergeAggregators(aggregatorFactories);

Expand All @@ -304,7 +325,11 @@ private static DataSchema createDataSchema(

// find granularity spec
// set rollup only if rollup is set for all segments
final boolean rollup = queryableIndices.stream().allMatch(index -> index.getMetadata().isRollup());
final boolean rollup = queryableIndexAndSegments.stream().allMatch(pair -> {
// We have already checked getMetadata() doesn't return null
final Boolean isRollup = pair.lhs.getMetadata().isRollup();
return isRollup != null && isRollup;
});
final GranularitySpec granularitySpec = new ArbitraryGranularitySpec(
new NoneGranularity(),
rollup,
Expand All @@ -313,7 +338,7 @@ private static DataSchema createDataSchema(

// find unique dimensions
final DimensionsSpec finalDimensionsSpec = dimensionsSpec == null ?
createDimensionsSpec(queryableIndices) :
createDimensionsSpec(queryableIndexAndSegments) :
dimensionsSpec;
final InputRowParser parser = new NoopInputRowParser(new TimeAndDimsParseSpec(null, finalDimensionsSpec));

Expand All @@ -327,7 +352,7 @@ private static DataSchema createDataSchema(
);
}

private static DimensionsSpec createDimensionsSpec(List<QueryableIndex> queryableIndices)
private static DimensionsSpec createDimensionsSpec(List<Pair<QueryableIndex, DataSegment>> queryableIndices)
{
final BiMap<String, Integer> uniqueDims = HashBiMap.create();
final Map<String, DimensionSchema> dimensionSchemaMap = new HashMap<>();
Expand All @@ -337,9 +362,24 @@ private static DimensionsSpec createDimensionsSpec(List<QueryableIndex> queryabl
// Dimensions are extracted from the most recent segments to the oldest ones because recent segments are likely to
// be queried more frequently, and thus the performance should be optimized for recent ones rather than old ones.

// timelineSegments are sorted in order of interval
// timelineSegments are sorted in order of interval, but we do a sanity check here.
final Comparator<Interval> intervalComparator = Comparators.intervalsByStartThenEnd();
for (int i = 0; i < queryableIndices.size() - 1; i++) {
final Interval shouldBeSmaller = queryableIndices.get(i).lhs.getDataInterval();
final Interval shouldBeLarger = queryableIndices.get(i + 1).lhs.getDataInterval();
Preconditions.checkState(
intervalComparator.compare(shouldBeSmaller, shouldBeLarger) <= 0,
"QueryableIndexes are not sorted! Interval[%s] of segment[%s] is laster than interval[%s] of segment[%s]",
shouldBeSmaller,
queryableIndices.get(i).rhs.getIdentifier(),
shouldBeLarger,
queryableIndices.get(i + 1).rhs.getIdentifier()
);
}

int index = 0;
for (QueryableIndex queryableIndex : Lists.reverse(queryableIndices)) {
for (Pair<QueryableIndex, DataSegment> pair : Lists.reverse(queryableIndices)) {
final QueryableIndex queryableIndex = pair.lhs;
final Map<String, DimensionHandler> dimensionHandlerMap = queryableIndex.getDimensionHandlers();

for (String dimension : queryableIndex.getAvailableDimensions()) {
Expand Down Expand Up @@ -385,23 +425,22 @@ private static DimensionsSpec createDimensionsSpec(List<QueryableIndex> queryabl
return new DimensionsSpec(dimensionSchemas, null, null);
}

private static List<QueryableIndex> loadSegments(
private static List<Pair<QueryableIndex, DataSegment>> loadSegments(
List<TimelineObjectHolder<String, DataSegment>> timelineSegments,
Map<DataSegment, File> segmentFileMap,
IndexIO indexIO
) throws IOException
{
final List<QueryableIndex> segments = new ArrayList<>();
final List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();

for (TimelineObjectHolder<String, DataSegment> timelineSegment : timelineSegments) {
final PartitionHolder<DataSegment> partitionHolder = timelineSegment.getObject();
for (PartitionChunk<DataSegment> chunk : partitionHolder) {
final DataSegment segment = chunk.getObject();
segments.add(
indexIO.loadIndex(
Preconditions.checkNotNull(segmentFileMap.get(segment), "File for segment %s", segment.getIdentifier())
)
final QueryableIndex queryableIndex = indexIO.loadIndex(
Preconditions.checkNotNull(segmentFileMap.get(segment), "File for segment %s", segment.getIdentifier())
);
segments.add(Pair.of(queryableIndex, segment));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Module;
Expand Down Expand Up @@ -88,6 +89,7 @@
import org.hamcrest.CoreMatchers;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -129,10 +131,12 @@ public class CompactionTaskTest
private static Map<String, AggregatorFactory> AGGREGATORS;
private static List<DataSegment> SEGMENTS;
private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
private static TaskToolbox toolbox;
private static Map<DataSegment, File> segmentMap;

private TaskToolbox toolbox;

@BeforeClass
public static void setup()
public static void setupClass()
{
DIMENSIONS = new HashMap<>();
AGGREGATORS = new HashMap<>();
Expand Down Expand Up @@ -166,7 +170,7 @@ public static void setup()
AGGREGATORS.put("agg_3", new FloatFirstAggregatorFactory("agg_3", "float_dim_3"));
AGGREGATORS.put("agg_4", new DoubleLastAggregatorFactory("agg_4", "double_dim_4"));

final Map<DataSegment, File> segmentMap = new HashMap<>(5);
segmentMap = new HashMap<>(5);
for (int i = 0; i < 5; i++) {
final Interval segmentInterval = Intervals.of(StringUtils.format("2017-0%d-01/2017-0%d-01", (i + 1), (i + 2)));
segmentMap.put(
Expand All @@ -185,12 +189,6 @@ public static void setup()
);
}
SEGMENTS = new ArrayList<>(segmentMap.keySet());

toolbox = new TestTaskToolbox(
new TestTaskActionClient(new ArrayList<>(segmentMap.keySet())),
new TestIndexIO(objectMapper, segmentMap),
segmentMap
);
}

private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper)
Expand Down Expand Up @@ -272,6 +270,16 @@ private static IndexTuningConfig createTuningConfig()
@Rule
public ExpectedException expectedException = ExpectedException.none();

/**
 * Builds a new {@link TestTaskToolbox} before each test from the shared {@code segmentMap}, so every test
 * starts with its own action client and its own {@link TestIndexIO}.
 */
@Before
public void setup()
{
  final TestTaskActionClient actionClient = new TestTaskActionClient(new ArrayList<>(segmentMap.keySet()));
  final TestIndexIO indexIO = new TestIndexIO(objectMapper, segmentMap);

  toolbox = new TestTaskToolbox(actionClient, indexIO, segmentMap);
}

@Test
public void testSerdeWithInterval() throws IOException
{
Expand Down Expand Up @@ -415,6 +423,24 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio
);
}

/**
 * Removes the metadata of one loaded index and expects {@code CompactionTask.createIngestionSchema} to fail
 * with a {@link RuntimeException} whose message starts with "Index metadata doesn't exist for segment".
 */
@Test
public void testMissingMetadata() throws IOException, SegmentLoadingException
{
  expectedException.expect(RuntimeException.class);
  expectedException.expectMessage(CoreMatchers.startsWith("Index metadata doesn't exist for segment"));

  // Pick whichever file the index map yields first and drop the metadata of its index.
  final TestIndexIO testIndexIO = (TestIndexIO) toolbox.getIndexIO();
  final File fileWithoutMetadata = Iterables.getFirst(testIndexIO.getQueryableIndexMap().keySet(), null);
  testIndexIO.removeMetadata(fileWithoutMetadata);

  final List<DataSegment> inputSegments = new ArrayList<>(SEGMENTS);
  final SegmentProvider segmentProvider = new SegmentProvider(inputSegments);
  CompactionTask.createIngestionSchema(toolbox, segmentProvider, null, TUNING_CONFIG, objectMapper);
}

private static DimensionsSpec getExpectedDimensionsSpecForAutoGeneration()
{
return new DimensionsSpec(
Expand Down Expand Up @@ -599,9 +625,13 @@ private static class TestIndexIO extends IndexIO
}
}

final Metadata metadata = new Metadata();
metadata.setAggregators(aggregatorFactories.toArray(new AggregatorFactory[aggregatorFactories.size()]));
metadata.setRollup(false);
final Metadata metadata = new Metadata(
null,
aggregatorFactories.toArray(new AggregatorFactory[0]),
null,
null,
null
);

queryableIndexMap.put(
entry.getValue(),
Expand All @@ -622,6 +652,31 @@ public QueryableIndex loadIndex(File file)
{
return queryableIndexMap.get(file);
}

/**
 * Replaces the index registered for {@code file} with a new {@link SimpleQueryableIndex} built from the same
 * components as the existing one, except that {@code null} is passed in place of one constructor argument
 * (presumably the metadata, given the method name). Does nothing when no index is registered for the file.
 *
 * @param file key of the index to replace in {@code queryableIndexMap}
 */
void removeMetadata(File file)
{
  final SimpleQueryableIndex existing = (SimpleQueryableIndex) queryableIndexMap.get(file);
  if (existing == null) {
    return;
  }

  final SimpleQueryableIndex replacement = new SimpleQueryableIndex(
      existing.getDataInterval(),
      existing.getColumnNames(),
      existing.getAvailableDimensions(),
      existing.getBitmapFactoryForDimensions(),
      existing.getColumns(),
      existing.getFileMapper(),
      null,
      existing.getDimensionHandlers()
  );
  queryableIndexMap.put(file, replacement);
}

/**
 * Returns the map of file to {@link QueryableIndex} that {@code loadIndex(File)} reads from.
 * The map itself is returned, not a copy.
 */
Map<File, QueryableIndex> getQueryableIndexMap()
{
return queryableIndexMap;
}
}

private static Column createColumn(DimensionSchema dimensionSchema)
Expand Down
Loading