From b1419738e38f2956eeffedddb966218683768295 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 9 Apr 2018 14:33:50 -0700 Subject: [PATCH 1/6] Fix NPE in compactionTask --- .../indexing/common/task/CompactionTask.java | 13 +++- .../common/task/CompactionTaskTest.java | 70 ++++++++++++++++--- .../main/java/io/druid/segment/Metadata.java | 3 + .../java/io/druid/segment/QueryableIndex.java | 3 +- .../druid/segment/SimpleQueryableIndex.java | 40 ++++++++++- 5 files changed, 114 insertions(+), 15 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java index a751958f200d..ac30e15f73cb 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java @@ -51,6 +51,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.JodaUtils; import io.druid.java.util.common.Pair; +import io.druid.java.util.common.RE; import io.druid.java.util.common.granularity.NoneGranularity; import io.druid.java.util.common.guava.Comparators; import io.druid.java.util.common.jackson.JacksonUtils; @@ -292,9 +293,14 @@ private static DataSchema createDataSchema( final List queryableIndices = loadSegments(timelineSegments, segmentFileMap, indexIO); // find merged aggregators + for (QueryableIndex index : queryableIndices) { + if (index.getMetadata() == null) { + throw new RE("Index metadata doesn't exist for interval[%s]", index.getDataInterval()); + } + } final List aggregatorFactories = queryableIndices .stream() - .map(index -> index.getMetadata().getAggregators()) + .map(index -> index.getMetadata().getAggregators()) // We have already done null check .collect(Collectors.toList()); final AggregatorFactory[] mergedAggregators = AggregatorFactory.mergeAggregators(aggregatorFactories); @@ -304,7 +310,10 @@ private static DataSchema createDataSchema( // find granularity spec // set rollup only if rollup is set for all segments - final boolean rollup = queryableIndices.stream().allMatch(index -> index.getMetadata().isRollup()); + final boolean rollup = queryableIndices.stream().allMatch(index -> { + final Boolean isRollup = index.getMetadata().isRollup(); // We have already done null check + return isRollup != null && isRollup; + }); final GranularitySpec granularitySpec = new ArbitraryGranularitySpec( new NoneGranularity(), rollup, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java index 64f4ddeb520d..0b6a1c911926 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java @@ -26,6 +26,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.inject.Binder; import com.google.inject.Module; @@ -88,6 +89,7 @@ import org.hamcrest.CoreMatchers; import org.joda.time.Interval; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -129,10 +131,12 @@ public class CompactionTaskTest private static Map AGGREGATORS; private static List SEGMENTS; private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper()); - private static TaskToolbox toolbox; + private static Map segmentMap; + + private TaskToolbox toolbox; @BeforeClass - public static void setup() + public static void setupClass() { DIMENSIONS = new HashMap<>(); AGGREGATORS = new HashMap<>(); @@ -166,7 +170,7 @@ public static void setup() AGGREGATORS.put("agg_3", new FloatFirstAggregatorFactory("agg_3", "float_dim_3")); AGGREGATORS.put("agg_4", new DoubleLastAggregatorFactory("agg_4", "double_dim_4")); - final Map segmentMap = new HashMap<>(5); + segmentMap = new HashMap<>(5); for (int i = 0; i < 5; i++) { final Interval segmentInterval = Intervals.of(StringUtils.format("2017-0%d-01/2017-0%d-01", (i + 1), (i + 2))); segmentMap.put( @@ -185,12 +189,6 @@ public static void setup() ); } SEGMENTS = new ArrayList<>(segmentMap.keySet()); - - toolbox = new TestTaskToolbox( - new TestTaskActionClient(new ArrayList<>(segmentMap.keySet())), - new TestIndexIO(objectMapper, segmentMap), - segmentMap - ); } private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper) @@ -272,6 +270,16 @@ private static IndexTuningConfig createTuningConfig() @Rule public ExpectedException expectedException = ExpectedException.none(); + @Before + public void setup() + { + toolbox = new TestTaskToolbox( + new TestTaskActionClient(new ArrayList<>(segmentMap.keySet())), + new TestIndexIO(objectMapper, segmentMap), + segmentMap + ); + } + @Test public void testSerdeWithInterval() throws IOException { @@ -415,6 +423,24 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio ); } + @Test + public void testMissingMetadata() throws IOException, SegmentLoadingException + { + expectedException.expect(RuntimeException.class); + expectedException.expectMessage(CoreMatchers.startsWith("Index metadata doesn't exist for interval")); + + final TestIndexIO indexIO = (TestIndexIO) toolbox.getIndexIO(); + indexIO.removeMetadata(Iterables.getFirst(indexIO.getQueryableIndexMap().keySet(), null)); + final List segments = new ArrayList<>(SEGMENTS); + CompactionTask.createIngestionSchema( + toolbox, + new SegmentProvider(segments), + null, + TUNING_CONFIG, + objectMapper + ); + } + private static DimensionsSpec getExpectedDimensionsSpecForAutoGeneration() { return new DimensionsSpec( @@ -601,7 +627,6 @@ private static class TestIndexIO extends IndexIO final Metadata metadata = new Metadata(); metadata.setAggregators(aggregatorFactories.toArray(new AggregatorFactory[aggregatorFactories.size()])); - metadata.setRollup(false); queryableIndexMap.put( entry.getValue(), @@ -622,6 +647,31 @@ public QueryableIndex loadIndex(File file) { return queryableIndexMap.get(file); } + + void removeMetadata(File file) + { + final SimpleQueryableIndex index = (SimpleQueryableIndex) queryableIndexMap.get(file); + if (index != null) { + queryableIndexMap.put( + file, + new SimpleQueryableIndex( + index.getDataInterval(), + index.getColumnNames(), + index.getAvailableDimensions(), + index.getBitmapFactoryForDimensions(), + index.getColumns(), + index.getFileMapper(), + null, + index.getDimensionHandlers() + ) + ); + } + } + + Map getQueryableIndexMap() + { + return queryableIndexMap; + } } private static Column createColumn(DimensionSchema dimensionSchema) diff --git a/processing/src/main/java/io/druid/segment/Metadata.java b/processing/src/main/java/io/druid/segment/Metadata.java index e7cf31354f2f..50cc4a4a3813 100644 --- a/processing/src/main/java/io/druid/segment/Metadata.java +++ b/processing/src/main/java/io/druid/segment/Metadata.java @@ -25,6 +25,7 @@ import io.druid.java.util.common.granularity.Granularity; import io.druid.query.aggregation.AggregatorFactory; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -52,6 +53,7 @@ public class Metadata private Granularity queryGranularity; @JsonProperty + @Nullable private Boolean rollup; public Metadata() @@ -92,6 +94,7 @@ public Metadata setQueryGranularity(Granularity queryGranularity) return this; } + @Nullable public Boolean isRollup() { return rollup; diff --git a/processing/src/main/java/io/druid/segment/QueryableIndex.java b/processing/src/main/java/io/druid/segment/QueryableIndex.java index 8452fd0fa985..9a21891b73e1 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndex.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndex.java @@ -23,6 +23,7 @@ import io.druid.segment.data.Indexed; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.Map; @@ -40,7 +41,7 @@ public interface QueryableIndex extends ColumnSelector, Closeable int getNumRows(); Indexed getAvailableDimensions(); BitmapFactory getBitmapFactoryForDimensions(); - Metadata getMetadata(); + @Nullable Metadata getMetadata(); Map getDimensionHandlers(); /** diff --git a/processing/src/main/java/io/druid/segment/SimpleQueryableIndex.java b/processing/src/main/java/io/druid/segment/SimpleQueryableIndex.java index a5862b3fc311..74ef74e06067 100644 --- a/processing/src/main/java/io/druid/segment/SimpleQueryableIndex.java +++ b/processing/src/main/java/io/druid/segment/SimpleQueryableIndex.java @@ -19,6 +19,7 @@ package io.druid.segment; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; @@ -43,6 +44,7 @@ public class SimpleQueryableIndex extends AbstractIndex implements QueryableInde private final BitmapFactory bitmapFactory; private final Map columns; private final SmooshedFileMapper fileMapper; + @Nullable private final Metadata metadata; private final Map dimensionHandlers; @@ -51,11 +53,11 @@ public SimpleQueryableIndex( BitmapFactory bitmapFactory, Map columns, SmooshedFileMapper fileMapper, - Metadata metadata + @Nullable Metadata metadata ) { Preconditions.checkNotNull(columns.get(Column.TIME_COLUMN_NAME)); - this.dataInterval = dataInterval; + this.dataInterval = Preconditions.checkNotNull(dataInterval, "dataInterval"); ImmutableList.Builder columnNamesBuilder = ImmutableList.builder(); for (String column : columns.keySet()) { if (!Column.TIME_COLUMN_NAME.equals(column)) { @@ -72,6 +74,28 @@ public SimpleQueryableIndex( initDimensionHandlers(); } + @VisibleForTesting + public SimpleQueryableIndex( + Interval interval, + List columnNames, + Indexed availableDimensions, + BitmapFactory bitmapFactory, + Map columns, + SmooshedFileMapper fileMapper, + @Nullable Metadata metadata, + Map dimensionHandlers + ) + { + this.dataInterval = interval; + this.columnNames = columnNames; + this.availableDimensions = availableDimensions; + this.bitmapFactory = bitmapFactory; + this.columns = columns; + this.fileMapper = fileMapper; + this.metadata = metadata; + this.dimensionHandlers = dimensionHandlers; + } + @Override public Interval getDataInterval() { @@ -115,6 +139,18 @@ public Column getColumn(String columnName) return columns.get(columnName); } + @VisibleForTesting + public Map getColumns() + { + return columns; + } + + @VisibleForTesting + public SmooshedFileMapper getFileMapper() + { + return fileMapper; + } + @Override public void close() { From 40a34821f2e48c0286f839ab5a66d75adf2658d5 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 9 Apr 2018 15:23:28 -0700 Subject: [PATCH 2/6] more annotations for metadata --- .../common/task/CompactionTaskTest.java | 9 +- .../main/java/io/druid/segment/Metadata.java | 129 ++++++++---------- .../segment/incremental/IncrementalIndex.java | 12 +- .../io/druid/segment/IndexMergerTestBase.java | 14 +- .../java/io/druid/segment/MetadataTest.java | 93 +++++++------ 5 files changed, 134 insertions(+), 123 deletions(-) diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java index 0b6a1c911926..b1e7e600b20f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java @@ -625,8 +625,13 @@ private static class TestIndexIO extends IndexIO } } - final Metadata metadata = new Metadata(); - metadata.setAggregators(aggregatorFactories.toArray(new AggregatorFactory[aggregatorFactories.size()])); + final Metadata metadata = new Metadata( + null, + aggregatorFactories.toArray(new AggregatorFactory[0]), + null, + null, + null + ); queryableIndexMap.put( entry.getValue(), diff --git a/processing/src/main/java/io/druid/segment/Metadata.java b/processing/src/main/java/io/druid/segment/Metadata.java index 50cc4a4a3813..445be70fd52a 100644 --- a/processing/src/main/java/io/druid/segment/Metadata.java +++ b/processing/src/main/java/io/druid/segment/Metadata.java @@ -20,6 +20,7 @@ package io.druid.segment; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; import io.druid.data.input.impl.TimestampSpec; import io.druid.guice.annotations.PublicApi; import io.druid.java.util.common.granularity.Granularity; @@ -31,6 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; /** @@ -40,89 +42,80 @@ public class Metadata { // container is used for arbitrary key-value pairs in segment metadata e.g. // kafka firehose uses it to store commit offset - @JsonProperty private final Map container; - - @JsonProperty - private AggregatorFactory[] aggregators; - - @JsonProperty - private TimestampSpec timestampSpec; - - @JsonProperty - private Granularity queryGranularity; - - @JsonProperty @Nullable - private Boolean rollup; - - public Metadata() + private final AggregatorFactory[] aggregators; + @Nullable + private final TimestampSpec timestampSpec; + @Nullable + private final Granularity queryGranularity; + @Nullable + private final Boolean rollup; + + public Metadata( + @JsonProperty("container") @Nullable Map container, + @JsonProperty("aggregators") @Nullable AggregatorFactory[] aggregators, + @JsonProperty("timestampSpec") @Nullable TimestampSpec timestampSpec, + @JsonProperty("queryGranularity") @Nullable Granularity queryGranularity, + @JsonProperty("rollup") @Nullable Boolean rollup + ) { - container = new ConcurrentHashMap<>(); + this.container = container == null ? new ConcurrentHashMap<>() : container; + this.aggregators = aggregators; + this.timestampSpec = timestampSpec; + this.queryGranularity = queryGranularity; + this.rollup = rollup; } - public AggregatorFactory[] getAggregators() + @JsonProperty + public Map getContainer() { - return aggregators; + return container; } - public Metadata setAggregators(AggregatorFactory[] aggregators) + @JsonProperty + @Nullable + public AggregatorFactory[] getAggregators() { - this.aggregators = aggregators; - return this; + return aggregators; } + @JsonProperty + @Nullable public TimestampSpec getTimestampSpec() { return timestampSpec; } - public Metadata setTimestampSpec(TimestampSpec timestampSpec) - { - this.timestampSpec = timestampSpec; - return this; - } - + @JsonProperty + @Nullable public Granularity getQueryGranularity() { return queryGranularity; } - public Metadata setQueryGranularity(Granularity queryGranularity) - { - this.queryGranularity = queryGranularity; - return this; - } - + @JsonProperty @Nullable public Boolean isRollup() { return rollup; } - public Metadata setRollup(Boolean rollup) - { - this.rollup = rollup; - return this; - } - public Metadata putAll(Map other) { - if (other != null) { - container.putAll(other); - } + container.putAll(Preconditions.checkNotNull(other, "other")); return this; } public Object get(String key) { - return container.get(key); + return container.get(Preconditions.checkNotNull(key, "key")); } - public Metadata put(String key, Object value) + public Metadata put(String key, @Nullable Object value) { if (value != null) { - container.put(key, value); + container.put(Preconditions.checkNotNull(key, "key"), value); } return this; } @@ -130,9 +123,10 @@ public Metadata put(String key, Object value) // arbitrary key-value pairs from the metadata just follow the semantics of last one wins if same // key exists in multiple input Metadata containers // for others e.g. Aggregators, appropriate merging is done + @Nullable public static Metadata merge( - List toBeMerged, - AggregatorFactory[] overrideMergedAggregators + @Nullable List toBeMerged, + @Nullable AggregatorFactory[] overrideMergedAggregators ) { if (toBeMerged == null || toBeMerged.size() == 0) { @@ -142,7 +136,7 @@ public static Metadata merge( boolean foundSomeMetadata = false; Map mergedContainer = new HashMap<>(); List aggregatorsToMerge = overrideMergedAggregators == null - ? new ArrayList() + ? new ArrayList<>() : null; List timestampSpecsToMerge = new ArrayList<>(); @@ -182,20 +176,17 @@ public static Metadata merge( return null; } - Metadata result = new Metadata(); - if (aggregatorsToMerge != null) { - result.setAggregators(AggregatorFactory.mergeAggregators(aggregatorsToMerge)); - } else { - result.setAggregators(overrideMergedAggregators); - } + final AggregatorFactory[] mergedAggregators = aggregatorsToMerge == null ? + overrideMergedAggregators : + AggregatorFactory.mergeAggregators(aggregatorsToMerge); - if (timestampSpecsToMerge != null) { - result.setTimestampSpec(TimestampSpec.mergeTimestampSpec(timestampSpecsToMerge)); - } + final TimestampSpec mergedTimestampSpec = timestampSpecsToMerge == null ? + null : + TimestampSpec.mergeTimestampSpec(timestampSpecsToMerge); - if (gransToMerge != null) { - result.setQueryGranularity(Granularity.mergeGranularities(gransToMerge)); - } + final Granularity mergedGranularity = gransToMerge == null ? + null : + Granularity.mergeGranularities(gransToMerge); Boolean rollup = null; if (rollupToMerge != null && !rollupToMerge.isEmpty()) { @@ -213,10 +204,13 @@ public static Metadata merge( } } - result.setRollup(rollup); - result.container.putAll(mergedContainer); - return result; - + return new Metadata( + mergedContainer, + mergedAggregators, + mergedTimestampSpec, + mergedGranularity, + rollup + ); } @Override @@ -253,12 +247,7 @@ public boolean equals(Object o) @Override public int hashCode() { - int result = container.hashCode(); - result = 31 * result + Arrays.hashCode(aggregators); - result = 31 * result + (timestampSpec != null ? timestampSpec.hashCode() : 0); - result = 31 * result + (queryGranularity != null ? queryGranularity.hashCode() : 0); - result = 31 * result + (rollup != null ? rollup.hashCode() : 0); - return result; + return Objects.hash(container, aggregators, timestampSpec, queryGranularity, rollup); } @Override diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index ecfd6b95a7a9..91e8c1dfe058 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -276,11 +276,13 @@ protected IncrementalIndex( this.reportParseExceptions = reportParseExceptions; this.columnCapabilities = Maps.newHashMap(); - this.metadata = new Metadata() - .setAggregators(getCombiningAggregators(metrics)) - .setTimestampSpec(incrementalIndexSchema.getTimestampSpec()) - .setQueryGranularity(this.gran) - .setRollup(this.rollup); + this.metadata = new Metadata( + null, + getCombiningAggregators(metrics), + incrementalIndexSchema.getTimestampSpec(), + this.gran, + this.rollup + ); this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics, concurrentEventAdd); diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTestBase.java b/processing/src/test/java/io/druid/segment/IndexMergerTestBase.java index 6f58c8b1376d..bb7c6246c601 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerTestBase.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerTestBase.java @@ -274,13 +274,13 @@ public void testPersistWithSegmentMetadata() throws Exception assertDimCompression(index, indexSpec.getDimensionCompression()); Assert.assertEquals( - new Metadata() - .setAggregators( - IncrementalIndexTest.getDefaultCombiningAggregatorFactories() - ) - .setQueryGranularity(Granularities.NONE) - .setRollup(Boolean.TRUE) - .putAll(metadataElems), + new Metadata( + metadataElems, + IncrementalIndexTest.getDefaultCombiningAggregatorFactories(), + null, + Granularities.NONE, + Boolean.TRUE + ), index.getMetadata() ); } diff --git a/processing/src/test/java/io/druid/segment/MetadataTest.java b/processing/src/test/java/io/druid/segment/MetadataTest.java index 3aba1dbc02d2..b1ca2f288be0 100644 --- a/processing/src/test/java/io/druid/segment/MetadataTest.java +++ b/processing/src/test/java/io/druid/segment/MetadataTest.java @@ -31,6 +31,7 @@ import org.junit.Test; import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** @@ -42,15 +43,17 @@ public void testSerde() throws Exception { ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); - Metadata metadata = new Metadata(); - metadata.put("k", "v"); - AggregatorFactory[] aggregators = new AggregatorFactory[] { new LongSumAggregatorFactory("out", "in") }; - metadata.setAggregators(aggregators); - metadata.setQueryGranularity(Granularities.ALL); - metadata.setRollup(Boolean.FALSE); + + Metadata metadata = new Metadata( + Collections.singletonMap("k", "v"), + aggregators, + null, + Granularities.ALL, + Boolean.FALSE + ); Metadata other = jsonMapper.readValue( jsonMapper.writeValueAsString(metadata), @@ -75,30 +78,39 @@ public void testMerge() AggregatorFactory[] aggs = new AggregatorFactory[] { new LongMaxAggregatorFactory("n", "f") }; - Metadata m1 = new Metadata(); - m1.put("k", "v"); - m1.setAggregators(aggs); - m1.setTimestampSpec(new TimestampSpec("ds", "auto", null)); - m1.setQueryGranularity(Granularities.ALL); - m1.setRollup(Boolean.FALSE); - - Metadata m2 = new Metadata(); - m2.put("k", "v"); - m2.setAggregators(aggs); - m2.setTimestampSpec(new TimestampSpec("ds", "auto", null)); - m2.setQueryGranularity(Granularities.ALL); - m2.setRollup(Boolean.FALSE); - - Metadata merged = new Metadata(); - merged.put("k", "v"); - merged.setAggregators( + final Metadata m1 = new Metadata( + Collections.singletonMap("k", "v"), + aggs, + new TimestampSpec("ds", "auto", null), + Granularities.ALL, + Boolean.FALSE + ); + + final Metadata m2 = new Metadata( + Collections.singletonMap("k", "v"), + aggs, + new TimestampSpec("ds", "auto", null), + Granularities.ALL, + Boolean.FALSE + ); + + final Metadata m3 = new Metadata( + Collections.singletonMap("k", "v"), + aggs, + new TimestampSpec("ds", "auto", null), + Granularities.ALL, + Boolean.TRUE + ); + + final Metadata merged = new Metadata( + Collections.singletonMap("k", "v"), new AggregatorFactory[]{ new LongMaxAggregatorFactory("n", "n") - } + }, + new TimestampSpec("ds", "auto", null), + Granularities.ALL, + Boolean.FALSE ); - merged.setTimestampSpec(new TimestampSpec("ds", "auto", null)); - merged.setRollup(Boolean.FALSE); - merged.setQueryGranularity(Granularities.ALL); Assert.assertEquals(merged, Metadata.merge(ImmutableList.of(m1, m2), null)); //merge check with one metadata being null @@ -107,29 +119,32 @@ public void testMerge() metadataToBeMerged.add(m2); metadataToBeMerged.add(null); - merged.setAggregators(null); - merged.setTimestampSpec(null); - merged.setQueryGranularity(null); - merged.setRollup(null); - Assert.assertEquals(merged, Metadata.merge(metadataToBeMerged, null)); + final Metadata merged2 = new Metadata(Collections.singletonMap("k", "v"), null, null, null, null); + + Assert.assertEquals(merged2, Metadata.merge(metadataToBeMerged, null)); //merge check with client explicitly providing merged aggregators AggregatorFactory[] explicitAggs = new AggregatorFactory[] { new DoubleMaxAggregatorFactory("x", "y") }; - merged.setAggregators(explicitAggs); + + final Metadata merged3 = new Metadata(Collections.singletonMap("k", "v"), explicitAggs, null, null, null); Assert.assertEquals( - merged, + merged3, Metadata.merge(metadataToBeMerged, explicitAggs) ); - merged.setTimestampSpec(new TimestampSpec("ds", "auto", null)); - merged.setQueryGranularity(Granularities.ALL); - m1.setRollup(Boolean.TRUE); + final Metadata merged4 = new Metadata( + Collections.singletonMap("k", "v"), + explicitAggs, + new TimestampSpec("ds", "auto", null), + Granularities.ALL, + null + ); Assert.assertEquals( - merged, - Metadata.merge(ImmutableList.of(m1, m2), explicitAggs) + merged4, + Metadata.merge(ImmutableList.of(m3, m2), explicitAggs) ); } } From f0ce1d2abd2fa87c54f88e581abeeef50f608627 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 9 Apr 2018 16:32:36 -0700 Subject: [PATCH 3/6] better error message for empty input --- .../indexing/common/task/CompactionTask.java | 43 +++++++++++-------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java index ac30e15f73cb..952106396fec 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java @@ -196,28 +196,37 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception jsonMapper ); - indexTaskSpec = new IndexTask( - getId(), - getGroupId(), - getTaskResource(), - getDataSource(), - ingestionSpec, - getContext(), - authorizerMapper, - null - ); - } - - if (indexTaskSpec.getIngestionSchema() == null) { - log.info("Cannot find segments for interval"); + if (ingestionSpec != null) { + indexTaskSpec = new IndexTask( + getId(), + getGroupId(), + getTaskResource(), + getDataSource(), + ingestionSpec, + getContext(), + authorizerMapper, + null + ); + } } - final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(indexTaskSpec); - log.info("Generated compaction task details: " + json); + if (indexTaskSpec == null) { + log.warn("Failed to generate compaction spec"); + return TaskStatus.failure(getId()); + } else { + final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(indexTaskSpec); + log.info("Generated compaction task details: " + json); - return indexTaskSpec.run(toolbox); + return indexTaskSpec.run(toolbox); + } } + /** + * Generate {@link IndexIngestionSpec} from input segments. + + * @return null if input segments don't exist. Otherwise, a generated ingestionSpec. + */ + @Nullable @VisibleForTesting static IndexIngestionSpec createIngestionSchema( TaskToolbox toolbox, From e0d6005ff64960511955857b451f48986da15f0b Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 9 Apr 2018 21:07:04 -0700 Subject: [PATCH 4/6] fix build --- processing/src/main/java/io/druid/segment/Metadata.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/io/druid/segment/Metadata.java b/processing/src/main/java/io/druid/segment/Metadata.java index 445be70fd52a..ef3844cd27b0 100644 --- a/processing/src/main/java/io/druid/segment/Metadata.java +++ b/processing/src/main/java/io/druid/segment/Metadata.java @@ -247,7 +247,7 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(container, aggregators, timestampSpec, queryGranularity, rollup); + return Objects.hash(container, Arrays.hashCode(aggregators), timestampSpec, queryGranularity, rollup); } @Override From 9b35f92a53ee7030f8802622bb0e9821e04fa03f Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 11 Apr 2018 11:53:29 -0700 Subject: [PATCH 5/6] revert some null checks --- .../src/main/java/io/druid/segment/Metadata.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/Metadata.java b/processing/src/main/java/io/druid/segment/Metadata.java index ef3844cd27b0..33fe07c2f67d 100644 --- a/processing/src/main/java/io/druid/segment/Metadata.java +++ b/processing/src/main/java/io/druid/segment/Metadata.java @@ -20,7 +20,6 @@ package io.druid.segment; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Preconditions; import io.druid.data.input.impl.TimestampSpec; import io.druid.guice.annotations.PublicApi; import io.druid.java.util.common.granularity.Granularity; @@ -101,21 +100,23 @@ public Boolean isRollup() return rollup; } - public Metadata putAll(Map other) + public Metadata putAll(@Nullable Map other) { - container.putAll(Preconditions.checkNotNull(other, "other")); + if (other != null) { + container.putAll(other); + } return this; } public Object get(String key) { - return container.get(Preconditions.checkNotNull(key, "key")); + return container.get(key); } public Metadata put(String key, @Nullable Object value) { if (value != null) { - container.put(Preconditions.checkNotNull(key, "key"), value); + container.put(key, value); } return this; } From 21fbc39bf8371ea588431f60f855c22329a94bf8 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 11 Apr 2018 22:08:42 -0700 Subject: [PATCH 6/6] address comments --- .../indexing/common/task/CompactionTask.java | 55 +++++++++++++------ .../common/task/CompactionTaskTest.java | 2 +- .../main/java/io/druid/segment/Metadata.java | 26 ++------- 3 files changed, 45 insertions(+), 38 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java index 952106396fec..85d2e333cf32 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java @@ -80,6 +80,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -299,17 +300,22 @@ private static DataSchema createDataSchema( throws IOException { // find metadata for interval - final List queryableIndices = loadSegments(timelineSegments, segmentFileMap, indexIO); + final List> queryableIndexAndSegments = loadSegments( + timelineSegments, + segmentFileMap, + indexIO + ); // find merged aggregators - for (QueryableIndex index : queryableIndices) { + for (Pair pair : queryableIndexAndSegments) { + final QueryableIndex index = pair.lhs; if (index.getMetadata() == null) { - throw new RE("Index metadata doesn't exist for interval[%s]", index.getDataInterval()); + throw new RE("Index metadata doesn't exist for segment[%s]", pair.rhs.getIdentifier()); } } - final List aggregatorFactories = queryableIndices + final List aggregatorFactories = queryableIndexAndSegments .stream() - .map(index -> index.getMetadata().getAggregators()) // We have already done null check + .map(pair -> pair.lhs.getMetadata().getAggregators()) // We have already done null check on index.getMetadata() .collect(Collectors.toList()); final AggregatorFactory[] mergedAggregators = AggregatorFactory.mergeAggregators(aggregatorFactories); @@ -319,8 +325,9 @@ private static DataSchema createDataSchema( // find granularity spec // set rollup only if rollup is set for all segments - final boolean rollup = queryableIndices.stream().allMatch(index -> { - final Boolean isRollup = index.getMetadata().isRollup(); // We have already done null check + final boolean rollup = queryableIndexAndSegments.stream().allMatch(pair -> { + // We have already checked getMetadata() doesn't return null + final Boolean isRollup = pair.lhs.getMetadata().isRollup(); return isRollup != null && isRollup; }); final GranularitySpec granularitySpec = new ArbitraryGranularitySpec( @@ -331,7 +338,7 @@ private static DataSchema createDataSchema( // find unique dimensions final DimensionsSpec finalDimensionsSpec = dimensionsSpec == null ? - createDimensionsSpec(queryableIndices) : + createDimensionsSpec(queryableIndexAndSegments) : dimensionsSpec; final InputRowParser parser = new NoopInputRowParser(new TimeAndDimsParseSpec(null, finalDimensionsSpec)); @@ -345,7 +352,7 @@ private static DataSchema createDataSchema( ); } - private static DimensionsSpec createDimensionsSpec(List queryableIndices) + private static DimensionsSpec createDimensionsSpec(List> queryableIndices) { final BiMap uniqueDims = HashBiMap.create(); final Map dimensionSchemaMap = new HashMap<>(); @@ -355,9 +362,24 @@ private static DimensionsSpec createDimensionsSpec(List queryabl // Dimensions are extracted from the recent segments to olders because recent segments are likely to be queried more // frequently, and thus the performance should be optimized for recent ones rather than old ones. - // timelineSegments are sorted in order of interval + // timelineSegments are sorted in order of interval, but we do a sanity check here. + final Comparator intervalComparator = Comparators.intervalsByStartThenEnd(); + for (int i = 0; i < queryableIndices.size() - 1; i++) { + final Interval shouldBeSmaller = queryableIndices.get(i).lhs.getDataInterval(); + final Interval shouldBeLarger = queryableIndices.get(i + 1).lhs.getDataInterval(); + Preconditions.checkState( + intervalComparator.compare(shouldBeSmaller, shouldBeLarger) <= 0, + "QueryableIndexes are not sorted! Interval[%s] of segment[%s] is laster than interval[%s] of segment[%s]", + shouldBeSmaller, + queryableIndices.get(i).rhs.getIdentifier(), + shouldBeLarger, + queryableIndices.get(i + 1).rhs.getIdentifier() + ); + } + int index = 0; - for (QueryableIndex queryableIndex : Lists.reverse(queryableIndices)) { + for (Pair pair : Lists.reverse(queryableIndices)) { + final QueryableIndex queryableIndex = pair.lhs; final Map dimensionHandlerMap = queryableIndex.getDimensionHandlers(); for (String dimension : queryableIndex.getAvailableDimensions()) { @@ -403,23 +425,22 @@ private static DimensionsSpec createDimensionsSpec(List queryabl return new DimensionsSpec(dimensionSchemas, null, null); } - private static List loadSegments( + private static List> loadSegments( List> timelineSegments, Map segmentFileMap, IndexIO indexIO ) throws IOException { - final List segments = new ArrayList<>(); + final List> segments = new ArrayList<>(); for (TimelineObjectHolder timelineSegment : timelineSegments) { final PartitionHolder partitionHolder = timelineSegment.getObject(); for (PartitionChunk chunk : partitionHolder) { final DataSegment segment = chunk.getObject(); - segments.add( - indexIO.loadIndex( - Preconditions.checkNotNull(segmentFileMap.get(segment), "File for segment %s", segment.getIdentifier()) - ) + final QueryableIndex queryableIndex = indexIO.loadIndex( + Preconditions.checkNotNull(segmentFileMap.get(segment), "File for segment %s", segment.getIdentifier()) ); + segments.add(Pair.of(queryableIndex, segment)); } } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java index b1e7e600b20f..cefe50155295 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java @@ -427,7 +427,7 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio public void testMissingMetadata() throws IOException, SegmentLoadingException { expectedException.expect(RuntimeException.class); - expectedException.expectMessage(CoreMatchers.startsWith("Index metadata doesn't exist for interval")); + expectedException.expectMessage(CoreMatchers.startsWith("Index metadata doesn't exist for segment")); final TestIndexIO indexIO = (TestIndexIO) toolbox.getIndexIO(); indexIO.removeMetadata(Iterables.getFirst(indexIO.getQueryableIndexMap().keySet(), null)); diff --git a/processing/src/main/java/io/druid/segment/Metadata.java b/processing/src/main/java/io/druid/segment/Metadata.java index 33fe07c2f67d..3a7c94d294cd 100644 --- a/processing/src/main/java/io/druid/segment/Metadata.java +++ b/processing/src/main/java/io/druid/segment/Metadata.java @@ -223,26 +223,12 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - - Metadata metadata = (Metadata) o; - - if (!container.equals(metadata.container)) { - return false; - } - // Probably incorrect - comparing Object[] arrays with Arrays.equals - if (!Arrays.equals(aggregators, metadata.aggregators)) { - return false; - } - if (timestampSpec != null ? !timestampSpec.equals(metadata.timestampSpec) : metadata.timestampSpec != null) { - return false; - } - if (rollup != null ? !rollup.equals(metadata.rollup) : metadata.rollup != null) { - return false; - } - return queryGranularity != null - ? queryGranularity.equals(metadata.queryGranularity) - : metadata.queryGranularity == null; - + final Metadata metadata = (Metadata) o; + return Objects.equals(container, metadata.container) && + Arrays.equals(aggregators, metadata.aggregators) && + Objects.equals(timestampSpec, metadata.timestampSpec) && + Objects.equals(queryGranularity, metadata.queryGranularity) && + Objects.equals(rollup, metadata.rollup); } @Override