From a7c8879de178ba11c491d9ff0b49d20b1609900a Mon Sep 17 00:00:00 2001 From: Liran Funaro Date: Wed, 5 Jan 2022 15:11:59 +0200 Subject: [PATCH 1/5] Preparations for IncrementalIndex extensions --- .../druid/segment/DoubleDimensionIndexer.java | 15 +- .../druid/segment/FloatDimensionIndexer.java | 15 +- .../druid/segment/LongDimensionIndexer.java | 15 +- .../druid/segment/StringDimensionIndexer.java | 110 ++++----- .../segment/incremental/IncrementalIndex.java | 213 +++++++++++++---- .../incremental/IncrementalIndexAdapter.java | 7 +- .../incremental/IncrementalIndexRow.java | 24 +- .../incremental/OnheapIncrementalIndex.java | 215 ++++-------------- .../IncrementalIndexAdapterTest.java | 33 +-- .../IncrementalIndexIngestionTest.java | 7 +- .../OnheapIncrementalIndexBenchmark.java | 9 +- .../druid/segment/realtime/plumber/Sink.java | 2 +- 12 files changed, 345 insertions(+), 320 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/DoubleDimensionIndexer.java b/processing/src/main/java/org/apache/druid/segment/DoubleDimensionIndexer.java index f943f2f163ba..69188ebe7b62 100644 --- a/processing/src/main/java/org/apache/druid/segment/DoubleDimensionIndexer.java +++ b/processing/src/main/java/org/apache/druid/segment/DoubleDimensionIndexer.java @@ -132,20 +132,19 @@ class IndexerDoubleColumnSelector implements DoubleColumnSelector @Override public boolean isNull() { - final Object[] dims = currEntry.get().getDims(); - return hasNulls && (dimIndex >= dims.length || dims[dimIndex] == null); + return hasNulls && currEntry.get().isDimNull(dimIndex); } @Override public double getDouble() { - final Object[] dims = currEntry.get().getDims(); + final Object dim = currEntry.get().getDim(dimIndex); - if (dimIndex >= dims.length || dims[dimIndex] == null) { + if (dim == null) { assert NullHandling.replaceWithDefault(); return 0.0; } - return (Double) dims[dimIndex]; + return (Double) dim; } @SuppressWarnings("deprecation") @@ -153,12 +152,12 @@ public double getDouble() @Override public Double getObject() { - final Object[] dims = currEntry.get().getDims(); + final Object dim = currEntry.get().getDim(dimIndex); - if (dimIndex >= dims.length || dims[dimIndex] == null) { + if (dim == null) { return NullHandling.defaultDoubleValue(); } - return (Double) dims[dimIndex]; + return (Double) dim; } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/FloatDimensionIndexer.java b/processing/src/main/java/org/apache/druid/segment/FloatDimensionIndexer.java index 21bd1a65c1b7..dbf5650167b7 100644 --- a/processing/src/main/java/org/apache/druid/segment/FloatDimensionIndexer.java +++ b/processing/src/main/java/org/apache/druid/segment/FloatDimensionIndexer.java @@ -133,21 +133,20 @@ class IndexerFloatColumnSelector implements FloatColumnSelector @Override public boolean isNull() { - final Object[] dims = currEntry.get().getDims(); - return hasNulls && (dimIndex >= dims.length || dims[dimIndex] == null); + return hasNulls && currEntry.get().isDimNull(dimIndex); } @Override public float getFloat() { - final Object[] dims = currEntry.get().getDims(); + final Object dim = currEntry.get().getDim(dimIndex); - if (dimIndex >= dims.length || dims[dimIndex] == null) { + if (dim == null) { assert NullHandling.replaceWithDefault(); return 0.0f; } - return (Float) dims[dimIndex]; + return (Float) dim; } @SuppressWarnings("deprecation") @@ -155,13 +154,13 @@ public float getFloat() @Override public Float getObject() { - final Object[] dims = currEntry.get().getDims(); + final Object dim = currEntry.get().getDim(dimIndex); - if (dimIndex >= dims.length || dims[dimIndex] == null) { + if (dim == null) { return NullHandling.defaultFloatValue(); } - return (Float) dims[dimIndex]; + return (Float) dim; } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/LongDimensionIndexer.java b/processing/src/main/java/org/apache/druid/segment/LongDimensionIndexer.java index bce27c27fcc9..4e2236a8525b 100644 --- a/processing/src/main/java/org/apache/druid/segment/LongDimensionIndexer.java +++ b/processing/src/main/java/org/apache/druid/segment/LongDimensionIndexer.java @@ -134,21 +134,20 @@ class IndexerLongColumnSelector implements LongColumnSelector @Override public boolean isNull() { - final Object[] dims = currEntry.get().getDims(); - return hasNulls && (dimIndex >= dims.length || dims[dimIndex] == null); + return hasNulls && currEntry.get().isDimNull(dimIndex); } @Override public long getLong() { - final Object[] dims = currEntry.get().getDims(); + final Object dim = currEntry.get().getDim(dimIndex); - if (dimIndex >= dims.length || dims[dimIndex] == null) { + if (dim == null) { assert NullHandling.replaceWithDefault(); return 0; } - return (Long) dims[dimIndex]; + return (Long) dim; } @SuppressWarnings("deprecation") @@ -156,13 +155,13 @@ public long getLong() @Override public Long getObject() { - final Object[] dims = currEntry.get().getDims(); + final Object dim = currEntry.get().getDim(dimIndex); - if (dimIndex >= dims.length || dims[dimIndex] == null) { + if (dim == null) { return NullHandling.defaultLongValue(); } - return (Long) dims[dimIndex]; + return (Long) dim; } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java index 108db6e72ff8..9abb032eb10c 100644 --- a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java +++ b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java @@ -265,53 +265,62 @@ class IndexerDimensionSelector implements DimensionSelector, IdLookup @MonotonicNonNull private int[] nullIdIntArray; - @Override - public IndexedInts getRow() + /** + * Tries to fetch the int array using getDim() and convert it to IndexedInts. + * If the dim is null or with zero length, the value is considered null. + * It may be null or empty due to currEntry's rowIndex being smaller than the row's rowIndex in which this + * dim first appears. + * + * @return IndexedInts instance, or null if the dim is null. + */ + @Nullable + private IndexedInts getRowOrNull() { - final Object[] dims = currEntry.get().getDims(); + IncrementalIndexRow key = currEntry.get(); - int[] indices; - if (dimIndex < dims.length) { - indices = (int[]) dims[dimIndex]; - } else { - indices = null; + if (key.isDimNull(dimIndex)) { + return null; } - int[] row = null; - int rowSize = 0; + int[] indices = (int[]) key.getDim(dimIndex); - // usually due to currEntry's rowIndex is smaller than the row's rowIndex in which this dim first appears if (indices == null || indices.length == 0) { - if (hasMultipleValues) { - row = IntArrays.EMPTY_ARRAY; - rowSize = 0; - } else { - final int nullId = getEncodedValue(null, false); - if (nullId >= 0 && nullId < maxId) { - // null was added to the dictionary before this selector was created; return its ID. - if (nullIdIntArray == null) { - nullIdIntArray = new int[]{nullId}; - } - row = nullIdIntArray; - rowSize = 1; - } else { - // null doesn't exist in the dictionary; return an empty array. - // Choose to use ArrayBasedIndexedInts later, instead of special "empty" IndexedInts, for monomorphism - row = IntArrays.EMPTY_ARRAY; - rowSize = 0; - } - } + return null; } - if (row == null && indices != null && indices.length > 0) { - row = indices; - rowSize = indices.length; + indexedInts.setValues(indices, indices.length); + return indexedInts; + } + + private IndexedInts getDefaultIndexedInts() + { + if (hasMultipleValues) { + indexedInts.setValues(IntArrays.EMPTY_ARRAY, 0); + } else { + final int nullId = getEncodedValue(null, false); + if (nullId >= 0 && nullId < maxId) { + // null was added to the dictionary before this selector was created; return its ID. + if (nullIdIntArray == null) { + nullIdIntArray = new int[]{nullId}; + } + indexedInts.setValues(nullIdIntArray, 1); + } else { + // null doesn't exist in the dictionary; return an empty array. + // Choose to use ArrayBasedIndexedInts later, instead of special "empty" IndexedInts, for monomorphism + indexedInts.setValues(IntArrays.EMPTY_ARRAY, 0); + } } - indexedInts.setValues(row, rowSize); return indexedInts; } + @Override + public IndexedInts getRow() + { + IndexedInts ret = getRowOrNull(); + return ret != null ? ret : getDefaultIndexedInts(); + } + @Override public ValueMatcher makeValueMatcher(final String value) { @@ -323,18 +332,14 @@ public ValueMatcher makeValueMatcher(final String value) @Override public boolean matches() { - Object[] dims = currEntry.get().getDims(); - if (dimIndex >= dims.length) { - return value == null; - } - - int[] dimsInt = (int[]) dims[dimIndex]; - if (dimsInt == null || dimsInt.length == 0) { + IndexedInts dimsInt = getRowOrNull(); + if (dimsInt == null) { return value == null; } - for (int id : dimsInt) { - if (id == valueId) { + int size = dimsInt.size(); + for (int i = 0; i < size; i++) { + if (dimsInt.get(i) == valueId) { return true; } } @@ -369,17 +374,14 @@ public ValueMatcher makeValueMatcher(final Predicate predicate) @Override public boolean matches() { - Object[] dims = currEntry.get().getDims(); - if (dimIndex >= dims.length) { - return matchNull; - } - - int[] dimsInt = (int[]) dims[dimIndex]; - if (dimsInt == null || dimsInt.length == 0) { + IndexedInts dimsInt = getRowOrNull(); + if (dimsInt == null) { return matchNull; } - for (int id : dimsInt) { + int size = dimsInt.size(); + for (int i = 0; i < size; i++) { + int id = dimsInt.get(i); if (checkedIds.get(id)) { if (matchingIds.get(id)) { return true; @@ -464,12 +466,12 @@ public Object getObject() return null; } - Object[] dims = key.getDims(); - if (dimIndex >= dims.length) { + Object dim = key.getDim(dimIndex); + if (dim == null) { return null; } - return convertUnsortedEncodedKeyComponentToActualList((int[]) dims[dimIndex]); + return convertUnsortedEncodedKeyComponentToActualList((int[]) dim); } @SuppressWarnings("deprecation") diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index a94ae1c23493..eb995e8e2534 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -34,6 +34,7 @@ import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.MapBasedRow; import org.apache.druid.data.input.Row; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -41,6 +42,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.UnparseableColumnsParseException; @@ -97,7 +99,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; public abstract class IncrementalIndex extends AbstractIndex implements Iterable, Closeable, ColumnInspector @@ -219,6 +220,9 @@ public ColumnCapabilities getColumnCapabilities(String columnName) private final long minTimestamp; private final Granularity gran; private final boolean rollup; + protected final int maxRowCount; + protected final long maxBytesInMemory; + private final List> rowTransformers; private final VirtualColumns virtualColumns; private final AggregatorFactory[] metrics; @@ -228,17 +232,22 @@ public ColumnCapabilities getColumnCapabilities(String columnName) private final Map metricDescs; private final Map dimensionDescs; - private final List dimensionDescsList; + protected final List dimensionDescsList; // dimension capabilities are provided by the indexers private final Map timeAndMetricsColumnCapabilities; - private final AtomicInteger numEntries = new AtomicInteger(); - private final AtomicLong bytesInMemory = new AtomicLong(); + protected final Map selectors; + + protected final AtomicInteger indexIncrement = new AtomicInteger(0); // This is modified on add() in a critical section. private final ThreadLocal in = new ThreadLocal<>(); private final Supplier rowSupplier = in::get; - private volatile DateTime maxIngestedEventTime; + @Nullable + private volatile DateTime maxIngestedEventTime = null; + + @Nullable + private String outOfRowsReason = null; /** @@ -256,7 +265,9 @@ public ColumnCapabilities getColumnCapabilities(String columnName) protected IncrementalIndex( final IncrementalIndexSchema incrementalIndexSchema, final boolean deserializeComplexMetrics, - final boolean concurrentEventAdd + final boolean concurrentEventAdd, + final int maxRowCount, + final long maxBytesInMemory ) { this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); @@ -267,6 +278,9 @@ protected IncrementalIndex( this.rowTransformers = new CopyOnWriteArrayList<>(); this.deserializeComplexMetrics = deserializeComplexMetrics; + this.maxRowCount = maxRowCount; + this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory; + this.timeAndMetricsColumnCapabilities = new HashMap<>(); this.metricDescs = Maps.newLinkedHashMap(); this.dimensionDescs = Maps.newLinkedHashMap(); @@ -278,7 +292,7 @@ protected IncrementalIndex( this.rollup ); - initAggs(metrics, rowSupplier, deserializeComplexMetrics, concurrentEventAdd); + this.selectors = generateSelectors(metrics, rowSupplier, deserializeComplexMetrics, concurrentEventAdd); for (AggregatorFactory metric : metrics) { MetricDesc metricDesc = new MetricDesc(metricDescs.size(), metric); @@ -326,11 +340,32 @@ protected IncrementalIndex( public abstract FactsHolder getFacts(); - public abstract boolean canAppendRow(); + public boolean canAppendRow() + { + final boolean countCheck = size() < maxRowCount; + // if maxBytesInMemory = -1, then ignore sizeCheck + final boolean sizeCheck = maxBytesInMemory <= 0 || getBytesInMemory() < maxBytesInMemory; + if (!countCheck && !sizeCheck) { + outOfRowsReason = StringUtils.format( + "Maximum number of rows [%d] and maximum size in bytes [%d] reached", + maxRowCount, + maxBytesInMemory + ); + } else if (!countCheck) { + outOfRowsReason = StringUtils.format("Maximum number of rows [%d] reached", maxRowCount); + } else if (!sizeCheck) { + outOfRowsReason = StringUtils.format("Maximum size in bytes [%d] reached", maxBytesInMemory); + } + return countCheck && sizeCheck; + } - public abstract String getOutOfRowsReason(); + @Nullable + public String getOutOfRowsReason() + { + return outOfRowsReason; + } - protected abstract void initAggs( + protected abstract Map generateSelectors( AggregatorFactory[] metrics, Supplier rowSupplier, boolean deserializeComplexMetrics, @@ -348,15 +383,15 @@ protected abstract AddToFactsResult addToFacts( public abstract int getLastRowIndex(); - protected abstract float getMetricFloatValue(int rowOffset, int aggOffset); + protected abstract float getMetricFloatValue(IncrementalIndexRow incrementalIndexRow, int aggOffset); - protected abstract long getMetricLongValue(int rowOffset, int aggOffset); + protected abstract long getMetricLongValue(IncrementalIndexRow incrementalIndexRow, int aggOffset); - protected abstract Object getMetricObjectValue(int rowOffset, int aggOffset); + protected abstract Object getMetricObjectValue(IncrementalIndexRow incrementalIndexRow, int aggOffset); - protected abstract double getMetricDoubleValue(int rowOffset, int aggOffset); + protected abstract double getMetricDoubleValue(IncrementalIndexRow incrementalIndexRow, int aggOffset); - protected abstract boolean isNull(int rowOffset, int aggOffset); + protected abstract boolean isNull(IncrementalIndexRow incrementalIndexRow, int aggOffset); static class IncrementalIndexRowResult { @@ -380,7 +415,7 @@ List getParseExceptionMessages() } } - static class AddToFactsResult + public static class AddToFactsResult { private final int rowCount; private final long bytesInMemory; @@ -421,6 +456,7 @@ public boolean isRollup() @Override public void close() { + selectors.clear(); } public InputRow formatRow(InputRow row) @@ -689,34 +725,23 @@ private synchronized void updateMaxIngestedTime(DateTime eventTime) public boolean isEmpty() { - return numEntries.get() == 0; + return size() == 0; } - public int size() - { - return numEntries.get(); - } + public abstract int size(); + + public abstract long getBytesInMemory(); boolean getDeserializeComplexMetrics() { return deserializeComplexMetrics; } - AtomicInteger getNumEntries() - { - return numEntries; - } - AggregatorFactory[] getMetrics() { return metrics; } - public AtomicLong getBytesInMemory() - { - return bytesInMemory; - } - private long getMinTimeMillis() { return getFacts().getMinTimeMillis(); @@ -913,6 +938,52 @@ public abstract Iterable iterableWithPostAggregations( boolean descending ); + /** + * Apply post aggregation on a row. + * @param row the row to apply on + * @param values a stream of the row's values + * @param postAggs the post aggregators + * @return a new row + */ + public MapBasedRow getMapBasedRowWithPostAggregations( + IncrementalIndexRow row, + Stream values, + @Nullable final List postAggs + ) + { + Map theVals = Maps.newLinkedHashMap(); + + for (int i = 0; i < row.getDimsLength(); ++i) { + Object dim = row.getDim(i); + DimensionDesc dimensionDesc = dimensionDescsList.get(i); + if (dimensionDesc == null) { + continue; + } + String dimensionName = dimensionDesc.getName(); + DimensionHandler handler = dimensionDesc.getHandler(); + if (dim == null || handler.getLengthOfEncodedKeyComponent(dim) == 0) { + theVals.put(dimensionName, null); + continue; + } + final DimensionIndexer indexer = dimensionDesc.getIndexer(); + Object rowVals = indexer.convertUnsortedEncodedKeyComponentToActualList(dim); + theVals.put(dimensionName, rowVals); + } + + int i = 0; + for (Iterator it = values.iterator(); it.hasNext() && i < metrics.length; i++) { + theVals.put(metrics[i].getName(), it.next()); + } + + if (postAggs != null) { + for (PostAggregator postAgg : postAggs) { + theVals.put(postAgg.getName(), postAgg.compute(theVals)); + } + } + + return new MapBasedRow(row.getTimestamp(), theVals); + } + public DateTime getMaxIngestedEventTime() { return maxIngestedEventTime; @@ -1022,6 +1093,19 @@ protected ColumnSelectorFactory makeColumnSelectorFactory( return makeColumnSelectorFactory(this, virtualColumns, agg, in, deserializeComplexMetrics); } + protected ColumnSelectorFactory makeCachedColumnSelectorFactory( + final AggregatorFactory agg, + final Supplier in, + final boolean deserializeComplexMetrics, + final boolean concurrentEventAdd + ) + { + return new CachingColumnSelectorFactory( + makeColumnSelectorFactory(agg, in, deserializeComplexMetrics), + concurrentEventAdd + ); + } + protected final Comparator dimsComparator() { return new IncrementalIndexRowComparator(dimensionDescsList); @@ -1088,7 +1172,7 @@ private static boolean allNull(Object[] dims, int startPosition) return true; } - interface FactsHolder + public interface FactsHolder { /** * @return the previous rowIndex associated with the specified key, or @@ -1362,7 +1446,7 @@ public LongMetricColumnSelector(IncrementalIndexRowHolder currEntry, int metricI public long getLong() { assert NullHandling.replaceWithDefault() || !isNull(); - return getMetricLongValue(currEntry.get().getRowIndex(), metricIndex); + return getMetricLongValue(currEntry.get(), metricIndex); } @Override @@ -1374,7 +1458,7 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector) @Override public boolean isNull() { - return IncrementalIndex.this.isNull(currEntry.get().getRowIndex(), metricIndex); + return IncrementalIndex.this.isNull(currEntry.get(), metricIndex); } } @@ -1399,7 +1483,7 @@ public ObjectMetricColumnSelector( @Override public Object getObject() { - return getMetricObjectValue(currEntry.get().getRowIndex(), metricIndex); + return getMetricObjectValue(currEntry.get(), metricIndex); } @Override @@ -1430,7 +1514,7 @@ public FloatMetricColumnSelector(IncrementalIndexRowHolder currEntry, int metric public float getFloat() { assert NullHandling.replaceWithDefault() || !isNull(); - return getMetricFloatValue(currEntry.get().getRowIndex(), metricIndex); + return getMetricFloatValue(currEntry.get(), metricIndex); } @Override @@ -1442,7 +1526,7 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector) @Override public boolean isNull() { - return IncrementalIndex.this.isNull(currEntry.get().getRowIndex(), metricIndex); + return IncrementalIndex.this.isNull(currEntry.get(), metricIndex); } } @@ -1461,13 +1545,13 @@ public DoubleMetricColumnSelector(IncrementalIndexRowHolder currEntry, int metri public double getDouble() { assert NullHandling.replaceWithDefault() || !isNull(); - return getMetricDoubleValue(currEntry.get().getRowIndex(), metricIndex); + return getMetricDoubleValue(currEntry.get(), metricIndex); } @Override public boolean isNull() { - return IncrementalIndex.this.isNull(currEntry.get().getRowIndex(), metricIndex); + return IncrementalIndex.this.isNull(currEntry.get(), metricIndex); } @Override @@ -1476,4 +1560,55 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector) inspector.visit("index", IncrementalIndex.this); } } + + /** + * Caches references to selector objects for each column instead of creating a new object each time in order to save + * heap space. In general the selectorFactory need not to thread-safe. If required, set concurrentEventAdd to true to + * use concurrent hash map instead of vanilla hash map for thread-safe operations. + */ + static class CachingColumnSelectorFactory implements ColumnSelectorFactory + { + private final Map> columnSelectorMap; + private final ColumnSelectorFactory delegate; + + public CachingColumnSelectorFactory(ColumnSelectorFactory delegate, boolean concurrentEventAdd) + { + this.delegate = delegate; + + if (concurrentEventAdd) { + columnSelectorMap = new ConcurrentHashMap<>(); + } else { + columnSelectorMap = new HashMap<>(); + } + } + + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + return delegate.makeDimensionSelector(dimensionSpec); + } + + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + ColumnValueSelector existing = columnSelectorMap.get(columnName); + if (existing != null) { + return existing; + } + + // We cannot use columnSelectorMap.computeIfAbsent(columnName, delegate::makeColumnValueSelector) + // here since makeColumnValueSelector may modify the columnSelectorMap itself through + // virtual column references, triggering a ConcurrentModificationException in JDK 9 and above. + ColumnValueSelector columnValueSelector = delegate.makeColumnValueSelector(columnName); + existing = columnSelectorMap.putIfAbsent(columnName, columnValueSelector); + return existing != null ? existing : columnValueSelector; + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String columnName) + { + return delegate.getColumnCapabilities(columnName); + } + } } diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java index 1f2be9ed1ec7..16e9efb523bc 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java @@ -95,14 +95,13 @@ private void processRows( { int rowNum = 0; for (IncrementalIndexRow row : index.getFacts().persistIterable()) { - final Object[] dims = row.getDims(); - for (IncrementalIndex.DimensionDesc dimension : dimensions) { final int dimIndex = dimension.getIndex(); DimensionAccessor accessor = accessors.get(dimension.getName()); // Add 'null' to the dimension's dictionary. - if (dimIndex >= dims.length || dims[dimIndex] == null) { + // No need to check "dimIndex >= row.getDimsLength()" because isDimNull() verifies that. + if (row.isDimNull(dimIndex)) { accessor.indexer.processRowValsToUnsortedEncodedKeyComponent(null, true); continue; } @@ -111,7 +110,7 @@ private void processRows( if (capabilities.hasBitmapIndexes()) { final MutableBitmap[] bitmapIndexes = accessor.invertedIndexes; final DimensionIndexer indexer = accessor.indexer; - indexer.fillBitmapsFromUnsortedEncodedKeyComponent(dims[dimIndex], rowNum, bitmapIndexes, bitmapFactory); + indexer.fillBitmapsFromUnsortedEncodedKeyComponent(row.getDim(dimIndex), rowNum, bitmapIndexes, bitmapFactory); } } ++rowNum; diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java index 987ee5f8bf7c..b9661a7be81f 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java @@ -30,7 +30,7 @@ import java.util.Collections; import java.util.List; -public final class IncrementalIndexRow +public class IncrementalIndexRow { public static final int EMPTY_ROW_INDEX = -1; @@ -56,7 +56,7 @@ public final class IncrementalIndexRow this(timestamp, dims, dimensionDescsList, EMPTY_ROW_INDEX); } - IncrementalIndexRow( + public IncrementalIndexRow( long timestamp, Object[] dims, List dimensionDescsList, @@ -97,9 +97,23 @@ public long getTimestamp() return timestamp; } - public Object[] getDims() + @Nullable + public Object getDim(int index) + { + if (index >= dims.length) { + return null; + } + return dims[index]; + } + + public int getDimsLength() + { + return dims.length; + } + + public boolean isDimNull(int index) { - return dims; + return (index >= dims.length) || (dims[index] == null); } public int getRowIndex() @@ -107,7 +121,7 @@ public int getRowIndex() return rowIndex; } - void setRowIndex(int rowIndex) + public void setRowIndex(int rowIndex) { this.rowIndex = rowIndex; } diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java index bb24ae4cadde..87e771f088a4 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java @@ -21,24 +21,15 @@ import com.google.common.base.Supplier; import com.google.common.collect.Iterators; -import com.google.common.collect.Maps; import org.apache.druid.data.input.InputRow; -import org.apache.druid.data.input.MapBasedRow; import org.apache.druid.data.input.Row; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.PostAggregator; -import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.ColumnValueSelector; -import org.apache.druid.segment.DimensionHandler; -import org.apache.druid.segment.DimensionIndexer; -import org.apache.druid.segment.DimensionSelector; -import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.utils.JvmUtils; import javax.annotation.Nullable; @@ -51,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; /** * @@ -73,15 +65,10 @@ public class OnheapIncrementalIndex extends IncrementalIndex private static final int ROUGH_OVERHEAD_PER_MAP_ENTRY = Long.BYTES * 5 + Integer.BYTES; private final ConcurrentHashMap aggregators = new ConcurrentHashMap<>(); private final FactsHolder facts; - private final AtomicInteger indexIncrement = new AtomicInteger(0); private final long maxBytesPerRowForAggregators; - protected final int maxRowCount; - protected final long maxBytesInMemory; - @Nullable - private volatile Map selectors; - @Nullable - private String outOfRowsReason = null; + protected final AtomicInteger numEntries = new AtomicInteger(); + protected final AtomicLong bytesInMemory = new AtomicLong(); OnheapIncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, @@ -92,9 +79,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex long maxBytesInMemory ) { - super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd); - this.maxRowCount = maxRowCount; - this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory; + super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd, maxRowCount, maxBytesInMemory); this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions()) : new PlainFactsHolder(sortFacts, dimsComparator()); maxBytesPerRowForAggregators = getMaxBytesPerRowForAggregators(incrementalIndexSchema); @@ -141,23 +126,33 @@ public FactsHolder getFacts() } @Override - protected void initAggs( + protected Map generateSelectors( final AggregatorFactory[] metrics, final Supplier rowSupplier, final boolean deserializeComplexMetrics, final boolean concurrentEventAdd ) { - selectors = new HashMap<>(); + Map selectors = new HashMap<>(); for (AggregatorFactory agg : metrics) { selectors.put( agg.getName(), - new CachingColumnSelectorFactory( - makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics), - concurrentEventAdd - ) + makeCachedColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics, concurrentEventAdd) ); } + return selectors; + } + + @Override + public int size() + { + return numEntries.get(); + } + + @Override + public long getBytesInMemory() + { + return bytesInMemory.get(); } @Override @@ -174,8 +169,6 @@ protected AddToFactsResult addToFacts( Aggregator[] aggs; final AggregatorFactory[] metrics = getMetrics(); - final AtomicInteger numEntries = getNumEntries(); - final AtomicLong sizeInBytes = getBytesInMemory(); if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) { aggs = concurrentGet(priorIndex); doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages); @@ -188,7 +181,7 @@ protected AddToFactsResult addToFacts( concurrentSet(rowIndex, aggs); // Last ditch sanity checks - if ((numEntries.get() >= maxRowCount || sizeInBytes.get() >= maxBytesInMemory) + if ((numEntries.get() >= maxRowCount || bytesInMemory.get() >= maxBytesInMemory) && facts.getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX && !skipMaxRowsInMemoryCheck) { throw new IndexSizeExceededException( @@ -201,7 +194,7 @@ protected AddToFactsResult addToFacts( if (IncrementalIndexRow.EMPTY_ROW_INDEX == prev) { numEntries.incrementAndGet(); long estimatedRowSize = estimateRowSizeInBytes(key, maxBytesPerRowForAggregators); - sizeInBytes.addAndGet(estimatedRowSize); + bytesInMemory.addAndGet(estimatedRowSize); } else { // We lost a race parseExceptionMessages.clear(); @@ -213,7 +206,7 @@ protected AddToFactsResult addToFacts( } } - return new AddToFactsResult(numEntries.get(), sizeInBytes.get(), parseExceptionMessages); + return new AddToFactsResult(numEntries.get(), bytesInMemory.get(), parseExceptionMessages); } /** @@ -315,69 +308,39 @@ protected void concurrentRemove(int offset) aggregators.remove(offset); } - @Override - public boolean canAppendRow() - { - final boolean countCheck = size() < maxRowCount; - // if maxBytesInMemory = -1, then ignore sizeCheck - final boolean sizeCheck = maxBytesInMemory <= 0 || getBytesInMemory().get() < maxBytesInMemory; - final boolean canAdd = countCheck && sizeCheck; - if (!countCheck && !sizeCheck) { - outOfRowsReason = StringUtils.format( - "Maximum number of rows [%d] and maximum size in bytes [%d] reached", - maxRowCount, - maxBytesInMemory - ); - } else { - if (!countCheck) { - outOfRowsReason = StringUtils.format("Maximum number of rows [%d] reached", maxRowCount); - } else if (!sizeCheck) { - outOfRowsReason = StringUtils.format("Maximum size in bytes [%d] reached", maxBytesInMemory); - } - } - - return canAdd; - } - - @Override - public String getOutOfRowsReason() - { - return outOfRowsReason; - } - protected Aggregator[] getAggsForRow(int rowOffset) { return concurrentGet(rowOffset); } @Override - public float getMetricFloatValue(int rowOffset, int aggOffset) + public float getMetricFloatValue(IncrementalIndexRow incrementalIndexRow, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].getFloat(); + return concurrentGet(incrementalIndexRow.getRowIndex())[aggOffset].getFloat(); } @Override - public long getMetricLongValue(int rowOffset, int aggOffset) + public long getMetricLongValue(IncrementalIndexRow incrementalIndexRow, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].getLong(); + return concurrentGet(incrementalIndexRow.getRowIndex())[aggOffset].getLong(); } @Override - public Object getMetricObjectValue(int rowOffset, int aggOffset) + public Object getMetricObjectValue(IncrementalIndexRow incrementalIndexRow, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].get(); + return concurrentGet(incrementalIndexRow.getRowIndex())[aggOffset].get(); } @Override - protected double getMetricDoubleValue(int rowOffset, int aggOffset) + protected double getMetricDoubleValue(IncrementalIndexRow incrementalIndexRow, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].getDouble(); + return concurrentGet(incrementalIndexRow.getRowIndex())[aggOffset].getDouble(); } @Override - public boolean isNull(int rowOffset, int aggOffset) + public boolean isNull(IncrementalIndexRow incrementalIndexRow, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].isNull(); + return concurrentGet(incrementalIndexRow.getRowIndex())[aggOffset].isNull(); } @Override @@ -386,53 +349,17 @@ public Iterable iterableWithPostAggregations( final boolean descending ) { - final AggregatorFactory[] metrics = getMetricAggs(); - - { - return () -> { - final List dimensions = getDimensions(); - - return Iterators.transform( - getFacts().iterator(descending), - incrementalIndexRow -> { - final int rowOffset = incrementalIndexRow.getRowIndex(); - - Object[] theDims = incrementalIndexRow.getDims(); - - Map theVals = Maps.newLinkedHashMap(); - for (int i = 0; i < theDims.length; ++i) { - Object dim = theDims[i]; - DimensionDesc dimensionDesc = dimensions.get(i); - if (dimensionDesc == null) { - continue; - } - String dimensionName = dimensionDesc.getName(); - DimensionHandler handler = dimensionDesc.getHandler(); - if (dim == null || handler.getLengthOfEncodedKeyComponent(dim) == 0) { - theVals.put(dimensionName, null); - continue; - } - final DimensionIndexer indexer = dimensionDesc.getIndexer(); - Object rowVals = indexer.convertUnsortedEncodedKeyComponentToActualList(dim); - theVals.put(dimensionName, rowVals); - } - - Aggregator[] aggs = getAggsForRow(rowOffset); - for (int i = 0; i < aggs.length; ++i) { - theVals.put(metrics[i].getName(), aggs[i].get()); - } - - if (postAggs != null) { - for (PostAggregator postAgg : postAggs) { - theVals.put(postAgg.getName(), postAgg.compute(theVals)); - } - } - - return new MapBasedRow(incrementalIndexRow.getTimestamp(), theVals); - } - ); - }; - } + return () -> Iterators.transform( + facts.iterator(descending), + incrementalIndexRow -> { + final Aggregator[] aggs = getAggsForRow(incrementalIndexRow.getRowIndex()); + return getMapBasedRowWithPostAggregations( + incrementalIndexRow, + Stream.of(aggs).map(Aggregator::get), + postAggs + ); + } + ); } /** @@ -446,60 +373,6 @@ public void close() closeAggregators(); aggregators.clear(); facts.clear(); - if (selectors != null) { - selectors.clear(); - } - } - - /** - * Caches references to selector objects for each column instead of creating a new object each time in order to save - * heap space. In general the selectorFactory need not to thread-safe. If required, set concurrentEventAdd to true to - * use concurrent hash map instead of vanilla hash map for thread-safe operations. - */ - static class CachingColumnSelectorFactory implements ColumnSelectorFactory - { - private final Map> columnSelectorMap; - private final ColumnSelectorFactory delegate; - - public CachingColumnSelectorFactory(ColumnSelectorFactory delegate, boolean concurrentEventAdd) - { - this.delegate = delegate; - - if (concurrentEventAdd) { - columnSelectorMap = new ConcurrentHashMap<>(); - } else { - columnSelectorMap = new HashMap<>(); - } - } - - @Override - public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) - { - return delegate.makeDimensionSelector(dimensionSpec); - } - - @Override - public ColumnValueSelector makeColumnValueSelector(String columnName) - { - ColumnValueSelector existing = columnSelectorMap.get(columnName); - if (existing != null) { - return existing; - } - - // We cannot use columnSelectorMap.computeIfAbsent(columnName, delegate::makeColumnValueSelector) - // here since makeColumnValueSelector may modify the columnSelectorMap itself through - // virtual column references, triggering a ConcurrentModificationException in JDK 9 and above. - ColumnValueSelector columnValueSelector = delegate.makeColumnValueSelector(columnName); - existing = columnSelectorMap.putIfAbsent(columnName, columnValueSelector); - return existing != null ? existing : columnValueSelector; - } - - @Nullable - @Override - public ColumnCapabilities getColumnCapabilities(String columnName) - { - return delegate.getColumnCapabilities(columnName); - } } public static class Builder extends AppendableIndexBuilder diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java index 5200ada8ef31..097874e72f32 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java @@ -40,7 +40,9 @@ import org.junit.runners.Parameterized; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.function.Function; @@ -127,14 +129,26 @@ public void testGetRowsIterableNoRollup() throws Exception IncrementalIndexTest.populateIndex(timestamp, toPersist1); IncrementalIndexTest.populateIndex(timestamp, toPersist1); - - ArrayList dim1Vals = new ArrayList<>(); + // facts.keySet() return the rows in the order they are stored internally. + // In plain mode, OnheapInrementalIndex sort its rows internally by timestamp then by + // index (the order they were inserted). + // But facts.keySet() does not require this order. Other implementations, might sort their + // rows in their native order (as it would be expected by facts.persistIterable()). + // To mitigate this, we validate the row index without expecting a specific order. + HashMap dim1Vals = new HashMap<>(); + HashMap dim2Vals = new HashMap<>(); for (IncrementalIndexRow row : toPersist1.getFacts().keySet()) { - dim1Vals.add(((int[]) row.getDims()[0])[0]); + dim1Vals.put(row.getRowIndex(), ((int[]) row.getDim(0))[0]); + dim2Vals.put(row.getRowIndex(), ((int[]) row.getDim(1))[0]); } - ArrayList dim2Vals = new ArrayList<>(); - for (IncrementalIndexRow row : toPersist1.getFacts().keySet()) { - dim2Vals.add(((int[]) row.getDims()[1])[0]); + + Assert.assertEquals(6, dim1Vals.size()); + Assert.assertEquals(6, dim2Vals.size()); + + List expected = Arrays.asList(0, 1, 0, 1, 0, 1); + for (int i = 0; i < 6; i++) { + Assert.assertEquals(expected.get(i), dim1Vals.get(i)); + Assert.assertEquals(expected.get(i), dim2Vals.get(i)); } final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( @@ -184,13 +198,6 @@ public void testGetRowsIterableNoRollup() throws Exception Assert.assertEquals(6, rowStrings.size()); for (int i = 0; i < 6; i++) { - if (i % 2 == 0) { - Assert.assertEquals(0, (long) dim1Vals.get(i)); - Assert.assertEquals(0, (long) dim2Vals.get(i)); - } else { - Assert.assertEquals(1, (long) dim1Vals.get(i)); - Assert.assertEquals(1, (long) dim2Vals.get(i)); - } Assert.assertEquals(getExpected.apply(i), rowStrings.get(i)); } } diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java index 121f093b1a67..9d846b63d385 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java @@ -114,7 +114,7 @@ public void run() { while (!Thread.interrupted()) { for (IncrementalIndexRow row : index.getFacts().keySet()) { - if (index.getMetricLongValue(row.getRowIndex(), 0) != 1) { + if (index.getMetricLongValue(row, 0) != 1) { checkFailedCount.addAndGet(1); } } @@ -127,6 +127,7 @@ public void run() addThreads[i].join(); } checkThread.interrupt(); + checkThread.join(); Assert.assertEquals(0, checkFailedCount.get()); } @@ -204,11 +205,11 @@ public void run() long jsSum = 0; for (IncrementalIndexRow row : indexExpr.getFacts().keySet()) { - exprSum += indexExpr.getMetricLongValue(row.getRowIndex(), 0); + exprSum += indexExpr.getMetricLongValue(row, 0); } for (IncrementalIndexRow row : indexJs.getFacts().keySet()) { - jsSum += indexJs.getMetricLongValue(row.getRowIndex(), 0); + jsSum += indexJs.getMetricLongValue(row, 0); } Assert.assertEquals(exprSum, jsSum); diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java index 7b7783917d37..a1e927ab7e86 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java @@ -71,7 +71,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; /** * Extending AbstractBenchmark means only runs if explicitly called @@ -176,8 +175,6 @@ protected AddToFactsResult addToFacts( Aggregator[] aggs; final AggregatorFactory[] metrics = getMetrics(); - final AtomicInteger numEntries = getNumEntries(); - final AtomicLong sizeInBytes = getBytesInMemory(); if (null != priorIdex) { aggs = indexedMap.get(priorIdex); } else { @@ -197,14 +194,14 @@ protected AddToFactsResult addToFacts( // Last ditch sanity checks - if ((numEntries.get() >= maxRowCount || sizeInBytes.get() >= maxBytesInMemory) + if ((numEntries.get() >= maxRowCount || bytesInMemory.get() >= maxBytesInMemory) && getFacts().getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX) { throw new IndexSizeExceededException("Maximum number of rows or max bytes reached"); } final int prev = getFacts().putIfAbsent(key, rowIndex); if (IncrementalIndexRow.EMPTY_ROW_INDEX == prev) { numEntries.incrementAndGet(); - sizeInBytes.incrementAndGet(); + bytesInMemory.incrementAndGet(); } else { // We lost a race aggs = indexedMap.get(prev); @@ -224,7 +221,7 @@ && getFacts().getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX) { rowContainer.set(null); - return new AddToFactsResult(numEntries.get(), sizeInBytes.get(), new ArrayList<>()); + return new AddToFactsResult(numEntries.get(), bytesInMemory.get(), new ArrayList<>()); } @Override diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java index ccf5e4c5421b..29cddf275589 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java @@ -279,7 +279,7 @@ public long getBytesInMemory() return 0; } - return currHydrant.getIndex().getBytesInMemory().get(); + return currHydrant.getIndex().getBytesInMemory(); } } From 32852ee920e626a67793db03465e164a7cf9ca20 Mon Sep 17 00:00:00 2001 From: Liran Funaro Date: Thu, 6 Jan 2022 11:38:14 +0200 Subject: [PATCH 2/5] Move common code to IncrementalIndex --- .../segment/incremental/IncrementalIndex.java | 21 +++++++++++++------ .../incremental/OnheapIncrementalIndex.java | 21 ------------------- 2 files changed, 15 insertions(+), 27 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index eb995e8e2534..d77cace75fc3 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -84,6 +84,7 @@ import javax.annotation.Nullable; import java.io.Closeable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.Deque; @@ -99,6 +100,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import java.util.stream.Stream; public abstract class IncrementalIndex extends AbstractIndex implements Iterable, Closeable, ColumnInspector @@ -365,12 +367,19 @@ public String getOutOfRowsReason() return outOfRowsReason; } - protected abstract Map generateSelectors( - AggregatorFactory[] metrics, - Supplier rowSupplier, - boolean deserializeComplexMetrics, - boolean concurrentEventAdd - ); + protected Map generateSelectors( + final AggregatorFactory[] metrics, + final Supplier rowSupplier, + final boolean deserializeComplexMetrics, + final boolean concurrentEventAdd + ) + { + return Arrays.stream(metrics).collect(Collectors.toMap( + AggregatorFactory::getName, + agg -> makeCachedColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics, concurrentEventAdd), + (agg1, agg2) -> agg2 + )); + } // Note: This method needs to be thread safe. protected abstract AddToFactsResult addToFacts( diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java index 87e771f088a4..70a9afef5d8e 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java @@ -29,15 +29,12 @@ import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.PostAggregator; -import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.utils.JvmUtils; import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -125,24 +122,6 @@ public FactsHolder getFacts() return facts; } - @Override - protected Map generateSelectors( - final AggregatorFactory[] metrics, - final Supplier rowSupplier, - final boolean deserializeComplexMetrics, - final boolean concurrentEventAdd - ) - { - Map selectors = new HashMap<>(); - for (AggregatorFactory agg : metrics) { - selectors.put( - agg.getName(), - makeCachedColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics, concurrentEventAdd) - ); - } - return selectors; - } - @Override public int size() { From 3b66cdf6be4958d903c612c2f41862ebe09027ce Mon Sep 17 00:00:00 2001 From: Liran Funaro Date: Fri, 7 Jan 2022 01:51:44 +0200 Subject: [PATCH 3/5] Add getIndexedDim() --- .../druid/segment/StringDimensionIndexer.java | 33 ++++++++----------- .../incremental/IncrementalIndexRow.java | 23 +++++++++++++ 2 files changed, 37 insertions(+), 19 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java index 9abb032eb10c..f2602cfc8a69 100644 --- a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java +++ b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java @@ -259,14 +259,17 @@ public DimensionSelector makeDimensionSelector( class IndexerDimensionSelector implements DimensionSelector, IdLookup { - private final ArrayBasedIndexedInts indexedInts = new ArrayBasedIndexedInts(); + private final ArrayBasedIndexedInts defaultIndexedInts = new ArrayBasedIndexedInts(); + + @MonotonicNonNull + private IndexedInts cachedIndexedInts = null; @Nullable @MonotonicNonNull private int[] nullIdIntArray; /** - * Tries to fetch the int array using getDim() and convert it to IndexedInts. + * Tries to fetch the dimention as an IndexedInts. * If the dim is null or with zero length, the value is considered null. * It may be null or empty due to currEntry's rowIndex being smaller than the row's rowIndex in which this * dim first appears. @@ -276,26 +279,18 @@ class IndexerDimensionSelector implements DimensionSelector, IdLookup @Nullable private IndexedInts getRowOrNull() { - IncrementalIndexRow key = currEntry.get(); - - if (key.isDimNull(dimIndex)) { - return null; + IndexedInts ret = currEntry.get().getIndexedDim(dimIndex, cachedIndexedInts); + if (ret != null) { + cachedIndexedInts = ret; + return ret.size() > 0 ? ret : null; } - - int[] indices = (int[]) key.getDim(dimIndex); - - if (indices == null || indices.length == 0) { - return null; - } - - indexedInts.setValues(indices, indices.length); - return indexedInts; + return null; } private IndexedInts getDefaultIndexedInts() { if (hasMultipleValues) { - indexedInts.setValues(IntArrays.EMPTY_ARRAY, 0); + defaultIndexedInts.setValues(IntArrays.EMPTY_ARRAY, 0); } else { final int nullId = getEncodedValue(null, false); if (nullId >= 0 && nullId < maxId) { @@ -303,15 +298,15 @@ private IndexedInts getDefaultIndexedInts() if (nullIdIntArray == null) { nullIdIntArray = new int[]{nullId}; } - indexedInts.setValues(nullIdIntArray, 1); + defaultIndexedInts.setValues(nullIdIntArray, 1); } else { // null doesn't exist in the dictionary; return an empty array. // Choose to use ArrayBasedIndexedInts later, instead of special "empty" IndexedInts, for monomorphism - indexedInts.setValues(IntArrays.EMPTY_ARRAY, 0); + defaultIndexedInts.setValues(IntArrays.EMPTY_ARRAY, 0); } } - return indexedInts; + return defaultIndexedInts; } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java index b9661a7be81f..5f0abc86ac59 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java @@ -23,6 +23,8 @@ import com.google.common.collect.Lists; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.segment.DimensionIndexer; +import org.apache.druid.segment.data.ArrayBasedIndexedInts; +import org.apache.druid.segment.data.IndexedInts; import javax.annotation.Nullable; import java.lang.reflect.Array; @@ -106,6 +108,27 @@ public Object getDim(int index) return dims[index]; } + @Nullable + public IndexedInts getIndexedDim(final int index, @Nullable IndexedInts cachedIndexedInts) + { + Object dim = getDim(index); + if (!(dim instanceof int[])) { + return null; + } + + int[] indices = (int[]) dim; + ArrayBasedIndexedInts indexedInts; + + if (!(cachedIndexedInts instanceof ArrayBasedIndexedInts)) { + indexedInts = new ArrayBasedIndexedInts(indices); + } else { + indexedInts = (ArrayBasedIndexedInts) cachedIndexedInts; + indexedInts.setValues(indices, indices.length); + } + + return indexedInts; + } + public int getDimsLength() { return dims.length; From 78eb67ab50eca945bda5cd7641987b16c627881c Mon Sep 17 00:00:00 2001 From: Liran Funaro Date: Mon, 10 Jan 2022 10:44:11 +0200 Subject: [PATCH 4/5] Fix spotbug --- .../java/org/apache/druid/segment/StringDimensionIndexer.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java index f2602cfc8a69..0bdd9458b566 100644 --- a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java +++ b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java @@ -261,8 +261,9 @@ class IndexerDimensionSelector implements DimensionSelector, IdLookup { private final ArrayBasedIndexedInts defaultIndexedInts = new ArrayBasedIndexedInts(); + @Nullable @MonotonicNonNull - private IndexedInts cachedIndexedInts = null; + private IndexedInts cachedIndexedInts; @Nullable @MonotonicNonNull From 57f4d7eed016e1ab9b9aaea0e4e706e7a7a5594a Mon Sep 17 00:00:00 2001 From: Liran Funaro Date: Mon, 10 Jan 2022 16:44:38 +0200 Subject: [PATCH 5/5] Bug fix --- .../druid/segment/incremental/IncrementalIndexRow.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java index 5f0abc86ac59..cd17a8639c3d 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java @@ -119,13 +119,13 @@ public IndexedInts getIndexedDim(final int index, @Nullable IndexedInts cachedIn int[] indices = (int[]) dim; ArrayBasedIndexedInts indexedInts; - if (!(cachedIndexedInts instanceof ArrayBasedIndexedInts)) { - indexedInts = new ArrayBasedIndexedInts(indices); - } else { + if (cachedIndexedInts instanceof ArrayBasedIndexedInts) { indexedInts = (ArrayBasedIndexedInts) cachedIndexedInts; - indexedInts.setValues(indices, indices.length); + } else { + indexedInts = new ArrayBasedIndexedInts(); } + indexedInts.setValues(indices, indices.length); return indexedInts; }