diff --git a/processing/src/main/java/org/apache/druid/segment/DoubleDimensionIndexer.java b/processing/src/main/java/org/apache/druid/segment/DoubleDimensionIndexer.java index 53207b51d1f7..173469f2c809 100644 --- a/processing/src/main/java/org/apache/druid/segment/DoubleDimensionIndexer.java +++ b/processing/src/main/java/org/apache/druid/segment/DoubleDimensionIndexer.java @@ -125,20 +125,19 @@ class IndexerDoubleColumnSelector implements DoubleColumnSelector @Override public boolean isNull() { - final Object[] dims = currEntry.get().getDims(); - return hasNulls && (dimIndex >= dims.length || dims[dimIndex] == null); + return hasNulls && currEntry.get().isDimNull(dimIndex); } @Override public double getDouble() { - final Object[] dims = currEntry.get().getDims(); + final Object dim = currEntry.get().getDim(dimIndex); - if (dimIndex >= dims.length || dims[dimIndex] == null) { + if (dim == null) { assert NullHandling.replaceWithDefault(); return 0.0; } - return (Double) dims[dimIndex]; + return (Double) dim; } @SuppressWarnings("deprecation") @@ -146,12 +145,12 @@ public double getDouble() @Override public Double getObject() { - final Object[] dims = currEntry.get().getDims(); + final Object dim = currEntry.get().getDim(dimIndex); - if (dimIndex >= dims.length || dims[dimIndex] == null) { + if (dim == null) { return NullHandling.defaultDoubleValue(); } - return (Double) dims[dimIndex]; + return (Double) dim; } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/FloatDimensionIndexer.java b/processing/src/main/java/org/apache/druid/segment/FloatDimensionIndexer.java index 2da428f28734..0b44f65b4144 100644 --- a/processing/src/main/java/org/apache/druid/segment/FloatDimensionIndexer.java +++ b/processing/src/main/java/org/apache/druid/segment/FloatDimensionIndexer.java @@ -126,21 +126,20 @@ class IndexerFloatColumnSelector implements FloatColumnSelector @Override public boolean isNull() { - final Object[] dims = currEntry.get().getDims(); - return hasNulls && (dimIndex >= dims.length || dims[dimIndex] == null); + return hasNulls && currEntry.get().isDimNull(dimIndex); } @Override public float getFloat() { - final Object[] dims = currEntry.get().getDims(); + final Object dim = currEntry.get().getDim(dimIndex); - if (dimIndex >= dims.length || dims[dimIndex] == null) { + if (dim == null) { assert NullHandling.replaceWithDefault(); return 0.0f; } - return (Float) dims[dimIndex]; + return (Float) dim; } @SuppressWarnings("deprecation") @@ -148,13 +147,13 @@ public float getFloat() @Override public Float getObject() { - final Object[] dims = currEntry.get().getDims(); + final Object dim = currEntry.get().getDim(dimIndex); - if (dimIndex >= dims.length || dims[dimIndex] == null) { + if (dim == null) { return NullHandling.defaultFloatValue(); } - return (Float) dims[dimIndex]; + return (Float) dim; } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/LongDimensionIndexer.java b/processing/src/main/java/org/apache/druid/segment/LongDimensionIndexer.java index 48960767dab3..c1073d7f1ca0 100644 --- a/processing/src/main/java/org/apache/druid/segment/LongDimensionIndexer.java +++ b/processing/src/main/java/org/apache/druid/segment/LongDimensionIndexer.java @@ -126,21 +126,20 @@ class IndexerLongColumnSelector implements LongColumnSelector @Override public boolean isNull() { - final Object[] dims = currEntry.get().getDims(); - return hasNulls && (dimIndex >= dims.length || dims[dimIndex] == null); + return hasNulls && currEntry.get().isDimNull(dimIndex); } @Override public long getLong() { - final Object[] dims = currEntry.get().getDims(); + final Object dim = currEntry.get().getDim(dimIndex); - if (dimIndex >= dims.length || dims[dimIndex] == null) { + if (dim == null) { assert NullHandling.replaceWithDefault(); return 0; } - return (Long) dims[dimIndex]; + return (Long) dim; } @SuppressWarnings("deprecation") @@ -148,13 +147,13 @@ public long getLong() @Override public Long getObject() { - final Object[] dims = currEntry.get().getDims(); + final Object dim = currEntry.get().getDim(dimIndex); - if (dimIndex >= dims.length || dims[dimIndex] == null) { + if (dim == null) { return NullHandling.defaultLongValue(); } - return (Long) dims[dimIndex]; + return (Long) dim; } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java index c88f22f477be..1348461290aa 100644 --- a/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java +++ b/processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java @@ -281,57 +281,62 @@ public DimensionSelector makeDimensionSelector( class IndexerDimensionSelector implements DimensionSelector, IdLookup { - private final ArrayBasedIndexedInts indexedInts = new ArrayBasedIndexedInts(); + private final ArrayBasedIndexedInts defaultIndexedInts = new ArrayBasedIndexedInts(); + + @Nullable + @MonotonicNonNull + private IndexedInts cachedIndexedInts; @Nullable @MonotonicNonNull private int[] nullIdIntArray; - @Override - public IndexedInts getRow() + /** + * Tries to fetch the dimention as an IndexedInts. + * If the dim is null or with zero length, the value is considered null. + * It may be null or empty due to currEntry's rowIndex being smaller than the row's rowIndex in which this + * dim first appears. + * + * @return IndexedInts instance, or null if the dim is null. + */ + @Nullable + private IndexedInts getRowOrNull() { - final Object[] dims = currEntry.get().getDims(); - - int[] indices; - if (dimIndex < dims.length) { - indices = (int[]) dims[dimIndex]; - } else { - indices = null; + IndexedInts ret = currEntry.get().getIndexedDim(dimIndex, cachedIndexedInts); + if (ret != null) { + cachedIndexedInts = ret; + return ret.size() > 0 ? ret : null; } + return null; + } - int[] row = null; - int rowSize = 0; - - // usually due to currEntry's rowIndex is smaller than the row's rowIndex in which this dim first appears - if (indices == null || indices.length == 0) { - if (hasMultipleValues) { - row = IntArrays.EMPTY_ARRAY; - rowSize = 0; - } else { - final int nullId = getEncodedValue(null, false); - if (nullId >= 0 && nullId < maxId) { - // null was added to the dictionary before this selector was created; return its ID. - if (nullIdIntArray == null) { - nullIdIntArray = new int[]{nullId}; - } - row = nullIdIntArray; - rowSize = 1; - } else { - // null doesn't exist in the dictionary; return an empty array. - // Choose to use ArrayBasedIndexedInts later, instead of special "empty" IndexedInts, for monomorphism - row = IntArrays.EMPTY_ARRAY; - rowSize = 0; + private IndexedInts getDefaultIndexedInts() + { + if (hasMultipleValues) { + defaultIndexedInts.setValues(IntArrays.EMPTY_ARRAY, 0); + } else { + final int nullId = getEncodedValue(null, false); + if (nullId >= 0 && nullId < maxId) { + // null was added to the dictionary before this selector was created; return its ID. + if (nullIdIntArray == null) { + nullIdIntArray = new int[]{nullId}; } + defaultIndexedInts.setValues(nullIdIntArray, 1); + } else { + // null doesn't exist in the dictionary; return an empty array. + // Choose to use ArrayBasedIndexedInts later, instead of special "empty" IndexedInts, for monomorphism + defaultIndexedInts.setValues(IntArrays.EMPTY_ARRAY, 0); } } - if (row == null && indices != null && indices.length > 0) { - row = indices; - rowSize = indices.length; - } + return defaultIndexedInts; + } - indexedInts.setValues(row, rowSize); - return indexedInts; + @Override + public IndexedInts getRow() + { + IndexedInts ret = getRowOrNull(); + return ret != null ? ret : getDefaultIndexedInts(); } @Override @@ -345,18 +350,14 @@ public ValueMatcher makeValueMatcher(final String value) @Override public boolean matches() { - Object[] dims = currEntry.get().getDims(); - if (dimIndex >= dims.length) { + IndexedInts dimsInt = getRowOrNull(); + if (dimsInt == null) { return value == null; } - int[] dimsInt = (int[]) dims[dimIndex]; - if (dimsInt == null || dimsInt.length == 0) { - return value == null; - } - - for (int id : dimsInt) { - if (id == valueId) { + int size = dimsInt.size(); + for (int i = 0; i < size; i++) { + if (dimsInt.get(i) == valueId) { return true; } } @@ -391,17 +392,14 @@ public ValueMatcher makeValueMatcher(final Predicate predicate) @Override public boolean matches() { - Object[] dims = currEntry.get().getDims(); - if (dimIndex >= dims.length) { - return matchNull; - } - - int[] dimsInt = (int[]) dims[dimIndex]; - if (dimsInt == null || dimsInt.length == 0) { + IndexedInts dimsInt = getRowOrNull(); + if (dimsInt == null) { return matchNull; } - for (int id : dimsInt) { + int size = dimsInt.size(); + for (int i = 0; i < size; i++) { + int id = dimsInt.get(i); if (checkedIds.get(id)) { if (matchingIds.get(id)) { return true; @@ -486,12 +484,12 @@ public Object getObject() return null; } - Object[] dims = key.getDims(); - if (dimIndex >= dims.length) { + Object dim = key.getDim(dimIndex); + if (dim == null) { return null; } - return convertUnsortedEncodedKeyComponentToActualList((int[]) dims[dimIndex]); + return convertUnsortedEncodedKeyComponentToActualList((int[]) dim); } @SuppressWarnings("deprecation") diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index 484bc89fc955..7208070b5416 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -34,6 +34,7 @@ import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.MapBasedRow; import org.apache.druid.data.input.Row; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -41,6 +42,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.UnparseableColumnsParseException; @@ -84,6 +86,7 @@ import javax.annotation.Nullable; import java.io.Closeable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.Deque; @@ -99,7 +102,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import java.util.stream.Stream; public abstract class IncrementalIndex extends AbstractIndex implements Iterable, Closeable, ColumnInspector @@ -222,6 +225,36 @@ public ColumnCapabilities getColumnCapabilities(String columnName) private final long minTimestamp; private final Granularity gran; private final boolean rollup; + protected final int maxRowCount; + protected final long maxBytesInMemory; + + /** + * Flag denoting if max possible values should be used to estimate on-heap mem + * usage. + *

+ * There is one instance of Aggregator per metric per row key. + *

+ * Old Method: {@code useMaxMemoryEstimates = true} (default) + *

+ * + * New Method: {@code useMaxMemoryEstimates = false} + * + *

+ * Thus the new method eliminates over-estimations. + */ + protected final boolean useMaxMemoryEstimates; + private final List> rowTransformers; private final VirtualColumns virtualColumns; private final AggregatorFactory[] metrics; @@ -231,18 +264,22 @@ public ColumnCapabilities getColumnCapabilities(String columnName) private final Map metricDescs; private final Map dimensionDescs; - private final List dimensionDescsList; + protected final List dimensionDescsList; // dimension capabilities are provided by the indexers private final Map timeAndMetricsColumnCapabilities; - private final AtomicInteger numEntries = new AtomicInteger(); - private final AtomicLong bytesInMemory = new AtomicLong(); - private final boolean useMaxMemoryEstimates; + protected final Map selectors; + + protected final AtomicInteger indexIncrement = new AtomicInteger(0); // This is modified on add() in a critical section. private final ThreadLocal in = new ThreadLocal<>(); private final Supplier rowSupplier = in::get; - private volatile DateTime maxIngestedEventTime; + @Nullable + private volatile DateTime maxIngestedEventTime = null; + + @Nullable + private String outOfRowsReason = null; /** @@ -256,12 +293,16 @@ public ColumnCapabilities getColumnCapabilities(String columnName) * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input * value for aggregators that return metrics other than float. * @param concurrentEventAdd flag whether ot not adding of input rows should be thread-safe + * @param maxRowCount maximal number of rows before persist + * @param maxBytesInMemory maximal number of bytes before persist * @param useMaxMemoryEstimates true if max values should be used to estimate memory */ protected IncrementalIndex( final IncrementalIndexSchema incrementalIndexSchema, final boolean deserializeComplexMetrics, final boolean concurrentEventAdd, + final int maxRowCount, + final long maxBytesInMemory, final boolean useMaxMemoryEstimates ) { @@ -274,6 +315,9 @@ protected IncrementalIndex( this.deserializeComplexMetrics = deserializeComplexMetrics; this.useMaxMemoryEstimates = useMaxMemoryEstimates; + this.maxRowCount = maxRowCount; + this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory; + this.timeAndMetricsColumnCapabilities = new HashMap<>(); this.metricDescs = Maps.newLinkedHashMap(); this.dimensionDescs = Maps.newLinkedHashMap(); @@ -285,7 +329,7 @@ protected IncrementalIndex( this.rollup ); - initAggs(metrics, rowSupplier, deserializeComplexMetrics, concurrentEventAdd); + this.selectors = generateSelectors(metrics, rowSupplier, deserializeComplexMetrics, concurrentEventAdd); for (AggregatorFactory metric : metrics) { MetricDesc metricDesc = new MetricDesc(metricDescs.size(), metric); @@ -333,16 +377,44 @@ protected IncrementalIndex( public abstract FactsHolder getFacts(); - public abstract boolean canAppendRow(); + public boolean canAppendRow() + { + final boolean countCheck = size() < maxRowCount; + // if maxBytesInMemory = -1, then ignore sizeCheck + final boolean sizeCheck = maxBytesInMemory <= 0 || getBytesInMemory() < maxBytesInMemory; + if (!countCheck && !sizeCheck) { + outOfRowsReason = StringUtils.format( + "Maximum number of rows [%d] and maximum size in bytes [%d] reached", + maxRowCount, + maxBytesInMemory + ); + } else if (!countCheck) { + outOfRowsReason = StringUtils.format("Maximum number of rows [%d] reached", maxRowCount); + } else if (!sizeCheck) { + outOfRowsReason = StringUtils.format("Maximum size in bytes [%d] reached", maxBytesInMemory); + } + return countCheck && sizeCheck; + } - public abstract String getOutOfRowsReason(); + @Nullable + public String getOutOfRowsReason() + { + return outOfRowsReason; + } - protected abstract void initAggs( - AggregatorFactory[] metrics, - Supplier rowSupplier, - boolean deserializeComplexMetrics, - boolean concurrentEventAdd - ); + protected Map generateSelectors( + final AggregatorFactory[] metrics, + final Supplier rowSupplier, + final boolean deserializeComplexMetrics, + final boolean concurrentEventAdd + ) + { + return Arrays.stream(metrics).collect(Collectors.toMap( + AggregatorFactory::getName, + agg -> makeCachedColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics, concurrentEventAdd), + (agg1, agg2) -> agg2 + )); + } // Note: This method needs to be thread safe. protected abstract AddToFactsResult addToFacts( @@ -355,15 +427,15 @@ protected abstract AddToFactsResult addToFacts( public abstract int getLastRowIndex(); - protected abstract float getMetricFloatValue(int rowOffset, int aggOffset); + protected abstract float getMetricFloatValue(IncrementalIndexRow incrementalIndexRow, int aggOffset); - protected abstract long getMetricLongValue(int rowOffset, int aggOffset); + protected abstract long getMetricLongValue(IncrementalIndexRow incrementalIndexRow, int aggOffset); - protected abstract Object getMetricObjectValue(int rowOffset, int aggOffset); + protected abstract Object getMetricObjectValue(IncrementalIndexRow incrementalIndexRow, int aggOffset); - protected abstract double getMetricDoubleValue(int rowOffset, int aggOffset); + protected abstract double getMetricDoubleValue(IncrementalIndexRow incrementalIndexRow, int aggOffset); - protected abstract boolean isNull(int rowOffset, int aggOffset); + protected abstract boolean isNull(IncrementalIndexRow incrementalIndexRow, int aggOffset); static class IncrementalIndexRowResult { @@ -387,7 +459,7 @@ List getParseExceptionMessages() } } - static class AddToFactsResult + public static class AddToFactsResult { private final int rowCount; private final long bytesInMemory; @@ -428,6 +500,7 @@ public boolean isRollup() @Override public void close() { + selectors.clear(); } public InputRow formatRow(InputRow row) @@ -699,34 +772,23 @@ private synchronized void updateMaxIngestedTime(DateTime eventTime) public boolean isEmpty() { - return numEntries.get() == 0; + return size() == 0; } - public int size() - { - return numEntries.get(); - } + public abstract int size(); + + public abstract long getBytesInMemory(); boolean getDeserializeComplexMetrics() { return deserializeComplexMetrics; } - AtomicInteger getNumEntries() - { - return numEntries; - } - AggregatorFactory[] getMetrics() { return metrics; } - public AtomicLong getBytesInMemory() - { - return bytesInMemory; - } - private long getMinTimeMillis() { return getFacts().getMinTimeMillis(); @@ -928,6 +990,52 @@ public abstract Iterable iterableWithPostAggregations( boolean descending ); + /** + * Apply post aggregation on a row. + * @param row the row to apply on + * @param values a stream of the row's values + * @param postAggs the post aggregators + * @return a new row + */ + public MapBasedRow getMapBasedRowWithPostAggregations( + IncrementalIndexRow row, + Stream values, + @Nullable final List postAggs + ) + { + Map theVals = Maps.newLinkedHashMap(); + + for (int i = 0; i < row.getDimsLength(); ++i) { + Object dim = row.getDim(i); + DimensionDesc dimensionDesc = dimensionDescsList.get(i); + if (dimensionDesc == null) { + continue; + } + String dimensionName = dimensionDesc.getName(); + DimensionHandler handler = dimensionDesc.getHandler(); + if (dim == null || handler.getLengthOfEncodedKeyComponent(dim) == 0) { + theVals.put(dimensionName, null); + continue; + } + final DimensionIndexer indexer = dimensionDesc.getIndexer(); + Object rowVals = indexer.convertUnsortedEncodedKeyComponentToActualList(dim); + theVals.put(dimensionName, rowVals); + } + + int i = 0; + for (Iterator it = values.iterator(); it.hasNext() && i < metrics.length; i++) { + theVals.put(metrics[i].getName(), it.next()); + } + + if (postAggs != null) { + for (PostAggregator postAgg : postAggs) { + theVals.put(postAgg.getName(), postAgg.compute(theVals)); + } + } + + return new MapBasedRow(row.getTimestamp(), theVals); + } + public DateTime getMaxIngestedEventTime() { return maxIngestedEventTime; @@ -1037,6 +1145,19 @@ protected ColumnSelectorFactory makeColumnSelectorFactory( return makeColumnSelectorFactory(virtualColumns, agg, in, deserializeComplexMetrics); } + protected ColumnSelectorFactory makeCachedColumnSelectorFactory( + final AggregatorFactory agg, + final Supplier in, + final boolean deserializeComplexMetrics, + final boolean concurrentEventAdd + ) + { + return new CachingColumnSelectorFactory( + makeColumnSelectorFactory(agg, in, deserializeComplexMetrics), + concurrentEventAdd + ); + } + protected final Comparator dimsComparator() { return new IncrementalIndexRowComparator(dimensionDescsList); @@ -1103,7 +1224,7 @@ private static boolean allNull(Object[] dims, int startPosition) return true; } - interface FactsHolder + public interface FactsHolder { /** * @return the previous rowIndex associated with the specified key, or @@ -1377,7 +1498,7 @@ public LongMetricColumnSelector(IncrementalIndexRowHolder currEntry, int metricI public long getLong() { assert NullHandling.replaceWithDefault() || !isNull(); - return getMetricLongValue(currEntry.get().getRowIndex(), metricIndex); + return getMetricLongValue(currEntry.get(), metricIndex); } @Override @@ -1389,7 +1510,7 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector) @Override public boolean isNull() { - return IncrementalIndex.this.isNull(currEntry.get().getRowIndex(), metricIndex); + return IncrementalIndex.this.isNull(currEntry.get(), metricIndex); } } @@ -1414,7 +1535,7 @@ public ObjectMetricColumnSelector( @Override public Object getObject() { - return getMetricObjectValue(currEntry.get().getRowIndex(), metricIndex); + return getMetricObjectValue(currEntry.get(), metricIndex); } @Override @@ -1445,7 +1566,7 @@ public FloatMetricColumnSelector(IncrementalIndexRowHolder currEntry, int metric public float getFloat() { assert NullHandling.replaceWithDefault() || !isNull(); - return getMetricFloatValue(currEntry.get().getRowIndex(), metricIndex); + return getMetricFloatValue(currEntry.get(), metricIndex); } @Override @@ -1457,7 +1578,7 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector) @Override public boolean isNull() { - return IncrementalIndex.this.isNull(currEntry.get().getRowIndex(), metricIndex); + return IncrementalIndex.this.isNull(currEntry.get(), metricIndex); } } @@ -1476,13 +1597,13 @@ public DoubleMetricColumnSelector(IncrementalIndexRowHolder currEntry, int metri public double getDouble() { assert NullHandling.replaceWithDefault() || !isNull(); - return getMetricDoubleValue(currEntry.get().getRowIndex(), metricIndex); + return getMetricDoubleValue(currEntry.get(), metricIndex); } @Override public boolean isNull() { - return IncrementalIndex.this.isNull(currEntry.get().getRowIndex(), metricIndex); + return IncrementalIndex.this.isNull(currEntry.get(), metricIndex); } @Override @@ -1491,4 +1612,55 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector) inspector.visit("index", IncrementalIndex.this); } } + + /** + * Caches references to selector objects for each column instead of creating a new object each time in order to save + * heap space. In general the selectorFactory need not to thread-safe. If required, set concurrentEventAdd to true to + * use concurrent hash map instead of vanilla hash map for thread-safe operations. + */ + static class CachingColumnSelectorFactory implements ColumnSelectorFactory + { + private final Map> columnSelectorMap; + private final ColumnSelectorFactory delegate; + + public CachingColumnSelectorFactory(ColumnSelectorFactory delegate, boolean concurrentEventAdd) + { + this.delegate = delegate; + + if (concurrentEventAdd) { + columnSelectorMap = new ConcurrentHashMap<>(); + } else { + columnSelectorMap = new HashMap<>(); + } + } + + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + return delegate.makeDimensionSelector(dimensionSpec); + } + + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + ColumnValueSelector existing = columnSelectorMap.get(columnName); + if (existing != null) { + return existing; + } + + // We cannot use columnSelectorMap.computeIfAbsent(columnName, delegate::makeColumnValueSelector) + // here since makeColumnValueSelector may modify the columnSelectorMap itself through + // virtual column references, triggering a ConcurrentModificationException in JDK 9 and above. + ColumnValueSelector columnValueSelector = delegate.makeColumnValueSelector(columnName); + existing = columnSelectorMap.putIfAbsent(columnName, columnValueSelector); + return existing != null ? existing : columnValueSelector; + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String columnName) + { + return delegate.getColumnCapabilities(columnName); + } + } } diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java index 1f2be9ed1ec7..16e9efb523bc 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexAdapter.java @@ -95,14 +95,13 @@ private void processRows( { int rowNum = 0; for (IncrementalIndexRow row : index.getFacts().persistIterable()) { - final Object[] dims = row.getDims(); - for (IncrementalIndex.DimensionDesc dimension : dimensions) { final int dimIndex = dimension.getIndex(); DimensionAccessor accessor = accessors.get(dimension.getName()); // Add 'null' to the dimension's dictionary. - if (dimIndex >= dims.length || dims[dimIndex] == null) { + // No need to check "dimIndex >= row.getDimsLength()" because isDimNull() verifies that. + if (row.isDimNull(dimIndex)) { accessor.indexer.processRowValsToUnsortedEncodedKeyComponent(null, true); continue; } @@ -111,7 +110,7 @@ private void processRows( if (capabilities.hasBitmapIndexes()) { final MutableBitmap[] bitmapIndexes = accessor.invertedIndexes; final DimensionIndexer indexer = accessor.indexer; - indexer.fillBitmapsFromUnsortedEncodedKeyComponent(dims[dimIndex], rowNum, bitmapIndexes, bitmapFactory); + indexer.fillBitmapsFromUnsortedEncodedKeyComponent(row.getDim(dimIndex), rowNum, bitmapIndexes, bitmapFactory); } } ++rowNum; diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java index 987ee5f8bf7c..cd17a8639c3d 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexRow.java @@ -23,6 +23,8 @@ import com.google.common.collect.Lists; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.segment.DimensionIndexer; +import org.apache.druid.segment.data.ArrayBasedIndexedInts; +import org.apache.druid.segment.data.IndexedInts; import javax.annotation.Nullable; import java.lang.reflect.Array; @@ -30,7 +32,7 @@ import java.util.Collections; import java.util.List; -public final class IncrementalIndexRow +public class IncrementalIndexRow { public static final int EMPTY_ROW_INDEX = -1; @@ -56,7 +58,7 @@ public final class IncrementalIndexRow this(timestamp, dims, dimensionDescsList, EMPTY_ROW_INDEX); } - IncrementalIndexRow( + public IncrementalIndexRow( long timestamp, Object[] dims, List dimensionDescsList, @@ -97,9 +99,44 @@ public long getTimestamp() return timestamp; } - public Object[] getDims() + @Nullable + public Object getDim(int index) + { + if (index >= dims.length) { + return null; + } + return dims[index]; + } + + @Nullable + public IndexedInts getIndexedDim(final int index, @Nullable IndexedInts cachedIndexedInts) + { + Object dim = getDim(index); + if (!(dim instanceof int[])) { + return null; + } + + int[] indices = (int[]) dim; + ArrayBasedIndexedInts indexedInts; + + if (cachedIndexedInts instanceof ArrayBasedIndexedInts) { + indexedInts = (ArrayBasedIndexedInts) cachedIndexedInts; + } else { + indexedInts = new ArrayBasedIndexedInts(); + } + + indexedInts.setValues(indices, indices.length); + return indexedInts; + } + + public int getDimsLength() + { + return dims.length; + } + + public boolean isDimNull(int index) { - return dims; + return (index >= dims.length) || (dims[index] == null); } public int getRowIndex() @@ -107,7 +144,7 @@ public int getRowIndex() return rowIndex; } - void setRowIndex(int rowIndex) + public void setRowIndex(int rowIndex) { this.rowIndex = rowIndex; } diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java index 493180db458e..c93b68f02397 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java @@ -21,11 +21,8 @@ import com.google.common.base.Supplier; import com.google.common.collect.Iterators; -import com.google.common.collect.Maps; import org.apache.druid.data.input.InputRow; -import org.apache.druid.data.input.MapBasedRow; import org.apache.druid.data.input.Row; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.ParseException; @@ -33,25 +30,17 @@ import org.apache.druid.query.aggregation.AggregatorAndSize; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.PostAggregator; -import org.apache.druid.query.dimension.DimensionSpec; -import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.ColumnValueSelector; -import org.apache.druid.segment.DimensionHandler; -import org.apache.druid.segment.DimensionIndexer; -import org.apache.druid.segment.DimensionSelector; -import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.utils.JvmUtils; import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; /** * @@ -74,42 +63,10 @@ public class OnheapIncrementalIndex extends IncrementalIndex private static final int ROUGH_OVERHEAD_PER_MAP_ENTRY = Long.BYTES * 5 + Integer.BYTES; private final ConcurrentHashMap aggregators = new ConcurrentHashMap<>(); private final FactsHolder facts; - private final AtomicInteger indexIncrement = new AtomicInteger(0); private final long maxBytesPerRowForAggregators; - protected final int maxRowCount; - protected final long maxBytesInMemory; - /** - * Flag denoting if max possible values should be used to estimate on-heap mem - * usage. - *

- * There is one instance of Aggregator per metric per row key. - *

- * Old Method: {@code useMaxMemoryEstimates = true} (default) - *

    - *
  • Aggregator: For a given metric, compute the max memory an aggregator - * can use and multiply that by number of aggregators (same as number of - * aggregated rows or number of unique row keys)
  • - *
  • DimensionIndexer: For each row, encode dimension values and estimate - * size of original dimension values
  • - *
- * - * New Method: {@code useMaxMemoryEstimates = false} - *
    - *
  • Aggregator: Get the initialize of an Aggregator instance, and add the - * incremental size required in each aggregation step.
  • - *
  • DimensionIndexer: For each row, encode dimension values and estimate - * size of dimension values only if they are newly added to the dictionary
  • - *
- *

- * Thus the new method eliminates over-estimations. - */ - private final boolean useMaxMemoryEstimates; - - @Nullable - private volatile Map selectors; - @Nullable - private String outOfRowsReason = null; + protected final AtomicInteger numEntries = new AtomicInteger(); + protected final AtomicLong bytesInMemory = new AtomicLong(); OnheapIncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, @@ -121,14 +78,11 @@ public class OnheapIncrementalIndex extends IncrementalIndex boolean useMaxMemoryEstimates ) { - super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd, useMaxMemoryEstimates); - this.maxRowCount = maxRowCount; - this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory; + super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd, maxRowCount, maxBytesInMemory, useMaxMemoryEstimates); this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions()) : new PlainFactsHolder(sortFacts, dimsComparator()); maxBytesPerRowForAggregators = useMaxMemoryEstimates ? getMaxBytesPerRowForAggregators(incrementalIndexSchema) : 0; - this.useMaxMemoryEstimates = useMaxMemoryEstimates; } /** @@ -174,23 +128,15 @@ public FactsHolder getFacts() } @Override - protected void initAggs( - final AggregatorFactory[] metrics, - final Supplier rowSupplier, - final boolean deserializeComplexMetrics, - final boolean concurrentEventAdd - ) + public int size() { - selectors = new HashMap<>(); - for (AggregatorFactory agg : metrics) { - selectors.put( - agg.getName(), - new CachingColumnSelectorFactory( - makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics), - concurrentEventAdd - ) - ); - } + return numEntries.get(); + } + + @Override + public long getBytesInMemory() + { + return bytesInMemory.get(); } @Override @@ -207,12 +153,10 @@ protected AddToFactsResult addToFacts( Aggregator[] aggs; final AggregatorFactory[] metrics = getMetrics(); - final AtomicInteger numEntries = getNumEntries(); - final AtomicLong totalSizeInBytes = getBytesInMemory(); if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) { aggs = concurrentGet(priorIndex); long aggSizeDelta = doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages); - totalSizeInBytes.addAndGet(useMaxMemoryEstimates ? 0 : aggSizeDelta); + bytesInMemory.addAndGet(useMaxMemoryEstimates ? 0 : aggSizeDelta); } else { aggs = new Aggregator[metrics.length]; long aggSizeForRow = factorizeAggs(metrics, aggs, rowContainer, row); @@ -222,7 +166,7 @@ protected AddToFactsResult addToFacts( concurrentSet(rowIndex, aggs); // Last ditch sanity checks - if ((numEntries.get() >= maxRowCount || totalSizeInBytes.get() >= maxBytesInMemory) + if ((numEntries.get() >= maxRowCount || bytesInMemory.get() >= maxBytesInMemory) && facts.getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX && !skipMaxRowsInMemoryCheck) { throw new IndexSizeExceededException( @@ -252,10 +196,9 @@ protected AddToFactsResult addToFacts( final long rowSize = key.estimateBytesInMemory() + estimatedSizeOfAggregators + ROUGH_OVERHEAD_PER_MAP_ENTRY; - totalSizeInBytes.addAndGet(rowSize); + bytesInMemory.addAndGet(rowSize); } - - return new AddToFactsResult(numEntries.get(), totalSizeInBytes.get(), parseExceptionMessages); + return new AddToFactsResult(numEntries.get(), bytesInMemory.get(), parseExceptionMessages); } @Override @@ -372,69 +315,39 @@ protected void concurrentRemove(int offset) aggregators.remove(offset); } - @Override - public boolean canAppendRow() - { - final boolean countCheck = size() < maxRowCount; - // if maxBytesInMemory = -1, then ignore sizeCheck - final boolean sizeCheck = maxBytesInMemory <= 0 || getBytesInMemory().get() < maxBytesInMemory; - final boolean canAdd = countCheck && sizeCheck; - if (!countCheck && !sizeCheck) { - outOfRowsReason = StringUtils.format( - "Maximum number of rows [%d] and maximum size in bytes [%d] reached", - maxRowCount, - maxBytesInMemory - ); - } else { - if (!countCheck) { - outOfRowsReason = StringUtils.format("Maximum number of rows [%d] reached", maxRowCount); - } else if (!sizeCheck) { - outOfRowsReason = StringUtils.format("Maximum size in bytes [%d] reached", maxBytesInMemory); - } - } - - return canAdd; - } - - @Override - public String getOutOfRowsReason() - { - return outOfRowsReason; - } - protected Aggregator[] getAggsForRow(int rowOffset) { return concurrentGet(rowOffset); } @Override - public float getMetricFloatValue(int rowOffset, int aggOffset) + public float getMetricFloatValue(IncrementalIndexRow incrementalIndexRow, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].getFloat(); + return concurrentGet(incrementalIndexRow.getRowIndex())[aggOffset].getFloat(); } @Override - public long getMetricLongValue(int rowOffset, int aggOffset) + public long getMetricLongValue(IncrementalIndexRow incrementalIndexRow, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].getLong(); + return concurrentGet(incrementalIndexRow.getRowIndex())[aggOffset].getLong(); } @Override - public Object getMetricObjectValue(int rowOffset, int aggOffset) + public Object getMetricObjectValue(IncrementalIndexRow incrementalIndexRow, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].get(); + return concurrentGet(incrementalIndexRow.getRowIndex())[aggOffset].get(); } @Override - protected double getMetricDoubleValue(int rowOffset, int aggOffset) + protected double getMetricDoubleValue(IncrementalIndexRow incrementalIndexRow, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].getDouble(); + return concurrentGet(incrementalIndexRow.getRowIndex())[aggOffset].getDouble(); } @Override - public boolean isNull(int rowOffset, int aggOffset) + public boolean isNull(IncrementalIndexRow incrementalIndexRow, int aggOffset) { - return concurrentGet(rowOffset)[aggOffset].isNull(); + return concurrentGet(incrementalIndexRow.getRowIndex())[aggOffset].isNull(); } @Override @@ -443,53 +356,17 @@ public Iterable iterableWithPostAggregations( final boolean descending ) { - final AggregatorFactory[] metrics = getMetricAggs(); - - { - return () -> { - final List dimensions = getDimensions(); - - return Iterators.transform( - getFacts().iterator(descending), - incrementalIndexRow -> { - final int rowOffset = incrementalIndexRow.getRowIndex(); - - Object[] theDims = incrementalIndexRow.getDims(); - - Map theVals = Maps.newLinkedHashMap(); - for (int i = 0; i < theDims.length; ++i) { - Object dim = theDims[i]; - DimensionDesc dimensionDesc = dimensions.get(i); - if (dimensionDesc == null) { - continue; - } - String dimensionName = dimensionDesc.getName(); - DimensionHandler handler = dimensionDesc.getHandler(); - if (dim == null || handler.getLengthOfEncodedKeyComponent(dim) == 0) { - theVals.put(dimensionName, null); - continue; - } - final DimensionIndexer indexer = dimensionDesc.getIndexer(); - Object rowVals = indexer.convertUnsortedEncodedKeyComponentToActualList(dim); - theVals.put(dimensionName, rowVals); - } - - Aggregator[] aggs = getAggsForRow(rowOffset); - for (int i = 0; i < aggs.length; ++i) { - theVals.put(metrics[i].getName(), aggs[i].get()); - } - - if (postAggs != null) { - for (PostAggregator postAgg : postAggs) { - theVals.put(postAgg.getName(), postAgg.compute(theVals)); - } - } - - return new MapBasedRow(incrementalIndexRow.getTimestamp(), theVals); - } - ); - }; - } + return () -> Iterators.transform( + facts.iterator(descending), + incrementalIndexRow -> { + final Aggregator[] aggs = getAggsForRow(incrementalIndexRow.getRowIndex()); + return getMapBasedRowWithPostAggregations( + incrementalIndexRow, + Stream.of(aggs).map(Aggregator::get), + postAggs + ); + } + ); } /** @@ -503,60 +380,6 @@ public void close() closeAggregators(); aggregators.clear(); facts.clear(); - if (selectors != null) { - selectors.clear(); - } - } - - /** - * Caches references to selector objects for each column instead of creating a new object each time in order to save - * heap space. In general the selectorFactory need not to thread-safe. If required, set concurrentEventAdd to true to - * use concurrent hash map instead of vanilla hash map for thread-safe operations. - */ - static class CachingColumnSelectorFactory implements ColumnSelectorFactory - { - private final Map> columnSelectorMap; - private final ColumnSelectorFactory delegate; - - public CachingColumnSelectorFactory(ColumnSelectorFactory delegate, boolean concurrentEventAdd) - { - this.delegate = delegate; - - if (concurrentEventAdd) { - columnSelectorMap = new ConcurrentHashMap<>(); - } else { - columnSelectorMap = new HashMap<>(); - } - } - - @Override - public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) - { - return delegate.makeDimensionSelector(dimensionSpec); - } - - @Override - public ColumnValueSelector makeColumnValueSelector(String columnName) - { - ColumnValueSelector existing = columnSelectorMap.get(columnName); - if (existing != null) { - return existing; - } - - // We cannot use columnSelectorMap.computeIfAbsent(columnName, delegate::makeColumnValueSelector) - // here since makeColumnValueSelector may modify the columnSelectorMap itself through - // virtual column references, triggering a ConcurrentModificationException in JDK 9 and above. - ColumnValueSelector columnValueSelector = delegate.makeColumnValueSelector(columnName); - existing = columnSelectorMap.putIfAbsent(columnName, columnValueSelector); - return existing != null ? existing : columnValueSelector; - } - - @Nullable - @Override - public ColumnCapabilities getColumnCapabilities(String columnName) - { - return delegate.getColumnCapabilities(columnName); - } } public static class Builder extends AppendableIndexBuilder diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java index 5200ada8ef31..097874e72f32 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java @@ -40,7 +40,9 @@ import org.junit.runners.Parameterized; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.function.Function; @@ -127,14 +129,26 @@ public void testGetRowsIterableNoRollup() throws Exception IncrementalIndexTest.populateIndex(timestamp, toPersist1); IncrementalIndexTest.populateIndex(timestamp, toPersist1); - - ArrayList dim1Vals = new ArrayList<>(); + // facts.keySet() return the rows in the order they are stored internally. + // In plain mode, OnheapInrementalIndex sort its rows internally by timestamp then by + // index (the order they were inserted). + // But facts.keySet() does not require this order. Other implementations, might sort their + // rows in their native order (as it would be expected by facts.persistIterable()). + // To mitigate this, we validate the row index without expecting a specific order. + HashMap dim1Vals = new HashMap<>(); + HashMap dim2Vals = new HashMap<>(); for (IncrementalIndexRow row : toPersist1.getFacts().keySet()) { - dim1Vals.add(((int[]) row.getDims()[0])[0]); + dim1Vals.put(row.getRowIndex(), ((int[]) row.getDim(0))[0]); + dim2Vals.put(row.getRowIndex(), ((int[]) row.getDim(1))[0]); } - ArrayList dim2Vals = new ArrayList<>(); - for (IncrementalIndexRow row : toPersist1.getFacts().keySet()) { - dim2Vals.add(((int[]) row.getDims()[1])[0]); + + Assert.assertEquals(6, dim1Vals.size()); + Assert.assertEquals(6, dim2Vals.size()); + + List expected = Arrays.asList(0, 1, 0, 1, 0, 1); + for (int i = 0; i < 6; i++) { + Assert.assertEquals(expected.get(i), dim1Vals.get(i)); + Assert.assertEquals(expected.get(i), dim2Vals.get(i)); } final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( @@ -184,13 +198,6 @@ public void testGetRowsIterableNoRollup() throws Exception Assert.assertEquals(6, rowStrings.size()); for (int i = 0; i < 6; i++) { - if (i % 2 == 0) { - Assert.assertEquals(0, (long) dim1Vals.get(i)); - Assert.assertEquals(0, (long) dim2Vals.get(i)); - } else { - Assert.assertEquals(1, (long) dim1Vals.get(i)); - Assert.assertEquals(1, (long) dim2Vals.get(i)); - } Assert.assertEquals(getExpected.apply(i), rowStrings.get(i)); } } diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java index 121f093b1a67..9d846b63d385 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java @@ -114,7 +114,7 @@ public void run() { while (!Thread.interrupted()) { for (IncrementalIndexRow row : index.getFacts().keySet()) { - if (index.getMetricLongValue(row.getRowIndex(), 0) != 1) { + if (index.getMetricLongValue(row, 0) != 1) { checkFailedCount.addAndGet(1); } } @@ -127,6 +127,7 @@ public void run() addThreads[i].join(); } checkThread.interrupt(); + checkThread.join(); Assert.assertEquals(0, checkFailedCount.get()); } @@ -204,11 +205,11 @@ public void run() long jsSum = 0; for (IncrementalIndexRow row : indexExpr.getFacts().keySet()) { - exprSum += indexExpr.getMetricLongValue(row.getRowIndex(), 0); + exprSum += indexExpr.getMetricLongValue(row, 0); } for (IncrementalIndexRow row : indexJs.getFacts().keySet()) { - jsSum += indexJs.getMetricLongValue(row.getRowIndex(), 0); + jsSum += indexJs.getMetricLongValue(row, 0); } Assert.assertEquals(exprSum, jsSum); diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java index b33d16b07ddd..65137591873b 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java @@ -71,7 +71,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; /** * Extending AbstractBenchmark means only runs if explicitly called @@ -178,8 +177,6 @@ protected AddToFactsResult addToFacts( Aggregator[] aggs; final AggregatorFactory[] metrics = getMetrics(); - final AtomicInteger numEntries = getNumEntries(); - final AtomicLong sizeInBytes = getBytesInMemory(); if (null != priorIdex) { aggs = indexedMap.get(priorIdex); } else { @@ -199,14 +196,14 @@ protected AddToFactsResult addToFacts( // Last ditch sanity checks - if ((numEntries.get() >= maxRowCount || sizeInBytes.get() >= maxBytesInMemory) + if ((numEntries.get() >= maxRowCount || bytesInMemory.get() >= maxBytesInMemory) && getFacts().getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX) { throw new IndexSizeExceededException("Maximum number of rows or max bytes reached"); } final int prev = getFacts().putIfAbsent(key, rowIndex); if (IncrementalIndexRow.EMPTY_ROW_INDEX == prev) { numEntries.incrementAndGet(); - sizeInBytes.incrementAndGet(); + bytesInMemory.incrementAndGet(); } else { // We lost a race aggs = indexedMap.get(prev); @@ -226,7 +223,7 @@ && getFacts().getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX) { rowContainer.set(null); - return new AddToFactsResult(numEntries.get(), sizeInBytes.get(), new ArrayList<>()); + return new AddToFactsResult(numEntries.get(), bytesInMemory.get(), new ArrayList<>()); } @Override diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java index 883d6f5a58b2..03332aac3546 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java @@ -284,7 +284,7 @@ public long getBytesInMemory() return 0; } - return currHydrant.getIndex().getBytesInMemory().get(); + return currHydrant.getIndex().getBytesInMemory(); } }