From 48fdcb551c09309993a767bf93681d008aa060eb Mon Sep 17 00:00:00 2001 From: Eran Meir Date: Sun, 26 May 2019 13:58:07 +0300 Subject: [PATCH 1/4] Improve IncrementalIndex concurrency scalability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Shared state is encapsulated in a new class - DimensionData. This includes dimensionDescs, dimensionDescsList and columnCapabilities - Concurrent threads share an atomic reference to an instance of DimensionData - CoW: Only when a thread needs to update the shared state, it will copy the instance, update the copy, and eventually swap the reference atomically. - Consistency is maintained when the reference is updated. This simplifies row processing, removes the need for keeping an “overflow” array, and allows fast failure when a row contains duplicate dimensions. - New multi-threaded ingestion benchmark: IndexIngestionMultithreadedBenchmark --- .../IndexIngestionMultithreadedBenchmark.java | 158 +++++++++ .../segment/incremental/IncrementalIndex.java | 313 +++++++++++------- 2 files changed, 345 insertions(+), 126 deletions(-) create mode 100644 benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexIngestionMultithreadedBenchmark.java diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexIngestionMultithreadedBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexIngestionMultithreadedBenchmark.java new file mode 100644 index 000000000000..feb00981b871 --- /dev/null +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexIngestionMultithreadedBenchmark.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.benchmark.indexing; + +import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator; +import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo; +import org.apache.druid.benchmark.datagen.BenchmarkSchemas; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.hll.HyperLogLogHash; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde; +import org.apache.druid.segment.incremental.IncrementalIndex; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.serde.ComplexMetrics; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import 
org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 10) +@Measurement(iterations = 25) +public class IndexIngestionMultithreadedBenchmark +{ + @Param({"75000"}) + private int rowsPerSegment; + + @Param({"basic"}) + private String schema; + + @Param({"true", "false"}) + private boolean rollup; + + private static final Logger log = new Logger(IndexIngestionBenchmark.class); + private static final int RNG_SEED = 9999; + private IncrementalIndex incIndex; + private ArrayList rows; + private BenchmarkSchemaInfo schemaInfo; + AtomicInteger threadIdAllocator = new AtomicInteger(0); + + @Setup + public void setup() + { + ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault())); + + rows = new ArrayList(); + schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema); + + BenchmarkDataGenerator gen = new BenchmarkDataGenerator( + schemaInfo.getColumnSchemas(), + RNG_SEED, + schemaInfo.getDataInterval(), + rowsPerSegment + ); + + for (int i = 0; i < rowsPerSegment; i++) { + InputRow row = gen.nextRow(); + if (i % 10000 == 0) { + log.info(i + " rows generated."); + } + + rows.add(row); + } + } + + @State(Scope.Thread) + public static class ThreadState + { + int id = 0; + + @Setup + public void setup(IndexIngestionMultithreadedBenchmark globalState) + { + id = globalState.threadIdAllocator.getAndIncrement(); + } + } + + @Setup(Level.Invocation) + public void setup2() + { + incIndex = makeIncIndex(); + } + + private IncrementalIndex makeIncIndex() + { + return new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMetrics(schemaInfo.getAggsArray()) + .withRollup(rollup) + .build() + ) + .setReportParseExceptions(false) + .setMaxRowCount(rowsPerSegment * 2) + .buildOnheap(); + } + + @Benchmark + 
@BenchmarkMode(Mode.SingleShotTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void addRows(Blackhole blackhole, ThreadState threadState) throws Exception + { + int threads = threadIdAllocator.get(); + for (int i = threadState.id; i < rowsPerSegment; i += threads) { + InputRow row = rows.get(i); + int rv = incIndex.add(row).getRowCount(); + blackhole.consume(rv); + } + } + + public static void main(String[] args) throws RunnerException + { + Options opt = new OptionsBuilder() + .include(IndexIngestionMultithreadedBenchmark.class.getSimpleName()) + .forks(1) + .threads(4) + .param("schema", "basic") + .param("rollup", "false") + .build(); + + new Runner(opt).run(); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index 39df02406ded..c75a468305c4 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -75,7 +75,6 @@ import org.joda.time.Interval; import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; import java.io.Closeable; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -95,11 +94,12 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; public abstract class IncrementalIndex extends AbstractIndex implements Iterable, Closeable { - private volatile DateTime maxIngestedEventTime; + private AtomicReference maxIngestedEventTime = new AtomicReference<>(null); // Used to discover ValueType based on the class of values in a row // Also used to convert between the duplicate ValueType enums in DimensionSchema (druid-api) and main druid. 
@@ -235,9 +235,7 @@ public ColumnCapabilities getColumnCapabilities(String columnName) private final Map metricDescs; - private final Map dimensionDescs; - private final List dimensionDescsList; - private final Map columnCapabilities; + private final AtomicReference dimensions; private final AtomicInteger numEntries = new AtomicInteger(); private final AtomicLong bytesInMemory = new AtomicLong(); @@ -275,7 +273,7 @@ protected IncrementalIndex( this.deserializeComplexMetrics = deserializeComplexMetrics; this.reportParseExceptions = reportParseExceptions; - this.columnCapabilities = new HashMap<>(); + DimensionData dimensionData = new DimensionData(); this.metadata = new Metadata( null, getCombiningAggregators(metrics), @@ -290,13 +288,10 @@ protected IncrementalIndex( for (AggregatorFactory metric : metrics) { MetricDesc metricDesc = new MetricDesc(metricDescs.size(), metric); metricDescs.put(metricDesc.getName(), metricDesc); - columnCapabilities.put(metricDesc.getName(), metricDesc.getCapabilities()); + dimensionData.putCapabilities(metricDesc.getName(), metricDesc.getCapabilities()); } DimensionsSpec dimensionsSpec = incrementalIndexSchema.getDimensionsSpec(); - this.dimensionDescs = Maps.newLinkedHashMap(); - - this.dimensionDescsList = new ArrayList<>(); for (DimensionSchema dimSchema : dimensionsSpec.getDimensions()) { ValueType type = TYPE_MAP.get(dimSchema.getValueType()); String dimName = dimSchema.getName(); @@ -311,21 +306,23 @@ protected IncrementalIndex( capabilities, dimSchema.getMultiValueHandling() ); - addNewDimension(dimName, capabilities, handler); + dimensionData.addNewDimension(dimName, capabilities, handler); } - columnCapabilities.put(dimName, capabilities); + dimensionData.putCapabilities(dimName, capabilities); } //__time capabilities ColumnCapabilitiesImpl timeCapabilities = new ColumnCapabilitiesImpl(); timeCapabilities.setType(ValueType.LONG); - columnCapabilities.put(ColumnHolder.TIME_COLUMN_NAME, timeCapabilities); + 
dimensionData.putCapabilities(ColumnHolder.TIME_COLUMN_NAME, timeCapabilities); // This should really be more generic List spatialDimensions = dimensionsSpec.getSpatialDimensions(); if (!spatialDimensions.isEmpty()) { this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions)); } + + this.dimensions = new AtomicReference<>(dimensionData); } public static class Builder @@ -577,7 +574,7 @@ public InputRow formatRow(InputRow row) public Map getColumnCapabilities() { - return columnCapabilities; + return dimensions.get().columnCapabilities; } /** @@ -627,98 +624,96 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row) if (row.getTimestampFromEpoch() < minTimestamp) { throw new IAE("Cannot add row[%s] because it is below the minTimestamp[%s]", row, DateTimes.utc(minTimestamp)); } - final List rowDimensions = row.getDimensions(); - Object[] dims; - List overflow = null; + Map rowDimKeys = new HashMap<>(); long dimsKeySize = 0; List parseExceptionMessages = new ArrayList<>(); - synchronized (dimensionDescs) { - dims = new Object[dimensionDescs.size()]; - for (String dimension : rowDimensions) { - if (Strings.isNullOrEmpty(dimension)) { - continue; - } - boolean wasNewDim = false; - ColumnCapabilitiesImpl capabilities; - DimensionDesc desc = dimensionDescs.get(dimension); - if (desc != null) { - capabilities = desc.getCapabilities(); - } else { - wasNewDim = true; - capabilities = columnCapabilities.get(dimension); - if (capabilities == null) { - capabilities = new ColumnCapabilitiesImpl(); - // For schemaless type discovery, assume everything is a String for now, can change later. 
- capabilities.setType(ValueType.STRING); - capabilities.setDictionaryEncoded(true); - capabilities.setHasBitmapIndexes(true); - columnCapabilities.put(dimension, capabilities); - } - DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dimension, capabilities, null); - desc = addNewDimension(dimension, capabilities, handler); - } - DimensionHandler handler = desc.getHandler(); - DimensionIndexer indexer = desc.getIndexer(); - Object dimsKey = null; - try { - dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent( - row.getRaw(dimension), - true - ); + + DimensionData prevDimensionData = this.dimensions.get(); + DimensionData dimensionData = null; + for (String dimension : rowDimensions) { + if (Strings.isNullOrEmpty(dimension)) { + continue; + } + + if (rowDimKeys.containsKey(dimension)) { + // If the dims map already contains a mapping at this index, it means we have seen this dimension already on this input row. + throw new ISE("Dimension[%s] occurred more than once in InputRow", dimension); + } + ColumnCapabilitiesImpl capabilities; + DimensionDesc desc = prevDimensionData.getDimensionDesc(dimension); + if (desc != null) { + capabilities = desc.getCapabilities(); + } else { + if (dimensionData == null) { + dimensionData = prevDimensionData.clone(); } - catch (ParseException pe) { - parseExceptionMessages.add(pe.getMessage()); + capabilities = dimensionData.getDimensionCapabilities(dimension); + if (capabilities == null) { + capabilities = new ColumnCapabilitiesImpl(); + // For schemaless type discovery, assume everything is a String for now, can change later. 
+ capabilities.setType(ValueType.STRING); + capabilities.setDictionaryEncoded(true); + capabilities.setHasBitmapIndexes(true); + dimensionData.putCapabilities(dimension, capabilities); } - dimsKeySize += indexer.estimateEncodedKeyComponentSize(dimsKey); - // Set column capabilities as data is coming in - if (!capabilities.hasMultipleValues() && - dimsKey != null && - handler.getLengthOfEncodedKeyComponent(dimsKey) > 1) { - capabilities.setHasMultipleValues(true); + DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dimension, capabilities, null); + if (dimensionData == null) { + dimensionData = prevDimensionData.clone(); } - - if (wasNewDim) { - if (overflow == null) { - overflow = new ArrayList<>(); - } - overflow.add(dimsKey); - } else if (desc.getIndex() > dims.length || dims[desc.getIndex()] != null) { - /* - * index > dims.length requires that we saw this dimension and added it to the dimensionOrder map, - * otherwise index is null. Since dims is initialized based on the size of dimensionOrder on each call to add, - * it must have been added to dimensionOrder during this InputRow. - * - * if we found an index for this dimension it means we've seen it already. If !(index > dims.length) then - * we saw it on a previous input row (this its safe to index into dims). If we found a value in - * the dims array for this index, it means we have seen this dimension already on this input row. 
- */ - throw new ISE("Dimension[%s] occurred more than once in InputRow", dimension); - } else { - dims[desc.getIndex()] = dimsKey; + desc = dimensionData.addNewDimension(dimension, capabilities, handler); + } + DimensionHandler handler = desc.getHandler(); + DimensionIndexer indexer = desc.getIndexer(); + Object dimsKey = null; + try { + dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent( + row.getRaw(dimension), + true + ); + } + catch (ParseException pe) { + parseExceptionMessages.add(pe.getMessage()); + } + dimsKeySize += indexer.estimateEncodedKeyComponentSize(dimsKey); + // Set column capabilities as data is coming in + if (!capabilities.hasMultipleValues() && + dimsKey != null && + handler.getLengthOfEncodedKeyComponent(dimsKey) > 1) { + if (dimensionData == null) { + dimensionData = prevDimensionData.clone(); } + capabilities = dimensionData.getDimensionCapabilities(dimension); + capabilities.setHasMultipleValues(true); } + rowDimKeys.put(dimension, dimsKey); } - if (overflow != null) { - // Merge overflow and non-overflow - Object[] newDims = new Object[dims.length + overflow.size()]; - System.arraycopy(dims, 0, newDims, 0, dims.length); - for (int i = 0; i < overflow.size(); ++i) { - newDims[dims.length + i] = overflow.get(i); + + if (dimensionData != null) { + while (!dimensions.compareAndSet(prevDimensionData, dimensionData)) { + prevDimensionData = dimensions.get(); + dimensionData.rebase(prevDimensionData); } - dims = newDims; + } else { + dimensionData = prevDimensionData; + } + Object[] dims = new Object[dimensionData.size()]; + for (String dimension : rowDimKeys.keySet()) { + DimensionDesc desc = dimensionData.getDimensionDesc(dimension); + dims[desc.getIndex()] = rowDimKeys.get(dimension); } long truncated = 0; if (row.getTimestamp() != null) { truncated = gran.bucketStart(row.getTimestamp()).getMillis(); } + IncrementalIndexRow incrementalIndexRow = IncrementalIndexRow.createTimeAndDimswithDimsKeySize( Math.max(truncated, 
minTimestamp), dims, - dimensionDescsList, + dimensionData.getDimensionDescsList(), dimsKeySize ); return new IncrementalIndexRowResult(incrementalIndexRow, parseExceptionMessages); @@ -760,10 +755,14 @@ public static ParseException getCombinedParseException( return pe; } - private synchronized void updateMaxIngestedTime(DateTime eventTime) + private void updateMaxIngestedTime(DateTime eventTime) { - if (maxIngestedEventTime == null || maxIngestedEventTime.isBefore(eventTime)) { - maxIngestedEventTime = eventTime; + DateTime maxEventTime = maxIngestedEventTime.get(); + while (maxEventTime == null || maxEventTime.isBefore(eventTime)) { + if (maxIngestedEventTime.compareAndSet(maxEventTime, eventTime)) { + break; + } + maxEventTime = maxIngestedEventTime.get(); } } @@ -824,24 +823,20 @@ public AggregatorFactory[] getMetricAggs() public List getDimensionNames() { - synchronized (dimensionDescs) { - return ImmutableList.copyOf(dimensionDescs.keySet()); - } + Map dimensionDescs = dimensions.get().getDimensionDescs(); + return ImmutableList.copyOf(dimensionDescs.keySet()); } public List getDimensions() { - synchronized (dimensionDescs) { - return ImmutableList.copyOf(dimensionDescs.values()); - } + return dimensions.get().getDimensionDescsList(); } @Nullable public DimensionDesc getDimension(String dimension) { - synchronized (dimensionDescs) { - return dimensionDescs.get(dimension); - } + Map dimensionDescs = dimensions.get().getDimensionDescs(); + return dimensionDescs.get(dimension); } @Nullable @@ -901,9 +896,8 @@ public Integer getDimensionIndex(String dimension) public List getDimensionOrder() { - synchronized (dimensionDescs) { - return ImmutableList.copyOf(dimensionDescs.keySet()); - } + Map dimensionDescs = dimensions.get().getDimensionDescs(); + return ImmutableList.copyOf(dimensionDescs.keySet()); } private ColumnCapabilitiesImpl makeCapabilitiesFromValueType(ValueType type) @@ -925,30 +919,20 @@ public void loadDimensionIterable( Map oldColumnCapabilities ) { 
- synchronized (dimensionDescs) { - if (!dimensionDescs.isEmpty()) { - throw new ISE("Cannot load dimension order when existing order[%s] is not empty.", dimensionDescs.keySet()); - } - for (String dim : oldDimensionOrder) { - if (dimensionDescs.get(dim) == null) { - ColumnCapabilitiesImpl capabilities = oldColumnCapabilities.get(dim); - columnCapabilities.put(dim, capabilities); - DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null); - addNewDimension(dim, capabilities, handler); - } + DimensionData dimensionData = dimensions.get().clone(); + if (dimensionData.size() != 0) { + throw new ISE("Cannot load dimension order when existing order[%s] is not empty.", dimensionData.getDimensionDescs().keySet()); + } + for (String dim : oldDimensionOrder) { + if (dimensionData.getDimensionDesc(dim) == null) { + ColumnCapabilitiesImpl capabilities = oldColumnCapabilities.get(dim); + dimensionData.putCapabilities(dim, capabilities); + DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null); + dimensionData.addNewDimension(dim, capabilities, handler); } } } - @GuardedBy("dimensionDescs") - private DimensionDesc addNewDimension(String dim, ColumnCapabilitiesImpl capabilities, DimensionHandler handler) - { - DimensionDesc desc = new DimensionDesc(dimensionDescs.size(), dim, capabilities, handler); - dimensionDescs.put(dim, desc); - dimensionDescsList.add(desc); - return desc; - } - public List getMetricNames() { return ImmutableList.copyOf(metricDescs.keySet()); @@ -970,7 +954,7 @@ public StorageAdapter toStorageAdapter() public ColumnCapabilities getCapabilities(String column) { - return columnCapabilities.get(column); + return dimensions.get().getDimensionCapabilities(column); } public Metadata getMetadata() @@ -1047,7 +1031,83 @@ public Iterator iterator() public DateTime getMaxIngestedEventTime() { - return maxIngestedEventTime; + return maxIngestedEventTime.get(); + } + + private 
static class DimensionData + { + private Map columnCapabilities; + private Map dimensionDescs; + private List dimensionDescsList; + + + DimensionData() + { + this.columnCapabilities = new HashMap<>(); + this.dimensionDescs = Maps.newLinkedHashMap(); + this.dimensionDescsList = new ArrayList<>(); + } + + private DimensionData(Map columnCapabilities, Map dimensionDescs, List dimensionDescsList) + { + this.columnCapabilities = new HashMap<>(columnCapabilities); + this.dimensionDescs = Maps.newLinkedHashMap(dimensionDescs); + this.dimensionDescsList = new ArrayList<>(dimensionDescsList); + } + + @Override + public DimensionData clone() + { + return new DimensionData(columnCapabilities, dimensionDescs, dimensionDescsList); + } + + public DimensionDesc addNewDimension(String dim, ColumnCapabilitiesImpl capabilities, DimensionHandler handler) + { + DimensionDesc desc = new DimensionDesc(dimensionDescsList.size(), dim, capabilities, handler); + dimensionDescs.put(dim, desc); + dimensionDescsList.add(desc); + return desc; + } + + public DimensionData rebase(DimensionData previous) + { + DimensionData rebased = previous.clone(); + for (DimensionDesc dim : dimensionDescsList) { + rebased.putCapabilities(dim.getName(), dim.getCapabilities()); + rebased.addNewDimension(dim.getName(), dim.getCapabilities(), dim.getHandler()); + } + return rebased; + } + + public int size() + { + return dimensionDescsList.size(); + } + + public DimensionDesc getDimensionDesc(String dimension) + { + return dimensionDescs.get(dimension); + } + + public ColumnCapabilitiesImpl getDimensionCapabilities(String dimension) + { + return columnCapabilities.get(dimension); + } + + public void putCapabilities(String name, ColumnCapabilitiesImpl capabilities) + { + columnCapabilities.put(name, capabilities); + } + + public Map getDimensionDescs() + { + return dimensionDescs; + } + + public List getDimensionDescsList() + { + return dimensionDescsList; + } } public static final class DimensionDesc @@ -1154,22 
+1214,23 @@ protected ColumnSelectorFactory makeColumnSelectorFactory( protected final Comparator dimsComparator() { - return new IncrementalIndexRowComparator(dimensionDescsList); + return new IncrementalIndexRowComparator(dimensions); } @VisibleForTesting static final class IncrementalIndexRowComparator implements Comparator { - private List dimensionDescs; + private AtomicReference dimensionData; - public IncrementalIndexRowComparator(List dimDescs) + public IncrementalIndexRowComparator(AtomicReference dimensionData) { - this.dimensionDescs = dimDescs; + this.dimensionData = dimensionData; } @Override public int compare(IncrementalIndexRow lhs, IncrementalIndexRow rhs) { + List dimensionDescs = dimensionData.get().getDimensionDescsList(); int retVal = Longs.compare(lhs.timestamp, rhs.timestamp); int numComparisons = Math.min(lhs.dims.length, rhs.dims.length); From 71315ac5846e72e341241f49e759f89a1bac2acf Mon Sep 17 00:00:00 2001 From: Eran Meir Date: Mon, 10 Jun 2019 16:08:36 +0300 Subject: [PATCH 2/4] Fixing dimension order in index creation Added missing CAS operation after dimension updates --- .../druid/segment/incremental/IncrementalIndex.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index c75a468305c4..14de536f50cf 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -919,7 +919,8 @@ public void loadDimensionIterable( Map oldColumnCapabilities ) { - DimensionData dimensionData = dimensions.get().clone(); + DimensionData prevDimensionData = dimensions.get(); + DimensionData dimensionData = prevDimensionData.clone(); if (dimensionData.size() != 0) { throw new ISE("Cannot load dimension order when existing order[%s] is not 
empty.", dimensionData.getDimensionDescs().keySet()); } @@ -931,6 +932,11 @@ public void loadDimensionIterable( dimensionData.addNewDimension(dim, capabilities, handler); } } + + while (!dimensions.compareAndSet(prevDimensionData, dimensionData)) { + prevDimensionData = dimensions.get(); + dimensionData.rebase(prevDimensionData); + } } public List getMetricNames() From 6e5eb5a6923597b4d379b32027666ddfa368f852 Mon Sep 17 00:00:00 2001 From: Eran Meir Date: Mon, 17 Jun 2019 14:23:52 +0300 Subject: [PATCH 3/4] Fixing some code review issues (WIP) - Permissive duplicate dimension handling (tests updated) - Additional error logging --- .../IndexIngestionMultithreadedBenchmark.java | 6 +-- .../segment/incremental/IncrementalIndex.java | 11 ++--- .../incremental/IncrementalIndexTest.java | 41 +++++++++++-------- 3 files changed, 34 insertions(+), 24 deletions(-) diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexIngestionMultithreadedBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexIngestionMultithreadedBenchmark.java index feb00981b871..c18df4aa1113 100644 --- a/benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexIngestionMultithreadedBenchmark.java +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexIngestionMultithreadedBenchmark.java @@ -101,12 +101,12 @@ public void setup() @State(Scope.Thread) public static class ThreadState { - int id = 0; + int rowOffset = 0; @Setup public void setup(IndexIngestionMultithreadedBenchmark globalState) { - id = globalState.threadIdAllocator.getAndIncrement(); + rowOffset = globalState.threadIdAllocator.getAndIncrement(); } } @@ -136,7 +136,7 @@ private IncrementalIndex makeIncIndex() public void addRows(Blackhole blackhole, ThreadState threadState) throws Exception { int threads = threadIdAllocator.get(); - for (int i = threadState.id; i < rowsPerSegment; i += threads) { + for (int i = threadState.rowOffset; i < rowsPerSegment; i += 
threads) { InputRow row = rows.get(i); int rv = incIndex.add(row).getRowCount(); blackhole.consume(rv); diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index 14de536f50cf..91aed0d86ad1 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -634,12 +634,13 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row) DimensionData dimensionData = null; for (String dimension : rowDimensions) { if (Strings.isNullOrEmpty(dimension)) { + parseExceptionMessages.add("InputRow contains null or empty dimension name"); continue; } - if (rowDimKeys.containsKey(dimension)) { // If the dims map already contains a mapping at this index, it means we have seen this dimension already on this input row. - throw new ISE("Dimension[%s] occurred more than once in InputRow", dimension); + parseExceptionMessages.add(StringUtils.nonStrictFormat("Dimension[%s] occurred more than once in InputRow", dimension)); + continue; } ColumnCapabilitiesImpl capabilities; DimensionDesc desc = prevDimensionData.getDimensionDesc(dimension); @@ -700,9 +701,9 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row) dimensionData = prevDimensionData; } Object[] dims = new Object[dimensionData.size()]; - for (String dimension : rowDimKeys.keySet()) { - DimensionDesc desc = dimensionData.getDimensionDesc(dimension); - dims[desc.getIndex()] = rowDimKeys.get(dimension); + for (Map.Entry dimension : rowDimKeys.entrySet()) { + DimensionDesc desc = dimensionData.getDimensionDesc(dimension.getKey()); + dims[desc.getIndex()] = dimension.getValue(); } long truncated = 0; diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java 
b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java index 3dfd3210d408..9ca85e5954a5 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java @@ -29,7 +29,6 @@ import org.apache.druid.data.input.impl.FloatDimensionSchema; import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.parsers.ParseException; @@ -156,36 +155,46 @@ public IncrementalIndex createIndex() return constructors; } - @Test(expected = ISE.class) + @Test public void testDuplicateDimensions() throws IndexSizeExceededException { IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex()); index.add( new MapBasedInputRow( - System.currentTimeMillis() - 1, + 0, Lists.newArrayList("billy", "joe"), ImmutableMap.of("billy", "A", "joe", "B") ) ); - index.add( - new MapBasedInputRow( - System.currentTimeMillis() - 1, - Lists.newArrayList("billy", "joe", "joe"), - ImmutableMap.of("billy", "A", "joe", "B") - ) + IncrementalIndexAddResult result = index.add( + new MapBasedInputRow( + 0, + Lists.newArrayList("billy", "joe", "joe"), + ImmutableMap.of("billy", "A", "joe", "B") + ) + ); + Assert.assertEquals(ParseException.class, result.getParseException().getClass()); + Assert.assertEquals( + "Found unparseable columns in row: [MapBasedInputRow{timestamp=1970-01-01T00:00:00.000Z, event={billy=A, joe=B}, dimensions=[billy, joe, joe]}], exceptions: [Dimension[joe] occurred more than once in InputRow,]", + result.getParseException().getMessage() ); } - @Test(expected = ISE.class) + @Test public void testDuplicateDimensionsFirstOccurrence() throws 
IndexSizeExceededException { IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex()); - index.add( - new MapBasedInputRow( - System.currentTimeMillis() - 1, - Lists.newArrayList("billy", "joe", "joe"), - ImmutableMap.of("billy", "A", "joe", "B") - ) + IncrementalIndexAddResult result = index.add( + new MapBasedInputRow( + 0, + Lists.newArrayList("billy", "joe", "joe"), + ImmutableMap.of("billy", "A", "joe", "B") + ) + ); + Assert.assertEquals(ParseException.class, result.getParseException().getClass()); + Assert.assertEquals( + "Found unparseable columns in row: [MapBasedInputRow{timestamp=1970-01-01T00:00:00.000Z, event={billy=A, joe=B}, dimensions=[billy, joe, joe]}], exceptions: [Dimension[joe] occurred more than once in InputRow,]", + result.getParseException().getMessage() ); } From fb9fe68125036997027394740b733971d3068916 Mon Sep 17 00:00:00 2001 From: Eran Meir Date: Thu, 20 Jun 2019 14:11:15 +0300 Subject: [PATCH 4/4] Fixing some code review issues (WIP) - Refactoring toIncrementalIndexRow - Additional comments indicating code that should be thread-safe --- .../segment/incremental/IncrementalIndex.java | 201 ++++++++++-------- 1 file changed, 113 insertions(+), 88 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index 91aed0d86ad1..5845442e8835 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -235,6 +235,8 @@ public ColumnCapabilities getColumnCapabilities(String columnName) private final Map metricDescs; + // Dimension data may be updated by concurrent non-blocking threads. + // Mutating the reference directly would be a concurrency bug. 
private final AtomicReference dimensions; private final AtomicInteger numEntries = new AtomicInteger(); private final AtomicLong bytesInMemory = new AtomicLong(); @@ -624,14 +626,49 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row) if (row.getTimestampFromEpoch() < minTimestamp) { throw new IAE("Cannot add row[%s] because it is below the minTimestamp[%s]", row, DateTimes.utc(minTimestamp)); } - final List rowDimensions = row.getDimensions(); + DimensionData prevDimensionData = this.dimensions.get(); + RowDimsKeyComponents dimsKeyComponents = getRowDimsKeyComponents(row, prevDimensionData); + DimensionData dimensionData = dimsKeyComponents.getUpdatedDimensionData(); + while (dimensionData != null && !dimensions.compareAndSet(prevDimensionData, dimensionData)) { + prevDimensionData = dimensions.get(); + dimsKeyComponents = getRowDimsKeyComponents(row, prevDimensionData); + dimensionData = dimsKeyComponents.getUpdatedDimensionData(); + } - Map rowDimKeys = new HashMap<>(); + dimensionData = dimensions.get(); long dimsKeySize = 0; - List parseExceptionMessages = new ArrayList<>(); + Object[] dims = new Object[dimensionData.size()]; + for (Map.Entry dimension : dimsKeyComponents.getRowDimKeys().entrySet()) { + DimensionDesc desc = dimensionData.getDimensionDesc(dimension.getKey()); + DimensionIndexer indexer = desc.getIndexer(); + Object dimsKey = dimension.getValue(); + dimsKeySize += indexer.estimateEncodedKeyComponentSize(dimsKey); + dims[desc.getIndex()] = dimsKey; + } - DimensionData prevDimensionData = this.dimensions.get(); + long truncated = 0; + if (row.getTimestamp() != null) { + truncated = gran.bucketStart(row.getTimestamp()).getMillis(); + } + + IncrementalIndexRow incrementalIndexRow = IncrementalIndexRow.createTimeAndDimswithDimsKeySize( + Math.max(truncated, minTimestamp), + dims, + dimensionData.getDimensionDescsList(), + dimsKeySize + ); + return new IncrementalIndexRowResult(incrementalIndexRow, 
dimsKeyComponents.getParseExceptionMessages()); + } + + // Note: This method might be called from multiple parallel threads without synchronization. Making thread-unsafe + // calls, or mutating prevDimensionData would be a concurrency bug + private RowDimsKeyComponents getRowDimsKeyComponents(InputRow row, DimensionData prevDimensionData) + { + List parseExceptionMessages = new ArrayList<>(); + Map rowDimKeys = new HashMap<>(); DimensionData dimensionData = null; + + final List rowDimensions = row.getDimensions(); for (String dimension : rowDimensions) { if (Strings.isNullOrEmpty(dimension)) { parseExceptionMessages.add("InputRow contains null or empty dimension name"); @@ -642,29 +679,14 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row) parseExceptionMessages.add(StringUtils.nonStrictFormat("Dimension[%s] occurred more than once in InputRow", dimension)); continue; } - ColumnCapabilitiesImpl capabilities; DimensionDesc desc = prevDimensionData.getDimensionDesc(dimension); - if (desc != null) { - capabilities = desc.getCapabilities(); - } else { + if (desc == null) { if (dimensionData == null) { dimensionData = prevDimensionData.clone(); } - capabilities = dimensionData.getDimensionCapabilities(dimension); - if (capabilities == null) { - capabilities = new ColumnCapabilitiesImpl(); - // For schemaless type discovery, assume everything is a String for now, can change later. 
- capabilities.setType(ValueType.STRING); - capabilities.setDictionaryEncoded(true); - capabilities.setHasBitmapIndexes(true); - dimensionData.putCapabilities(dimension, capabilities); - } - DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dimension, capabilities, null); - if (dimensionData == null) { - dimensionData = prevDimensionData.clone(); - } - desc = dimensionData.addNewDimension(dimension, capabilities, handler); + desc = dimensionData.addNewDimension(dimension); } + ColumnCapabilitiesImpl capabilities = desc.getCapabilities(); DimensionHandler handler = desc.getHandler(); DimensionIndexer indexer = desc.getIndexer(); Object dimsKey = null; @@ -677,7 +699,6 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row) catch (ParseException pe) { parseExceptionMessages.add(pe.getMessage()); } - dimsKeySize += indexer.estimateEncodedKeyComponentSize(dimsKey); // Set column capabilities as data is coming in if (!capabilities.hasMultipleValues() && dimsKey != null && @@ -690,34 +711,7 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row) } rowDimKeys.put(dimension, dimsKey); } - - - if (dimensionData != null) { - while (!dimensions.compareAndSet(prevDimensionData, dimensionData)) { - prevDimensionData = dimensions.get(); - dimensionData.rebase(prevDimensionData); - } - } else { - dimensionData = prevDimensionData; - } - Object[] dims = new Object[dimensionData.size()]; - for (Map.Entry dimension : rowDimKeys.entrySet()) { - DimensionDesc desc = dimensionData.getDimensionDesc(dimension.getKey()); - dims[desc.getIndex()] = dimension.getValue(); - } - - long truncated = 0; - if (row.getTimestamp() != null) { - truncated = gran.bucketStart(row.getTimestamp()).getMillis(); - } - - IncrementalIndexRow incrementalIndexRow = IncrementalIndexRow.createTimeAndDimswithDimsKeySize( - Math.max(truncated, minTimestamp), - dims, - dimensionData.getDimensionDescsList(), - dimsKeySize - ); - return new 
IncrementalIndexRowResult(incrementalIndexRow, parseExceptionMessages); + return new RowDimsKeyComponents(rowDimKeys, dimensionData, parseExceptionMessages); } public static ParseException getCombinedParseException( @@ -920,24 +914,21 @@ public void loadDimensionIterable( Map oldColumnCapabilities ) { - DimensionData prevDimensionData = dimensions.get(); - DimensionData dimensionData = prevDimensionData.clone(); - if (dimensionData.size() != 0) { - throw new ISE("Cannot load dimension order when existing order[%s] is not empty.", dimensionData.getDimensionDescs().keySet()); - } - for (String dim : oldDimensionOrder) { - if (dimensionData.getDimensionDesc(dim) == null) { - ColumnCapabilitiesImpl capabilities = oldColumnCapabilities.get(dim); - dimensionData.putCapabilities(dim, capabilities); - DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null); - dimensionData.addNewDimension(dim, capabilities, handler); - } - } - - while (!dimensions.compareAndSet(prevDimensionData, dimensionData)) { + DimensionData prevDimensionData, dimensionData; + do { prevDimensionData = dimensions.get(); - dimensionData.rebase(prevDimensionData); - } + dimensionData = prevDimensionData.clone(); + if (dimensionData.size() != 0) { + throw new ISE("Cannot load dimension order when existing order[%s] is not empty.", dimensionData.getDimensionDescs().keySet()); + } + for (String dim : oldDimensionOrder) { + if (dimensionData.getDimensionDesc(dim) == null) { + ColumnCapabilitiesImpl capabilities = oldColumnCapabilities.get(dim); + dimensionData.putCapabilities(dim, capabilities); + dimensionData.addNewDimension(dim); + } + } + } while (!dimensions.compareAndSet(prevDimensionData, dimensionData)); } public List getMetricNames() @@ -1041,12 +1032,41 @@ public DateTime getMaxIngestedEventTime() return maxIngestedEventTime.get(); } - private static class DimensionData + private static class RowDimsKeyComponents { - private Map columnCapabilities; - 
private Map dimensionDescs; - private List dimensionDescsList; + public Map getRowDimKeys() + { + return rowDimKeys; + } + + @Nullable + public DimensionData getUpdatedDimensionData() + { + return updatedDimensionData; + } + + public List getParseExceptionMessages() + { + return parseExceptionMessages; + } + private final Map rowDimKeys; + private final @Nullable DimensionData updatedDimensionData; + private final List parseExceptionMessages; + + private RowDimsKeyComponents(Map rowDimKeys, @Nullable DimensionData updatedDimensionData, List parseExceptionMessages) + { + this.rowDimKeys = rowDimKeys; + this.updatedDimensionData = updatedDimensionData; + this.parseExceptionMessages = parseExceptionMessages; + } + } + + private static class DimensionData + { + private final Map columnCapabilities; + private final Map dimensionDescs; + private final List dimensionDescsList; DimensionData() { @@ -1068,6 +1088,21 @@ public DimensionData clone() return new DimensionData(columnCapabilities, dimensionDescs, dimensionDescsList); } + public DimensionDesc addNewDimension(String dim) + { + ColumnCapabilitiesImpl capabilities = getDimensionCapabilities(dim); + if (capabilities == null) { + capabilities = new ColumnCapabilitiesImpl(); + // For schemaless type discovery, assume everything is a String for now, can change later. 
+ capabilities.setType(ValueType.STRING); + capabilities.setDictionaryEncoded(true); + capabilities.setHasBitmapIndexes(true); + putCapabilities(dim, capabilities); + } + DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null); + return addNewDimension(dim, capabilities, handler); + } + public DimensionDesc addNewDimension(String dim, ColumnCapabilitiesImpl capabilities, DimensionHandler handler) { DimensionDesc desc = new DimensionDesc(dimensionDescsList.size(), dim, capabilities, handler); @@ -1076,34 +1111,24 @@ public DimensionDesc addNewDimension(String dim, ColumnCapabilitiesImpl capabili return desc; } - public DimensionData rebase(DimensionData previous) - { - DimensionData rebased = previous.clone(); - for (DimensionDesc dim : dimensionDescsList) { - rebased.putCapabilities(dim.getName(), dim.getCapabilities()); - rebased.addNewDimension(dim.getName(), dim.getCapabilities(), dim.getHandler()); - } - return rebased; - } - public int size() { return dimensionDescsList.size(); } - public DimensionDesc getDimensionDesc(String dimension) + public DimensionDesc getDimensionDesc(String dim) { - return dimensionDescs.get(dimension); + return dimensionDescs.get(dim); } - public ColumnCapabilitiesImpl getDimensionCapabilities(String dimension) + public ColumnCapabilitiesImpl getDimensionCapabilities(String dim) { - return columnCapabilities.get(dimension); + return columnCapabilities.get(dim); } - public void putCapabilities(String name, ColumnCapabilitiesImpl capabilities) + public void putCapabilities(String dim, ColumnCapabilitiesImpl capabilities) { - columnCapabilities.put(name, capabilities); + columnCapabilities.put(dim, capabilities); } public Map getDimensionDescs()