From 4863e2ca4f8ccdf60e69721f95b5455d07baadd9 Mon Sep 17 00:00:00 2001 From: Nishant Date: Thu, 7 Jan 2016 22:11:16 +0530 Subject: [PATCH] cache metric selectors instead of creating new ones for every metric in each row clear selectors on close. Add comments about thread safety. --- .../segment/incremental/IncrementalIndex.java | 1 + .../incremental/OnheapIncrementalIndex.java | 91 ++++++++++++++++++- 2 files changed, 91 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 53ae2753ce60..cada0c913cf6 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -355,6 +355,7 @@ protected abstract AggregatorType[] initAggs( boolean deserializeComplexMetrics ); + // Note: This method needs to be thread safe. protected abstract Integer addToFacts( AggregatorFactory[] metrics, boolean deserializeComplexMetrics, diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 4f220954fa55..455186e4e868 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -28,6 +28,12 @@ import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.dimension.DimensionSpec; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.DimensionSelector; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.LongColumnSelector; +import io.druid.segment.ObjectColumnSelector; import java.util.Arrays; import java.util.Map; @@ -45,6 +51,7 @@ public class 
OnheapIncrementalIndex extends IncrementalIndex private final ConcurrentNavigableMap facts = new ConcurrentSkipListMap<>(); private final AtomicInteger indexIncrement = new AtomicInteger(0); protected final int maxRowCount; + private volatile Map selectors; private String outOfRowsReason = null; @@ -118,6 +125,14 @@ protected Aggregator[] initAggs( AggregatorFactory[] metrics, Supplier rowSupplier, boolean deserializeComplexMetrics ) { + selectors = Maps.newHashMap(); + for (AggregatorFactory agg : metrics) { + selectors.put( + agg.getName(), + new ObjectCachingColumnSelectorFactory(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)) + ); + } + return new Aggregator[metrics.length]; } @@ -144,7 +159,7 @@ protected Integer addToFacts( for (int i = 0; i < metrics.length; i++) { final AggregatorFactory agg = metrics[i]; aggs[i] = agg.factorize( - makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics) + selectors.get(agg.getName()) ); } final Integer rowIndex = indexIncrement.getAndIncrement(); @@ -253,6 +268,9 @@ public void close() super.close(); aggregators.clear(); facts.clear(); + if (selectors != null) { + selectors.clear(); + } } private static class OnHeapDimDim implements DimDim @@ -345,4 +363,75 @@ public boolean compareCanonicalValues(String s1, String s2) return s1 == s2; } } + + // Caches references to selector objects for each column instead of creating a new object each time, in order to save heap space. + // In general the selectorFactory need not be thread-safe. + // Here it is made thread-safe to support the special case of groupBy, where multiple threads can add concurrently to the IncrementalIndex. 
+  /**
+   * {@link ColumnSelectorFactory} decorator that memoizes the long, float and object column selectors produced
+   * by {@code delegate}, keyed by column name, so repeated requests for the same column reuse one selector
+   * object. Dimension selectors are not cached; those calls are forwarded to the delegate unchanged.
+   *
+   * Each cache is a concurrent map filled via {@code putIfAbsent}, so callers asking for the same column get
+   * the same selector instance even when they race. Under such a race the delegate may be asked to build a
+   * selector that is then discarded in favour of the one already stored.
+   */
+  private static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory
+  {
+    // One cache per selector kind, keyed by column name.
+    private final ConcurrentMap<String, LongColumnSelector> longColumnSelectorMap = Maps.newConcurrentMap();
+    private final ConcurrentMap<String, FloatColumnSelector> floatColumnSelectorMap = Maps.newConcurrentMap();
+    private final ConcurrentMap<String, ObjectColumnSelector> objectColumnSelectorMap = Maps.newConcurrentMap();
+    private final ColumnSelectorFactory delegate;
+
+    /**
+     * @param delegate factory that actually creates selectors on a cache miss
+     */
+    public ObjectCachingColumnSelectorFactory(ColumnSelectorFactory delegate)
+    {
+      this.delegate = delegate;
+    }
+
+    /**
+     * Forwards to the delegate; dimension selectors are not cached.
+     */
+    @Override
+    public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
+    {
+      return delegate.makeDimensionSelector(dimensionSpec);
+    }
+
+    /**
+     * Returns the cached float selector for {@code columnName}, creating it through the delegate on first use.
+     */
+    @Override
+    public FloatColumnSelector makeFloatColumnSelector(String columnName)
+    {
+      final FloatColumnSelector existing = floatColumnSelectorMap.get(columnName);
+      if (existing != null) {
+        return existing;
+      }
+      final FloatColumnSelector newSelector = delegate.makeFloatColumnSelector(columnName);
+      // Another thread may have stored a selector in the meantime; if so, its instance wins.
+      final FloatColumnSelector prev = floatColumnSelectorMap.putIfAbsent(columnName, newSelector);
+      return prev != null ? prev : newSelector;
+    }
+
+    /**
+     * Returns the cached long selector for {@code columnName}, creating it through the delegate on first use.
+     */
+    @Override
+    public LongColumnSelector makeLongColumnSelector(String columnName)
+    {
+      final LongColumnSelector existing = longColumnSelectorMap.get(columnName);
+      if (existing != null) {
+        return existing;
+      }
+      final LongColumnSelector newSelector = delegate.makeLongColumnSelector(columnName);
+      // Another thread may have stored a selector in the meantime; if so, its instance wins.
+      final LongColumnSelector prev = longColumnSelectorMap.putIfAbsent(columnName, newSelector);
+      return prev != null ? prev : newSelector;
+    }
+
+    /**
+     * Returns the cached object selector for {@code columnName}, creating it through the delegate on first use.
+     */
+    @Override
+    public ObjectColumnSelector makeObjectColumnSelector(String columnName)
+    {
+      final ObjectColumnSelector existing = objectColumnSelectorMap.get(columnName);
+      if (existing != null) {
+        return existing;
+      }
+      final ObjectColumnSelector newSelector = delegate.makeObjectColumnSelector(columnName);
+      // Another thread may have stored a selector in the meantime; if so, its instance wins.
+      final ObjectColumnSelector prev = objectColumnSelectorMap.putIfAbsent(columnName, newSelector);
+      return prev != null ? prev : newSelector;
+    }
+  }
+
 }