diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexIngestionMultithreadedBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexIngestionMultithreadedBenchmark.java new file mode 100644 index 000000000000..c18df4aa1113 --- /dev/null +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexIngestionMultithreadedBenchmark.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.benchmark.indexing; + +import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator; +import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo; +import org.apache.druid.benchmark.datagen.BenchmarkSchemas; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.hll.HyperLogLogHash; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde; +import org.apache.druid.segment.incremental.IncrementalIndex; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.serde.ComplexMetrics; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 10) +@Measurement(iterations = 25) +public class IndexIngestionMultithreadedBenchmark +{ + @Param({"75000"}) + private int rowsPerSegment; + + @Param({"basic"}) + private String schema; + + @Param({"true", "false"}) + private boolean rollup; + + private static final Logger log = new Logger(IndexIngestionMultithreadedBenchmark.class); + private static final int RNG_SEED = 9999; + private 
IncrementalIndex incIndex; + private ArrayList rows; + private BenchmarkSchemaInfo schemaInfo; + AtomicInteger threadIdAllocator = new AtomicInteger(0); + + @Setup + public void setup() + { + ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault())); + + rows = new ArrayList(); + schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema); + + BenchmarkDataGenerator gen = new BenchmarkDataGenerator( + schemaInfo.getColumnSchemas(), + RNG_SEED, + schemaInfo.getDataInterval(), + rowsPerSegment + ); + + for (int i = 0; i < rowsPerSegment; i++) { + InputRow row = gen.nextRow(); + if (i % 10000 == 0) { + log.info(i + " rows generated."); + } + + rows.add(row); + } + } + + @State(Scope.Thread) + public static class ThreadState + { + int rowOffset = 0; + + @Setup + public void setup(IndexIngestionMultithreadedBenchmark globalState) + { + rowOffset = globalState.threadIdAllocator.getAndIncrement(); + } + } + + @Setup(Level.Invocation) + public void setup2() + { + incIndex = makeIncIndex(); + } + + private IncrementalIndex makeIncIndex() + { + return new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMetrics(schemaInfo.getAggsArray()) + .withRollup(rollup) + .build() + ) + .setReportParseExceptions(false) + .setMaxRowCount(rowsPerSegment * 2) + .buildOnheap(); + } + + @Benchmark + @BenchmarkMode(Mode.SingleShotTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void addRows(Blackhole blackhole, ThreadState threadState) throws Exception + { + int threads = threadIdAllocator.get(); + for (int i = threadState.rowOffset; i < rowsPerSegment; i += threads) { + InputRow row = rows.get(i); + int rv = incIndex.add(row).getRowCount(); + blackhole.consume(rv); + } + } + + public static void main(String[] args) throws RunnerException + { + Options opt = new OptionsBuilder() + .include(IndexIngestionMultithreadedBenchmark.class.getSimpleName()) + .forks(1) + .threads(4) + .param("schema", "basic") + 
.param("rollup", "false") + .build(); + + new Runner(opt).run(); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index 39df02406ded..5845442e8835 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -75,7 +75,6 @@ import org.joda.time.Interval; import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; import java.io.Closeable; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -95,11 +94,12 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; public abstract class IncrementalIndex extends AbstractIndex implements Iterable, Closeable { - private volatile DateTime maxIngestedEventTime; + private AtomicReference maxIngestedEventTime = new AtomicReference<>(null); // Used to discover ValueType based on the class of values in a row // Also used to convert between the duplicate ValueType enums in DimensionSchema (druid-api) and main druid. @@ -235,9 +235,9 @@ public ColumnCapabilities getColumnCapabilities(String columnName) private final Map metricDescs; - private final Map dimensionDescs; - private final List dimensionDescsList; - private final Map columnCapabilities; + // Dimension data may be updated by concurrent non-blocking threads. + // Mutating the reference directly would be a concurrency bug. 
+ private final AtomicReference dimensions; private final AtomicInteger numEntries = new AtomicInteger(); private final AtomicLong bytesInMemory = new AtomicLong(); @@ -275,7 +275,7 @@ protected IncrementalIndex( this.deserializeComplexMetrics = deserializeComplexMetrics; this.reportParseExceptions = reportParseExceptions; - this.columnCapabilities = new HashMap<>(); + DimensionData dimensionData = new DimensionData(); this.metadata = new Metadata( null, getCombiningAggregators(metrics), @@ -290,13 +290,10 @@ protected IncrementalIndex( for (AggregatorFactory metric : metrics) { MetricDesc metricDesc = new MetricDesc(metricDescs.size(), metric); metricDescs.put(metricDesc.getName(), metricDesc); - columnCapabilities.put(metricDesc.getName(), metricDesc.getCapabilities()); + dimensionData.putCapabilities(metricDesc.getName(), metricDesc.getCapabilities()); } DimensionsSpec dimensionsSpec = incrementalIndexSchema.getDimensionsSpec(); - this.dimensionDescs = Maps.newLinkedHashMap(); - - this.dimensionDescsList = new ArrayList<>(); for (DimensionSchema dimSchema : dimensionsSpec.getDimensions()) { ValueType type = TYPE_MAP.get(dimSchema.getValueType()); String dimName = dimSchema.getName(); @@ -311,21 +308,23 @@ protected IncrementalIndex( capabilities, dimSchema.getMultiValueHandling() ); - addNewDimension(dimName, capabilities, handler); + dimensionData.addNewDimension(dimName, capabilities, handler); } - columnCapabilities.put(dimName, capabilities); + dimensionData.putCapabilities(dimName, capabilities); } //__time capabilities ColumnCapabilitiesImpl timeCapabilities = new ColumnCapabilitiesImpl(); timeCapabilities.setType(ValueType.LONG); - columnCapabilities.put(ColumnHolder.TIME_COLUMN_NAME, timeCapabilities); + dimensionData.putCapabilities(ColumnHolder.TIME_COLUMN_NAME, timeCapabilities); // This should really be more generic List spatialDimensions = dimensionsSpec.getSpatialDimensions(); if (!spatialDimensions.isEmpty()) { this.rowTransformers.add(new 
SpatialDimensionRowTransformer(spatialDimensions)); } + + this.dimensions = new AtomicReference<>(dimensionData); } public static class Builder @@ -577,7 +576,7 @@ public InputRow formatRow(InputRow row) public Map getColumnCapabilities() { - return columnCapabilities; + return dimensions.get().columnCapabilities; } /** @@ -627,101 +626,92 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row) if (row.getTimestampFromEpoch() < minTimestamp) { throw new IAE("Cannot add row[%s] because it is below the minTimestamp[%s]", row, DateTimes.utc(minTimestamp)); } - - final List rowDimensions = row.getDimensions(); - - Object[] dims; - List overflow = null; - long dimsKeySize = 0; - List parseExceptionMessages = new ArrayList<>(); - synchronized (dimensionDescs) { - dims = new Object[dimensionDescs.size()]; - for (String dimension : rowDimensions) { - if (Strings.isNullOrEmpty(dimension)) { - continue; - } - boolean wasNewDim = false; - ColumnCapabilitiesImpl capabilities; - DimensionDesc desc = dimensionDescs.get(dimension); - if (desc != null) { - capabilities = desc.getCapabilities(); - } else { - wasNewDim = true; - capabilities = columnCapabilities.get(dimension); - if (capabilities == null) { - capabilities = new ColumnCapabilitiesImpl(); - // For schemaless type discovery, assume everything is a String for now, can change later. 
- capabilities.setType(ValueType.STRING); - capabilities.setDictionaryEncoded(true); - capabilities.setHasBitmapIndexes(true); - columnCapabilities.put(dimension, capabilities); - } - DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dimension, capabilities, null); - desc = addNewDimension(dimension, capabilities, handler); - } - DimensionHandler handler = desc.getHandler(); - DimensionIndexer indexer = desc.getIndexer(); - Object dimsKey = null; - try { - dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent( - row.getRaw(dimension), - true - ); - } - catch (ParseException pe) { - parseExceptionMessages.add(pe.getMessage()); - } - dimsKeySize += indexer.estimateEncodedKeyComponentSize(dimsKey); - // Set column capabilities as data is coming in - if (!capabilities.hasMultipleValues() && - dimsKey != null && - handler.getLengthOfEncodedKeyComponent(dimsKey) > 1) { - capabilities.setHasMultipleValues(true); - } - - if (wasNewDim) { - if (overflow == null) { - overflow = new ArrayList<>(); - } - overflow.add(dimsKey); - } else if (desc.getIndex() > dims.length || dims[desc.getIndex()] != null) { - /* - * index > dims.length requires that we saw this dimension and added it to the dimensionOrder map, - * otherwise index is null. Since dims is initialized based on the size of dimensionOrder on each call to add, - * it must have been added to dimensionOrder during this InputRow. - * - * if we found an index for this dimension it means we've seen it already. If !(index > dims.length) then - * we saw it on a previous input row (this its safe to index into dims). If we found a value in - * the dims array for this index, it means we have seen this dimension already on this input row. 
- */ - throw new ISE("Dimension[%s] occurred more than once in InputRow", dimension); - } else { - dims[desc.getIndex()] = dimsKey; - } - } + DimensionData prevDimensionData = this.dimensions.get(); + RowDimsKeyComponents dimsKeyComponents = getRowDimsKeyComponents(row, prevDimensionData); + DimensionData dimensionData = dimsKeyComponents.getUpdatedDimensionData(); + while (dimensionData != null && !dimensions.compareAndSet(prevDimensionData, dimensionData)) { + prevDimensionData = dimensions.get(); + dimsKeyComponents = getRowDimsKeyComponents(row, prevDimensionData); + dimensionData = dimsKeyComponents.getUpdatedDimensionData(); } - if (overflow != null) { - // Merge overflow and non-overflow - Object[] newDims = new Object[dims.length + overflow.size()]; - System.arraycopy(dims, 0, newDims, 0, dims.length); - for (int i = 0; i < overflow.size(); ++i) { - newDims[dims.length + i] = overflow.get(i); - } - dims = newDims; + dimensionData = dimensions.get(); + long dimsKeySize = 0; + Object[] dims = new Object[dimensionData.size()]; + for (Map.Entry dimension : dimsKeyComponents.getRowDimKeys().entrySet()) { + DimensionDesc desc = dimensionData.getDimensionDesc(dimension.getKey()); + DimensionIndexer indexer = desc.getIndexer(); + Object dimsKey = dimension.getValue(); + dimsKeySize += indexer.estimateEncodedKeyComponentSize(dimsKey); + dims[desc.getIndex()] = dimsKey; } long truncated = 0; if (row.getTimestamp() != null) { truncated = gran.bucketStart(row.getTimestamp()).getMillis(); } + IncrementalIndexRow incrementalIndexRow = IncrementalIndexRow.createTimeAndDimswithDimsKeySize( Math.max(truncated, minTimestamp), dims, - dimensionDescsList, + dimensionData.getDimensionDescsList(), dimsKeySize ); - return new IncrementalIndexRowResult(incrementalIndexRow, parseExceptionMessages); + return new IncrementalIndexRowResult(incrementalIndexRow, dimsKeyComponents.getParseExceptionMessages()); + } + + // Note: This method might be called from multiple parallel threads 
without synchronization. Making thread-unsafe + // calls, or mutating prevDimensionData would be a concurrency bug + private RowDimsKeyComponents getRowDimsKeyComponents(InputRow row, DimensionData prevDimensionData) + { + List parseExceptionMessages = new ArrayList<>(); + Map rowDimKeys = new HashMap<>(); + DimensionData dimensionData = null; + + final List rowDimensions = row.getDimensions(); + for (String dimension : rowDimensions) { + if (Strings.isNullOrEmpty(dimension)) { + parseExceptionMessages.add("InputRow contains null or empty dimension name"); + continue; + } + if (rowDimKeys.containsKey(dimension)) { + // If the dims map already contains a mapping at this index, it means we have seen this dimension already on this input row. + parseExceptionMessages.add(StringUtils.nonStrictFormat("Dimension[%s] occurred more than once in InputRow", dimension)); + continue; + } + DimensionDesc desc = prevDimensionData.getDimensionDesc(dimension); + if (desc == null) { + if (dimensionData == null) { + dimensionData = prevDimensionData.clone(); + } + desc = dimensionData.addNewDimension(dimension); + } + ColumnCapabilitiesImpl capabilities = desc.getCapabilities(); + DimensionHandler handler = desc.getHandler(); + DimensionIndexer indexer = desc.getIndexer(); + Object dimsKey = null; + try { + dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent( + row.getRaw(dimension), + true + ); + } + catch (ParseException pe) { + parseExceptionMessages.add(pe.getMessage()); + } + // Set column capabilities as data is coming in + if (!capabilities.hasMultipleValues() && + dimsKey != null && + handler.getLengthOfEncodedKeyComponent(dimsKey) > 1) { + if (dimensionData == null) { + dimensionData = prevDimensionData.clone(); + } + capabilities = dimensionData.getDimensionCapabilities(dimension); + capabilities.setHasMultipleValues(true); + } + rowDimKeys.put(dimension, dimsKey); + } + return new RowDimsKeyComponents(rowDimKeys, dimensionData, parseExceptionMessages); } public 
static ParseException getCombinedParseException( @@ -760,10 +750,14 @@ public static ParseException getCombinedParseException( return pe; } - private synchronized void updateMaxIngestedTime(DateTime eventTime) + private void updateMaxIngestedTime(DateTime eventTime) { - if (maxIngestedEventTime == null || maxIngestedEventTime.isBefore(eventTime)) { - maxIngestedEventTime = eventTime; + DateTime maxEventTime = maxIngestedEventTime.get(); + while (maxEventTime == null || maxEventTime.isBefore(eventTime)) { + if (maxIngestedEventTime.compareAndSet(maxEventTime, eventTime)) { + break; + } + maxEventTime = maxIngestedEventTime.get(); } } @@ -824,24 +818,20 @@ public AggregatorFactory[] getMetricAggs() public List getDimensionNames() { - synchronized (dimensionDescs) { - return ImmutableList.copyOf(dimensionDescs.keySet()); - } + Map dimensionDescs = dimensions.get().getDimensionDescs(); + return ImmutableList.copyOf(dimensionDescs.keySet()); } public List getDimensions() { - synchronized (dimensionDescs) { - return ImmutableList.copyOf(dimensionDescs.values()); - } + return dimensions.get().getDimensionDescsList(); } @Nullable public DimensionDesc getDimension(String dimension) { - synchronized (dimensionDescs) { - return dimensionDescs.get(dimension); - } + Map dimensionDescs = dimensions.get().getDimensionDescs(); + return dimensionDescs.get(dimension); } @Nullable @@ -901,9 +891,8 @@ public Integer getDimensionIndex(String dimension) public List getDimensionOrder() { - synchronized (dimensionDescs) { - return ImmutableList.copyOf(dimensionDescs.keySet()); - } + Map dimensionDescs = dimensions.get().getDimensionDescs(); + return ImmutableList.copyOf(dimensionDescs.keySet()); } private ColumnCapabilitiesImpl makeCapabilitiesFromValueType(ValueType type) @@ -925,28 +914,21 @@ public void loadDimensionIterable( Map oldColumnCapabilities ) { - synchronized (dimensionDescs) { - if (!dimensionDescs.isEmpty()) { - throw new ISE("Cannot load dimension order when existing 
order[%s] is not empty.", dimensionDescs.keySet()); + DimensionData prevDimensionData, dimensionData; + do { + prevDimensionData = dimensions.get(); + dimensionData = prevDimensionData.clone(); + if (dimensionData.size() != 0) { + throw new ISE("Cannot load dimension order when existing order[%s] is not empty.", dimensionData.getDimensionDescs().keySet()); } for (String dim : oldDimensionOrder) { - if (dimensionDescs.get(dim) == null) { + if (dimensionData.getDimensionDesc(dim) == null) { ColumnCapabilitiesImpl capabilities = oldColumnCapabilities.get(dim); - columnCapabilities.put(dim, capabilities); - DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null); - addNewDimension(dim, capabilities, handler); + dimensionData.putCapabilities(dim, capabilities); + dimensionData.addNewDimension(dim); } } - } - } - - @GuardedBy("dimensionDescs") - private DimensionDesc addNewDimension(String dim, ColumnCapabilitiesImpl capabilities, DimensionHandler handler) - { - DimensionDesc desc = new DimensionDesc(dimensionDescs.size(), dim, capabilities, handler); - dimensionDescs.put(dim, desc); - dimensionDescsList.add(desc); - return desc; + } while (!dimensions.compareAndSet(prevDimensionData, dimensionData)); } public List getMetricNames() @@ -970,7 +952,7 @@ public StorageAdapter toStorageAdapter() public ColumnCapabilities getCapabilities(String column) { - return columnCapabilities.get(column); + return dimensions.get().getDimensionCapabilities(column); } public Metadata getMetadata() @@ -1047,7 +1029,117 @@ public Iterator iterator() public DateTime getMaxIngestedEventTime() { - return maxIngestedEventTime; + return maxIngestedEventTime.get(); + } + + private static class RowDimsKeyComponents + { + public Map getRowDimKeys() + { + return rowDimKeys; + } + + @Nullable + public DimensionData getUpdatedDimensionData() + { + return updatedDimensionData; + } + + public List getParseExceptionMessages() + { + return 
parseExceptionMessages; + } + + private final Map rowDimKeys; + private final @Nullable DimensionData updatedDimensionData; + private final List parseExceptionMessages; + + private RowDimsKeyComponents(Map rowDimKeys, @Nullable DimensionData updatedDimensionData, List parseExceptionMessages) + { + this.rowDimKeys = rowDimKeys; + this.updatedDimensionData = updatedDimensionData; + this.parseExceptionMessages = parseExceptionMessages; + } + } + + private static class DimensionData + { + private final Map columnCapabilities; + private final Map dimensionDescs; + private final List dimensionDescsList; + + DimensionData() + { + this.columnCapabilities = new HashMap<>(); + this.dimensionDescs = Maps.newLinkedHashMap(); + this.dimensionDescsList = new ArrayList<>(); + } + + private DimensionData(Map columnCapabilities, Map dimensionDescs, List dimensionDescsList) + { + this.columnCapabilities = new HashMap<>(columnCapabilities); + this.dimensionDescs = Maps.newLinkedHashMap(dimensionDescs); + this.dimensionDescsList = new ArrayList<>(dimensionDescsList); + } + + @Override + public DimensionData clone() + { + return new DimensionData(columnCapabilities, dimensionDescs, dimensionDescsList); + } + + public DimensionDesc addNewDimension(String dim) + { + ColumnCapabilitiesImpl capabilities = getDimensionCapabilities(dim); + if (capabilities == null) { + capabilities = new ColumnCapabilitiesImpl(); + // For schemaless type discovery, assume everything is a String for now, can change later. 
+ capabilities.setType(ValueType.STRING); + capabilities.setDictionaryEncoded(true); + capabilities.setHasBitmapIndexes(true); + putCapabilities(dim, capabilities); + } + DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null); + return addNewDimension(dim, capabilities, handler); + } + + public DimensionDesc addNewDimension(String dim, ColumnCapabilitiesImpl capabilities, DimensionHandler handler) + { + DimensionDesc desc = new DimensionDesc(dimensionDescsList.size(), dim, capabilities, handler); + dimensionDescs.put(dim, desc); + dimensionDescsList.add(desc); + return desc; + } + + public int size() + { + return dimensionDescsList.size(); + } + + public DimensionDesc getDimensionDesc(String dim) + { + return dimensionDescs.get(dim); + } + + public ColumnCapabilitiesImpl getDimensionCapabilities(String dim) + { + return columnCapabilities.get(dim); + } + + public void putCapabilities(String dim, ColumnCapabilitiesImpl capabilities) + { + columnCapabilities.put(dim, capabilities); + } + + public Map getDimensionDescs() + { + return dimensionDescs; + } + + public List getDimensionDescsList() + { + return dimensionDescsList; + } } public static final class DimensionDesc @@ -1154,22 +1246,23 @@ protected ColumnSelectorFactory makeColumnSelectorFactory( protected final Comparator dimsComparator() { - return new IncrementalIndexRowComparator(dimensionDescsList); + return new IncrementalIndexRowComparator(dimensions); } @VisibleForTesting static final class IncrementalIndexRowComparator implements Comparator { - private List dimensionDescs; + private AtomicReference dimensionData; - public IncrementalIndexRowComparator(List dimDescs) + public IncrementalIndexRowComparator(AtomicReference dimensionData) { - this.dimensionDescs = dimDescs; + this.dimensionData = dimensionData; } @Override public int compare(IncrementalIndexRow lhs, IncrementalIndexRow rhs) { + List dimensionDescs = dimensionData.get().getDimensionDescsList(); 
int retVal = Longs.compare(lhs.timestamp, rhs.timestamp); int numComparisons = Math.min(lhs.dims.length, rhs.dims.length); diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java index 3dfd3210d408..9ca85e5954a5 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java @@ -29,7 +29,6 @@ import org.apache.druid.data.input.impl.FloatDimensionSchema; import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.parsers.ParseException; @@ -156,36 +155,46 @@ public IncrementalIndex createIndex() return constructors; } - @Test(expected = ISE.class) + @Test public void testDuplicateDimensions() throws IndexSizeExceededException { IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex()); index.add( new MapBasedInputRow( - System.currentTimeMillis() - 1, + 0, Lists.newArrayList("billy", "joe"), ImmutableMap.of("billy", "A", "joe", "B") ) ); - index.add( - new MapBasedInputRow( - System.currentTimeMillis() - 1, - Lists.newArrayList("billy", "joe", "joe"), - ImmutableMap.of("billy", "A", "joe", "B") - ) + IncrementalIndexAddResult result = index.add( + new MapBasedInputRow( + 0, + Lists.newArrayList("billy", "joe", "joe"), + ImmutableMap.of("billy", "A", "joe", "B") + ) + ); + Assert.assertEquals(ParseException.class, result.getParseException().getClass()); + Assert.assertEquals( + "Found unparseable columns in row: [MapBasedInputRow{timestamp=1970-01-01T00:00:00.000Z, event={billy=A, joe=B}, dimensions=[billy, joe, joe]}], 
exceptions: [Dimension[joe] occurred more than once in InputRow,]", + result.getParseException().getMessage() ); } - @Test(expected = ISE.class) + @Test public void testDuplicateDimensionsFirstOccurrence() throws IndexSizeExceededException { IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex()); - index.add( - new MapBasedInputRow( - System.currentTimeMillis() - 1, - Lists.newArrayList("billy", "joe", "joe"), - ImmutableMap.of("billy", "A", "joe", "B") - ) + IncrementalIndexAddResult result = index.add( + new MapBasedInputRow( + 0, + Lists.newArrayList("billy", "joe", "joe"), + ImmutableMap.of("billy", "A", "joe", "B") + ) + ); + Assert.assertEquals(ParseException.class, result.getParseException().getClass()); + Assert.assertEquals( + "Found unparseable columns in row: [MapBasedInputRow{timestamp=1970-01-01T00:00:00.000Z, event={billy=A, joe=B}, dimensions=[billy, joe, joe]}], exceptions: [Dimension[joe] occurred more than once in InputRow,]", + result.getParseException().getMessage() ); }