diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 97888fd0ff14..e7e1e6eaf7a2 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -71,13 +71,15 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; /** */ -public abstract class IncrementalIndex implements Iterable, Closeable +public abstract class IncrementalIndex implements Iterable, Closeable { private volatile DateTime maxIngestedEventTime; @@ -337,7 +339,6 @@ public int lookupId(String name) private final QueryGranularity gran; private final List> rowTransformers; private final AggregatorFactory[] metrics; - private final AggregatorType[] aggs; private final boolean deserializeComplexMetrics; private final boolean reportParseExceptions; private final Metadata metadata; @@ -361,6 +362,12 @@ public InputRow get() } }; + protected final ConcurrentNavigableMap facts; + + protected final int maxRowCount; + protected final Map selectors; + protected String outOfRowsReason; + /** * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that * should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics. @@ -374,7 +381,8 @@ public InputRow get() public IncrementalIndex( final IncrementalIndexSchema incrementalIndexSchema, final boolean deserializeComplexMetrics, - final boolean reportParseExceptions + final boolean reportParseExceptions, + final int maxRowCount ) { this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); @@ -384,10 +392,8 @@ public IncrementalIndex( this.deserializeComplexMetrics = deserializeComplexMetrics; this.reportParseExceptions = reportParseExceptions; - this.metadata = new Metadata().setAggregators(getCombiningAggregators(metrics)); - - this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics); this.columnCapabilities = Maps.newHashMap(); + this.metadata = new Metadata().setAggregators(getCombiningAggregators(metrics)); this.metricDescs = Maps.newLinkedHashMap(); for (AggregatorFactory metric : metrics) { @@ -418,6 +424,18 @@ public IncrementalIndex( capabilities.setHasSpatialIndexes(true); columnCapabilities.put(spatialDimension.getDimName(), capabilities); } + this.facts = new ConcurrentSkipListMap<>(dimsComparator()); + this.maxRowCount = maxRowCount; + + selectors = Maps.newHashMap(); + for (AggregatorFactory agg : metrics) { + selectors.put( + agg.getName(), + new ObjectCachingColumnSelectorFactory(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)) + ); + } + + initAggs(metrics, rowSupplier, deserializeComplexMetrics); } private DimDim newDimDim(String dimension, ValueType type) { @@ -441,17 +459,33 @@ private DimDim newDimDim(String dimension, ValueType type) { // use newDimDim() to create a DimDim, makeDimDim() provides the subclass-specific implementation protected abstract DimDim makeDimDim(String dimension, Object lock); - public abstract ConcurrentNavigableMap getFacts(); + public ConcurrentNavigableMap getFacts() + { + return facts; + } - public abstract boolean canAppendRow(); 
+  public boolean canAppendRow()
+  {
+    final boolean canAdd = size() < maxRowCount;
+    if (!canAdd) {
+      outOfRowsReason = String.format("Maximum number of rows [%d] reached", maxRowCount);
+    }
+    return canAdd;
+  }

-  public abstract String getOutOfRowsReason();
+  public String getOutOfRowsReason()
+  {
+    return outOfRowsReason;
+  }

-  protected abstract AggregatorType[] initAggs(
+  // called in constructor
+  protected void initAggs(
       AggregatorFactory[] metrics, Supplier<InputRow> rowSupplier, boolean deserializeComplexMetrics
-  );
+  )
+  {
+  }

   // Note: This method needs to be thread safe.
   protected abstract Integer addToFacts(
@@ -465,20 +499,23 @@ protected abstract Integer addToFacts(
       Supplier<InputRow> rowSupplier
   ) throws IndexSizeExceededException;

-  protected abstract AggregatorType[] getAggsForRow(int rowOffset);
-
-  protected abstract Object getAggVal(AggregatorType agg, int rowOffset, int aggPosition);
+  protected abstract float getMetricFloatValue(T agg, int aggOffset);

-  protected abstract float getMetricFloatValue(int rowOffset, int aggOffset);
+  protected abstract long getMetricLongValue(T agg, int aggOffset);

-  protected abstract long getMetricLongValue(int rowOffset, int aggOffset);
+  protected abstract Object getMetricObjectValue(T agg, int aggOffset);

-  protected abstract Object getMetricObjectValue(int rowOffset, int aggOffset);
+  protected abstract void destroy(T aggregators);

   @Override
   public void close()
   {
+    for (T agg : facts.values()) {
+      destroy(agg);
+    }
+    facts.clear();
     dimValues.clear();
+    selectors.clear();
   }

   public InputRow formatRow(InputRow row)
@@ -538,16 +575,17 @@ public Map<String, DimensionDesc> getDimensionDescs()
   /**
    * Adds a new row. The row might correspond with another row that already exists, in which case this will
    * update that row instead of inserting a new one.
-   * <p/>
-   * <p/>
+   * <p>
+   * <p>
    * Calls to add() are thread safe.
-   * <p/>
+   * <p>
* * @param row the row of data to add * * @return the number of rows in the data set after adding the InputRow */ - public int add(InputRow row) throws IndexSizeExceededException { + public int add(InputRow row) throws IndexSizeExceededException + { TimeAndDims key = toTimeAndDims(row); final int rv = addToFacts( metrics, @@ -708,11 +746,6 @@ private int[] getDimVals(final DimDim dimLookup, final List dimValue return retVal; } - public AggregatorType[] getAggs() - { - return aggs; - } - public AggregatorFactory[] getMetricAggs() { return metrics; @@ -829,7 +862,7 @@ public ColumnCapabilities getCapabilities(String column) return columnCapabilities.get(column); } - public ConcurrentNavigableMap getSubMap(TimeAndDims start, TimeAndDims end) + public ConcurrentNavigableMap getSubMap(TimeAndDims start, TimeAndDims end) { return getFacts().subMap(start, end); } @@ -862,16 +895,18 @@ public Iterable iterableWithPostAggregations(final List pos public Iterator iterator() { final List dimensions = getDimensions(); - final ConcurrentNavigableMap facts = descending ? getFacts().descendingMap() : getFacts(); + final ConcurrentNavigableMap facts = descending + ? getFacts().descendingMap() + : getFacts(); return Iterators.transform( facts.entrySet().iterator(), - new Function, Row>() + new Function, Row>() { @Override - public Row apply(final Map.Entry input) + public Row apply(final Map.Entry input) { final TimeAndDims timeAndDims = input.getKey(); - final int rowOffset = input.getValue(); + final T aggs = input.getValue(); int[][] theDims = timeAndDims.getDims(); //TODO: remove dictionary encoding for numerics later ValueType[] types = timeAndDims.getTypes(); @@ -908,9 +943,8 @@ public Row apply(final Map.Entry input) } } - AggregatorType[] aggs = getAggsForRow(rowOffset); - for (int i = 0; i < aggs.length; ++i) { - theVals.put(metrics[i].getName(), getAggVal(aggs[i], rowOffset, i)); + for (int i = 0; i < metrics.length; ++i) { + theVals.put(metrics[i].getName(), getMetricObjectValue(aggs, i)); } if (postAggs != null) { @@ -1273,4 +1307,74 @@ public int compare(TimeAndDims lhs, TimeAndDims rhs) return retVal; } } + + // Caches references to selector objects for each column instead of creating a new object each time in order to save heap space. + // In general the selectorFactory need not to thread-safe. + // here its made thread safe to support the special case of groupBy where the multiple threads can add concurrently to the IncrementalIndex. + static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory + { + private final ConcurrentMap longColumnSelectorMap = Maps.newConcurrentMap(); + private final ConcurrentMap floatColumnSelectorMap = Maps.newConcurrentMap(); + private final ConcurrentMap objectColumnSelectorMap = Maps.newConcurrentMap(); + private final ColumnSelectorFactory delegate; + + public ObjectCachingColumnSelectorFactory(ColumnSelectorFactory delegate) + { + this.delegate = delegate; + } + + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + return delegate.makeDimensionSelector(dimensionSpec); + } + + @Override + public FloatColumnSelector makeFloatColumnSelector(String columnName) + { + FloatColumnSelector existing = floatColumnSelectorMap.get(columnName); + if (existing != null) { + return existing; + } else { + FloatColumnSelector newSelector = delegate.makeFloatColumnSelector(columnName); + FloatColumnSelector prev = floatColumnSelectorMap.putIfAbsent( + columnName, + newSelector + ); + return prev != null ? 
prev : newSelector; + } + } + + @Override + public LongColumnSelector makeLongColumnSelector(String columnName) + { + LongColumnSelector existing = longColumnSelectorMap.get(columnName); + if (existing != null) { + return existing; + } else { + LongColumnSelector newSelector = delegate.makeLongColumnSelector(columnName); + LongColumnSelector prev = longColumnSelectorMap.putIfAbsent( + columnName, + newSelector + ); + return prev != null ? prev : newSelector; + } + } + + @Override + public ObjectColumnSelector makeObjectColumnSelector(String columnName) + { + ObjectColumnSelector existing = objectColumnSelectorMap.get(columnName); + if (existing != null) { + return existing; + } else { + ObjectColumnSelector newSelector = delegate.makeObjectColumnSelector(columnName); + ObjectColumnSelector prev = objectColumnSelectorMap.putIfAbsent( + columnName, + newSelector + ); + return prev != null ? prev : newSelector; + } + } + } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index 057efbc67292..d190e41e65fe 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -50,7 +50,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter { private static final Logger log = new Logger(IncrementalIndexAdapter.class); private final Interval dataInterval; - private final IncrementalIndex index; + private final IncrementalIndex index; private final Set hasNullValueDimensions; private final Map indexers; @@ -239,16 +239,18 @@ public Iterator iterator() */ return Iterators.transform( index.getFacts().entrySet().iterator(), - new Function, Rowboat>() + new Function, Rowboat>() { int count = 0; @Override - public Rowboat apply(Map.Entry input) + public Rowboat apply( + Map.Entry input + ) { final IncrementalIndex.TimeAndDims timeAndDims = input.getKey(); final int[][] dimValues = timeAndDims.getDims(); - final int rowOffset = input.getValue(); + final Object aggrs = input.getValue(); int[][] dims = new int[dimValues.length][]; for (IncrementalIndex.DimensionDesc dimension : dimensions) { @@ -273,7 +275,7 @@ public Rowboat apply(Map.Entry input) Object[] metrics = new Object[index.getMetricAggs().length]; for (int i = 0; i < metrics.length; i++) { - metrics[i] = index.getMetricObjectValue(rowOffset, i); + metrics[i] = index.getMetricObjectValue(aggrs, i); } return new Rowboat( diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index aecf8b6764ba..0d641e630b01 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -225,8 +225,8 @@ public Cursor apply(@Nullable final Long input) return new Cursor() { - private Iterator> baseIter; - private ConcurrentNavigableMap cursorMap; + private Iterator> baseIter; + private ConcurrentNavigableMap cursorMap; final DateTime time; int numAdvanced = -1; boolean done; @@ -613,14 +613,14 @@ private ValueMatcher makeFilterMatcher(final Filter filter, final EntryHolder ho private static class EntryHolder { - Map.Entry currEntry = null; + Map.Entry currEntry = null; - public Map.Entry get() + public Map.Entry get() { return 
currEntry; } - public void set(Map.Entry currEntry) + public void set(Map.Entry currEntry) { this.currEntry = currEntry; } @@ -630,7 +630,7 @@ public IncrementalIndex.TimeAndDims getKey() return currEntry.getKey(); } - public Integer getValue() + public Object getValue() { return currEntry.getValue(); } diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index 47f4c5ca4f1c..db5c507e2243 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -21,7 +21,6 @@ import com.google.common.base.Supplier; import com.google.common.base.Throwables; -import com.google.common.collect.Maps; import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.parsers.ParseException; @@ -37,27 +36,20 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; /** */ -public class OffheapIncrementalIndex extends IncrementalIndex +public class OffheapIncrementalIndex extends IncrementalIndex { private final StupidPool bufferPool; private final List> aggBuffers = new ArrayList<>(); - private final List indexAndOffsets = new ArrayList<>(); - private final ConcurrentNavigableMap facts; + private final int[] nextBufferIndexAndOffset = new int[]{0, 0}; - private final AtomicInteger indexIncrement = new AtomicInteger(0); - - protected final int maxRowCount; - - private volatile Map selectors; + private ColumnSelectorFactory[] selectorFactories; + private BufferAggregator[] aggregators; //given a ByteBuffer and an offset where all aggregates for a row are stored //offset + aggOffsetInBuffer[i] would give position in ByteBuffer where ith aggregate @@ -65,8 +57,6 @@ public class OffheapIncrementalIndex extends IncrementalIndex private volatile int[] aggOffsetInBuffer; private volatile int aggsTotalSize; - private String outOfRowsReason = null; - public OffheapIncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, boolean deserializeComplexMetrics, @@ -75,10 +65,8 @@ public OffheapIncrementalIndex( StupidPool bufferPool ) { - super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions); - this.maxRowCount = maxRowCount; + super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, maxRowCount); this.bufferPool = bufferPool; - this.facts = new ConcurrentSkipListMap<>(dimsComparator()); //check that stupid pool gives buffers that can hold at least one row's aggregators ResourceHolder bb = bufferPool.take(); @@ -86,7 +74,8 @@ public OffheapIncrementalIndex( RuntimeException ex = new IAE("bufferPool buffers capacity must be >= [%s]", aggsTotalSize); try { bb.close(); - } catch(IOException ioe){ + } + catch (IOException ioe) { ex.addSuppressed(ioe); } throw ex; @@ -136,12 +125,6 @@ public OffheapIncrementalIndex( ); } - @Override - public ConcurrentNavigableMap getFacts() - { - return facts; - } - @Override protected DimDim makeDimDim(String dimension, Object lock) { @@ -149,37 +132,27 @@ protected DimDim makeDimDim(String dimension, Object lock) } @Override - protected BufferAggregator[] initAggs( + protected void initAggs( AggregatorFactory[] metrics, Supplier rowSupplier, boolean 
deserializeComplexMetrics ) { - selectors = Maps.newHashMap(); aggOffsetInBuffer = new int[metrics.length]; + selectorFactories = new ColumnSelectorFactory[metrics.length]; for (int i = 0; i < metrics.length; i++) { - AggregatorFactory agg = metrics[i]; - - ColumnSelectorFactory columnSelectorFactory = makeColumnSelectorFactory( - agg, + selectorFactories[i] = makeColumnSelectorFactory( + metrics[i], rowSupplier, deserializeComplexMetrics ); - - selectors.put( - agg.getName(), - new OnheapIncrementalIndex.ObjectCachingColumnSelectorFactory(columnSelectorFactory) - ); - - if (i == 0) { - aggOffsetInBuffer[i] = 0; - } else { - aggOffsetInBuffer[i] = aggOffsetInBuffer[i-1] + metrics[i-1].getMaxIntermediateSize(); + if (i > 0) { + aggOffsetInBuffer[i] = aggOffsetInBuffer[i - 1] + metrics[i - 1].getMaxIntermediateSize(); } } - aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length - 1].getMaxIntermediateSize(); - - return new BufferAggregator[metrics.length]; + if (metrics.length > 0) { + aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length - 1].getMaxIntermediateSize(); + } } @Override @@ -199,40 +172,33 @@ protected Integer addToFacts( int bufferOffset; synchronized (this) { - final Integer priorIndex = facts.get(key); - if (null != priorIndex) { - final int[] indexAndOffset = indexAndOffsets.get(priorIndex); + int[] indexAndOffset = facts.get(key); + if (indexAndOffset != null) { bufferIndex = indexAndOffset[0]; bufferOffset = indexAndOffset[1]; aggBuffer = aggBuffers.get(bufferIndex).get(); } else { - if (metrics.length > 0 && getAggs()[0] == null) { + if (aggregators == null) { // note: creation of Aggregators is done lazily when at least one row from input is available // so that FilteredAggregators could be initialized correctly. rowContainer.set(row); + aggregators = new BufferAggregator[metrics.length]; for (int i = 0; i < metrics.length; i++) { - final AggregatorFactory agg = metrics[i]; - getAggs()[i] = agg.factorizeBuffered( - makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics) - ); + aggregators[i] = metrics[i].factorizeBuffered(selectorFactories[i]); } rowContainer.set(null); } bufferIndex = aggBuffers.size() - 1; - ByteBuffer lastBuffer = aggBuffers.isEmpty() ? null : aggBuffers.get(aggBuffers.size() - 1).get(); - int[] lastAggregatorsIndexAndOffset = indexAndOffsets.isEmpty() - ? null - : indexAndOffsets.get(indexAndOffsets.size() - 1); + ByteBuffer lastBuffer = aggBuffers.get(bufferIndex).get(); - if (lastAggregatorsIndexAndOffset != null && lastAggregatorsIndexAndOffset[0] != bufferIndex) { + if (nextBufferIndexAndOffset[0] != bufferIndex) { throw new ISE("last row's aggregate's buffer and last buffer index must be same"); } - bufferOffset = aggsTotalSize + (lastAggregatorsIndexAndOffset != null ? 
lastAggregatorsIndexAndOffset[1] : 0); - if (lastBuffer != null && - lastBuffer.capacity() - bufferOffset >= aggsTotalSize) { + if (lastBuffer.capacity() >= nextBufferIndexAndOffset[1] + aggsTotalSize) { aggBuffer = lastBuffer; + bufferOffset = nextBufferIndexAndOffset[1]; } else { ResourceHolder bb = bufferPool.take(); aggBuffers.add(bb); @@ -242,7 +208,7 @@ protected Integer addToFacts( } for (int i = 0; i < metrics.length; i++) { - getAggs()[i].init(aggBuffer, bufferOffset + aggOffsetInBuffer[i]); + aggregators[i].init(aggBuffer, bufferOffset + aggOffsetInBuffer[i]); } // Last ditch sanity checks @@ -250,25 +216,20 @@ protected Integer addToFacts( throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount); } - final Integer rowIndex = indexIncrement.getAndIncrement(); - - // note that indexAndOffsets must be updated before facts, because as soon as we update facts - // concurrent readers get hold of it and might ask for newly added row - indexAndOffsets.add(new int[]{bufferIndex, bufferOffset}); - final Integer prev = facts.putIfAbsent(key, rowIndex); + final Object prev = facts.putIfAbsent(key, new int[]{bufferIndex, bufferOffset}); if (null == prev) { numEntries.incrementAndGet(); + nextBufferIndexAndOffset[0] = bufferIndex; + nextBufferIndexAndOffset[1] = bufferOffset + aggsTotalSize; } else { - throw new ISE("WTF! we are in sychronized block."); + throw new ISE("WTF! we are in synchronized block."); } } } rowContainer.set(row); - - for (int i = 0; i < metrics.length; i++) { - final BufferAggregator agg = getAggs()[i]; - + for (int i = 0; i < aggregators.length; i++) { + final BufferAggregator agg = aggregators[i]; synchronized (agg) { try { agg.aggregate(aggBuffer, bufferOffset + aggOffsetInBuffer[i]); @@ -281,64 +242,34 @@ protected Integer addToFacts( } } rowContainer.set(null); - return numEntries.get(); - } - - @Override - public boolean canAppendRow() - { - final boolean canAdd = size() < maxRowCount; - if (!canAdd) { - outOfRowsReason = String.format("Maximum number of rows [%d] reached", maxRowCount); - } - return canAdd; - } - - @Override - public String getOutOfRowsReason() - { - return outOfRowsReason; - } - @Override - protected BufferAggregator[] getAggsForRow(int rowOffset) - { - return getAggs(); + return numEntries.get(); } @Override - protected Object getAggVal(BufferAggregator agg, int rowOffset, int aggPosition) + public float getMetricFloatValue(int[] indexAndOffset, int aggOffset) { - int[] indexAndOffset = indexAndOffsets.get(rowOffset); ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); - return agg.get(bb, indexAndOffset[1] + aggOffsetInBuffer[aggPosition]); + return aggregators[aggOffset].getFloat(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); } @Override - public float getMetricFloatValue(int rowOffset, int aggOffset) + public long getMetricLongValue(int[] indexAndOffset, int aggOffset) { - BufferAggregator agg = getAggs()[aggOffset]; - int[] indexAndOffset = indexAndOffsets.get(rowOffset); ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); - return agg.getFloat(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); + return aggregators[aggOffset].getLong(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); } @Override - public long getMetricLongValue(int rowOffset, int aggOffset) + public Object getMetricObjectValue(int[] indexAndOffset, int aggOffset) { - BufferAggregator agg = getAggs()[aggOffset]; - int[] indexAndOffset = indexAndOffsets.get(rowOffset); ByteBuffer bb = 
aggBuffers.get(indexAndOffset[0]).get(); - return agg.getLong(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); + return aggregators[aggOffset].get(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); } @Override - public Object getMetricObjectValue(int rowOffset, int aggOffset) + protected void destroy(int[] indexAndOffset) { - BufferAggregator agg = getAggs()[aggOffset]; - int[] indexAndOffset = indexAndOffsets.get(rowOffset); - ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get(); - return agg.get(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]); } /** @@ -348,18 +279,13 @@ public Object getMetricObjectValue(int rowOffset, int aggOffset) public void close() { super.close(); - facts.clear(); - indexAndOffsets.clear(); - - if (selectors != null) { - selectors.clear(); - } RuntimeException ex = null; for (ResourceHolder buffHolder : aggBuffers) { try { buffHolder.close(); - } catch(IOException ioe) { + } + catch (IOException ioe) { if (ex == null) { ex = Throwables.propagate(ioe); } else { diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 5f4f58e0e182..bca554522ed0 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -22,38 +22,22 @@ import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.metamx.common.logger.Logger; import com.metamx.common.parsers.ParseException; import io.druid.data.input.InputRow; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.dimension.DimensionSpec; -import io.druid.segment.ColumnSelectorFactory; -import io.druid.segment.DimensionSelector; -import io.druid.segment.FloatColumnSelector; -import io.druid.segment.LongColumnSelector; -import io.druid.segment.ObjectColumnSelector; -import io.druid.segment.column.ValueType; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; /** */ -public class OnheapIncrementalIndex extends IncrementalIndex +public class OnheapIncrementalIndex extends IncrementalIndex { - private final ConcurrentHashMap aggregators = new ConcurrentHashMap<>(); - private final ConcurrentNavigableMap facts; - private final AtomicInteger indexIncrement = new AtomicInteger(0); - protected final int maxRowCount; - private volatile Map selectors; - - private String outOfRowsReason = null; + private static final Logger log = new Logger(OnheapIncrementalIndex.class); public OnheapIncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, @@ -62,9 +46,7 @@ public OnheapIncrementalIndex( int maxRowCount ) { - super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions); - this.maxRowCount = maxRowCount; - this.facts = new ConcurrentSkipListMap<>(dimsComparator()); + super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, maxRowCount); } public OnheapIncrementalIndex( @@ -114,34 +96,12 @@ public OnheapIncrementalIndex( this(incrementalIndexSchema, true, reportParseExceptions, maxRowCount); } - @Override - public 
ConcurrentNavigableMap getFacts() - { - return facts; - } - @Override protected DimDim makeDimDim(String dimension, Object lock) { return new OnHeapDimDim(lock); } - @Override - protected Aggregator[] initAggs( - AggregatorFactory[] metrics, Supplier rowSupplier, boolean deserializeComplexMetrics - ) - { - selectors = Maps.newHashMap(); - for (AggregatorFactory agg : metrics) { - selectors.put( - agg.getName(), - new ObjectCachingColumnSelectorFactory(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)) - ); - } - - return new Aggregator[metrics.length]; - } - @Override protected Integer addToFacts( AggregatorFactory[] metrics, @@ -154,47 +114,35 @@ protected Integer addToFacts( Supplier rowSupplier ) throws IndexSizeExceededException { - final Integer priorIndex = facts.get(key); - - Aggregator[] aggs; + Aggregator[] aggs = facts.get(key); - if (null != priorIndex) { - aggs = concurrentGet(priorIndex); - } else { + if (aggs == null) { aggs = new Aggregator[metrics.length]; rowContainer.set(row); for (int i = 0; i < metrics.length; i++) { final AggregatorFactory agg = metrics[i]; - aggs[i] = agg.factorize( - selectors.get(agg.getName()) - ); + aggs[i] = agg.factorize(selectors.get(agg.getName())); } rowContainer.set(null); - final Integer rowIndex = indexIncrement.getAndIncrement(); - - concurrentSet(rowIndex, aggs); - // Last ditch sanity checks if (numEntries.get() >= maxRowCount && !facts.containsKey(key)) { throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount); } - final Integer prev = facts.putIfAbsent(key, rowIndex); - if (null == prev) { + final Aggregator[] prev = facts.putIfAbsent(key, aggs); + if (prev == null) { numEntries.incrementAndGet(); } else { - // We lost a race - aggs = concurrentGet(prev); - // Free up the misfire - concurrentRemove(rowIndex); + // We lost a race, free up the misfire + destroy(aggs); + aggs = prev; // This is expected to occur ~80% of the time in the worst scenarios } } rowContainer.set(row); - - for (Aggregator agg : aggs) { + for (final Aggregator agg : aggs) { synchronized (agg) { try { agg.aggregate(); @@ -206,88 +154,40 @@ protected Integer addToFacts( } } } - rowContainer.set(null); - return numEntries.get(); } - protected Aggregator[] concurrentGet(int offset) - { - // All get operations should be fine - return aggregators.get(offset); - } - - protected void concurrentSet(int offset, Aggregator[] value) - { - aggregators.put(offset, value); - } - - protected void concurrentRemove(int offset) - { - aggregators.remove(offset); - } - @Override - public boolean canAppendRow() + protected void destroy(Aggregator[] aggregators) { - final boolean canAdd = size() < maxRowCount; - if (!canAdd) { - outOfRowsReason = String.format("Maximum number of rows [%d] reached", maxRowCount); + for (Aggregator aggregator : aggregators) { + try { + aggregator.close(); + } + catch (Throwable t) { + log.error(t, t.toString()); + } } - return canAdd; } @Override - public String getOutOfRowsReason() + public float getMetricFloatValue(Aggregator[] agg, int aggOffset) { - return outOfRowsReason; + return agg[aggOffset].getFloat(); } @Override - protected Aggregator[] getAggsForRow(int rowOffset) + public long getMetricLongValue(Aggregator[] agg, int aggOffset) { - return concurrentGet(rowOffset); + return agg[aggOffset].getLong(); } @Override - protected Object getAggVal(Aggregator agg, int rowOffset, int aggPosition) + public Object getMetricObjectValue(Aggregator[] agg, int aggOffset) { - return agg.get(); - } - - 
@Override - public float getMetricFloatValue(int rowOffset, int aggOffset) - { - return concurrentGet(rowOffset)[aggOffset].getFloat(); - } - - @Override - public long getMetricLongValue(int rowOffset, int aggOffset) - { - return concurrentGet(rowOffset)[aggOffset].getLong(); - } - - @Override - public Object getMetricObjectValue(int rowOffset, int aggOffset) - { - return concurrentGet(rowOffset)[aggOffset].get(); - } - - /** - * Clear out maps to allow GC - * NOTE: This is NOT thread-safe with add... so make sure all the adding is DONE before closing - */ - @Override - public void close() - { - super.close(); - aggregators.clear(); - facts.clear(); - if (selectors != null) { - selectors.clear(); - } + return agg[aggOffset].get(); } static class OnHeapDimDim> implements DimDim @@ -416,75 +316,4 @@ public int getSortedIdFromUnsortedId(int id) return idToIndex[id]; } } - - // Caches references to selector objects for each column instead of creating a new object each time in order to save heap space. - // In general the selectorFactory need not to thread-safe. - // here its made thread safe to support the special case of groupBy where the multiple threads can add concurrently to the IncrementalIndex. - static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory - { - private final ConcurrentMap longColumnSelectorMap = Maps.newConcurrentMap(); - private final ConcurrentMap floatColumnSelectorMap = Maps.newConcurrentMap(); - private final ConcurrentMap objectColumnSelectorMap = Maps.newConcurrentMap(); - private final ColumnSelectorFactory delegate; - - public ObjectCachingColumnSelectorFactory(ColumnSelectorFactory delegate) - { - this.delegate = delegate; - } - - @Override - public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) - { - return delegate.makeDimensionSelector(dimensionSpec); - } - - @Override - public FloatColumnSelector makeFloatColumnSelector(String columnName) - { - FloatColumnSelector existing = floatColumnSelectorMap.get(columnName); - if (existing != null) { - return existing; - } else { - FloatColumnSelector newSelector = delegate.makeFloatColumnSelector(columnName); - FloatColumnSelector prev = floatColumnSelectorMap.putIfAbsent( - columnName, - newSelector - ); - return prev != null ? prev : newSelector; - } - } - - @Override - public LongColumnSelector makeLongColumnSelector(String columnName) - { - LongColumnSelector existing = longColumnSelectorMap.get(columnName); - if (existing != null) { - return existing; - } else { - LongColumnSelector newSelector = delegate.makeLongColumnSelector(columnName); - LongColumnSelector prev = longColumnSelectorMap.putIfAbsent( - columnName, - newSelector - ); - return prev != null ? prev : newSelector; - } - } - - @Override - public ObjectColumnSelector makeObjectColumnSelector(String columnName) - { - ObjectColumnSelector existing = objectColumnSelectorMap.get(columnName); - if (existing != null) { - return existing; - } else { - ObjectColumnSelector newSelector = delegate.makeObjectColumnSelector(columnName); - ObjectColumnSelector prev = objectColumnSelectorMap.putIfAbsent( - columnName, - newSelector - ); - return prev != null ? 
prev : newSelector; - } - } - } - } diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index e7987d5f6d35..2bd7c1eed9fb 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -310,11 +310,13 @@ public static > List> makeQueryRunn ) throws IOException { - final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(); + final IncrementalIndex rtIndexOnHeap = TestIndex.getIncrementalTestIndex(false); + final IncrementalIndex rtIndexOffHeap = TestIndex.getIncrementalTestIndex(true); final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex(); final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex(); return ImmutableList.of( - makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId)), + makeQueryRunner(factory, new IncrementalIndexSegment(rtIndexOnHeap, segmentId)), + makeQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffHeap, segmentId)), makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex)), makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex)) ); @@ -327,12 +329,14 @@ public static Collection makeUnionQueryRunners( ) throws IOException { - final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(); + final IncrementalIndex rtIndexOnHeap = TestIndex.getIncrementalTestIndex(false); + final IncrementalIndex rtIndexOffHeap = TestIndex.getIncrementalTestIndex(true); final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex(); final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex(); return Arrays.asList( - makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId)), + makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOnHeap, segmentId)), + makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffHeap, segmentId)), makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex)), makeUnionQueryRunner( factory, diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java index b7beeebeed44..2b42fb1ce31a 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java @@ -59,7 +59,7 @@ public void testIncrementalWorks() throws Exception private void testIncrementalWorksHelper(EnumSet analyses) throws Exception { final List results = getSegmentAnalysises( - new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), null), + new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), null), analyses ); diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java index 628fcfa6a207..9870ab0b2469 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java @@ -95,7 +95,7 @@ public static QueryRunner makeIncrementalIndexQueryRunner( return QueryRunnerTestHelper.makeQueryRunner( factory, segmentId, - new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), segmentId) + new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), 
segmentId) ); } diff --git a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerWithCaseTest.java b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerWithCaseTest.java index c9332adf536f..dd9f3b5f1951 100644 --- a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerWithCaseTest.java +++ b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerWithCaseTest.java @@ -83,8 +83,8 @@ public static Iterable constructorFeeder() throws IOException "2011-01-13T00:00:00.000Z\tspot\tautomotive\tpreferred\ta\u0001preferred\t94.874713" ); - IncrementalIndex index1 = TestIndex.makeRealtimeIndex(input); - IncrementalIndex index2 = TestIndex.makeRealtimeIndex(input); + IncrementalIndex index1 = TestIndex.makeRealtimeIndex(input, false); + IncrementalIndex index2 = TestIndex.makeRealtimeIndex(input, false); QueryableIndex index3 = TestIndex.persistRealtimeAndLoadMMapped(index1); QueryableIndex index4 = TestIndex.persistRealtimeAndLoadMMapped(index2); diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java index 965a0c6560e9..9d08223b4055 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryQueryToolChestTest.java @@ -121,7 +121,7 @@ public void testMinTopNThreshold() throws Exception ); QueryRunner> runner = QueryRunnerTestHelper.makeQueryRunner( factory, - new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), segmentId) + new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), segmentId) ); Map context = Maps.newHashMap(); diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerBenchmark.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerBenchmark.java index 4a865f8275c9..ee141be4c86d 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerBenchmark.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerBenchmark.java @@ -106,7 +106,7 @@ public ByteBuffer get() TestCases.rtIndex, QueryRunnerTestHelper.makeQueryRunner( factory, - new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), segmentId) + new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), segmentId) ) ); testCaseMap.put( diff --git a/processing/src/test/java/io/druid/segment/IndexIOTest.java b/processing/src/test/java/io/druid/segment/IndexIOTest.java index 804fd74f2f09..e7c918f42dcd 100644 --- a/processing/src/test/java/io/druid/segment/IndexIOTest.java +++ b/processing/src/test/java/io/druid/segment/IndexIOTest.java @@ -261,7 +261,7 @@ public IndexIOTest( this.exception = exception; } - final IncrementalIndex incrementalIndex1 = new OnheapIncrementalIndex( + final IncrementalIndex incrementalIndex1 = new OnheapIncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DEFAULT_INTERVAL.getStart().getMillis()) .withQueryGranularity(QueryGranularity.NONE) .withMetrics( @@ -283,7 +283,7 @@ public IndexIOTest( 1000000 ); - final IncrementalIndex incrementalIndex2 = new OnheapIncrementalIndex( + final IncrementalIndex incrementalIndex2 = new OnheapIncrementalIndex( new IncrementalIndexSchema.Builder().withMinTimestamp(DEFAULT_INTERVAL.getStart().getMillis()) .withQueryGranularity(QueryGranularity.NONE) .withMetrics( diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java index 
c6ecc351c29b..f407cd0f1872 100644 --- a/processing/src/test/java/io/druid/segment/TestIndex.java +++ b/processing/src/test/java/io/druid/segment/TestIndex.java @@ -20,12 +20,14 @@ package io.druid.segment; import com.google.common.base.Charsets; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.hash.Hashing; import com.google.common.io.CharSource; import com.google.common.io.LineProcessor; import com.google.common.io.Resources; import com.metamx.common.logger.Logger; +import io.druid.collections.StupidPool; import io.druid.data.input.impl.DelimitedParseSpec; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.StringInputRowParser; @@ -37,6 +39,7 @@ import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.incremental.OffheapIncrementalIndex; import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import org.joda.time.DateTime; @@ -45,6 +48,7 @@ import java.io.File; import java.io.IOException; import java.net.URL; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.concurrent.atomic.AtomicLong; @@ -89,19 +93,23 @@ public class TestIndex } } - private static IncrementalIndex realtimeIndex = null; + private static IncrementalIndex realtimeIndexOnHeap = null; + private static IncrementalIndex realtimeIndexOffHeap = null; private static QueryableIndex mmappedIndex = null; private static QueryableIndex mergedRealtime = null; - public static IncrementalIndex getIncrementalTestIndex() + public static IncrementalIndex getIncrementalTestIndex(boolean offHeap) { synchronized (log) { - if (realtimeIndex != null) { - return realtimeIndex; + if (offHeap) { + return realtimeIndexOffHeap != null + ? realtimeIndexOffHeap + : (realtimeIndexOffHeap = makeRealtimeIndex("druid.sample.tsv", offHeap)); } + return realtimeIndexOnHeap != null + ? 
realtimeIndexOnHeap + : (realtimeIndexOnHeap = makeRealtimeIndex("druid.sample.tsv", offHeap)); } - - return realtimeIndex = makeRealtimeIndex("druid.sample.tsv"); } public static QueryableIndex getMMappedTestIndex() @@ -112,7 +120,7 @@ public static QueryableIndex getMMappedTestIndex() } } - IncrementalIndex incrementalIndex = getIncrementalTestIndex(); + IncrementalIndex incrementalIndex = getIncrementalTestIndex(false); mmappedIndex = persistRealtimeAndLoadMMapped(incrementalIndex); return mmappedIndex; @@ -126,8 +134,8 @@ public static QueryableIndex mergedRealtimeIndex() } try { - IncrementalIndex top = makeRealtimeIndex("druid.sample.tsv.top"); - IncrementalIndex bottom = makeRealtimeIndex("druid.sample.tsv.bottom"); + IncrementalIndex top = makeRealtimeIndex("druid.sample.tsv.top", false); + IncrementalIndex bottom = makeRealtimeIndex("druid.sample.tsv.bottom", false); File tmpFile = File.createTempFile("yay", "who"); tmpFile.delete(); @@ -163,7 +171,7 @@ public static QueryableIndex mergedRealtimeIndex() } } - private static IncrementalIndex makeRealtimeIndex(final String resourceFilename) + private static IncrementalIndex makeRealtimeIndex(final String resourceFilename, final boolean offHeap) { final URL resource = TestIndex.class.getClassLoader().getResource(resourceFilename); if (resource == null) { @@ -171,17 +179,36 @@ private static IncrementalIndex makeRealtimeIndex(final String resourceFilename) } log.info("Realtime loading index file[%s]", resource); CharSource stream = Resources.asByteSource(resource).asCharSource(Charsets.UTF_8); - return makeRealtimeIndex(stream); + return makeRealtimeIndex(stream, offHeap); } - public static IncrementalIndex makeRealtimeIndex(final CharSource source) + public static IncrementalIndex makeRealtimeIndex(final CharSource source, final boolean offHeap) { final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder() .withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis()) .withQueryGranularity(QueryGranularity.NONE) .withMetrics(METRIC_AGGS) .build(); - final IncrementalIndex retVal = new OnheapIncrementalIndex(schema, true, 10000); + final IncrementalIndex retVal = offHeap ? 
new OffheapIncrementalIndex( + schema, + true, + true, + 10000, + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(1024 * 1024); + } + } + ) + ) : new OnheapIncrementalIndex( + schema, + true, + 10000 + ); final AtomicLong startTime = new AtomicLong(); int lineCount; @@ -223,7 +250,6 @@ public Integer getResult() ); } catch (IOException e) { - realtimeIndex = null; throw Throwables.propagate(e); } diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java index 0658e02fc744..ff35351ed3db 100644 --- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java @@ -610,7 +610,7 @@ public void run() @Test public void testgetDimensions() { - final IncrementalIndex incrementalIndex = new OnheapIncrementalIndex( + final IncrementalIndex incrementalIndex = new OnheapIncrementalIndex( new IncrementalIndexSchema.Builder().withQueryGranularity(QueryGranularity.NONE) .withMetrics( new AggregatorFactory[]{ diff --git a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java index 89a1195e3a8f..786f11be3fd8 100644 --- a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java +++ b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java @@ -22,7 +22,6 @@ import com.carrotsearch.junitbenchmarks.AbstractBenchmark; import com.carrotsearch.junitbenchmarks.BenchmarkOptions; import com.carrotsearch.junitbenchmarks.Clock; -import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -32,8 +31,6 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.metamx.common.guava.Sequences; -import com.metamx.common.parsers.ParseException; -import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; import io.druid.granularity.QueryGranularity; import io.druid.query.Druids; @@ -42,7 +39,6 @@ import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; -import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; @@ -68,7 +64,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @@ -104,114 +99,11 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark factories = ingestAggregatorFactories.toArray(new AggregatorFactory[0]); } - private static final class MapIncrementalIndex extends OnheapIncrementalIndex - { - private final AtomicInteger indexIncrement = new AtomicInteger(0); - ConcurrentHashMap indexedMap = new ConcurrentHashMap(); - - public MapIncrementalIndex( - long minTimestamp, - QueryGranularity gran, - AggregatorFactory[] metrics, - int maxRowCount - ) - { - super(minTimestamp, gran, metrics, maxRowCount); - } - - @Override 
- protected Aggregator[] concurrentGet(int offset) - { - // All get operations should be fine - return indexedMap.get(offset); - } - - @Override - protected void concurrentSet(int offset, Aggregator[] value) - { - indexedMap.put(offset, value); - } - - @Override - protected Integer addToFacts( - AggregatorFactory[] metrics, - boolean deserializeComplexMetrics, - boolean reportParseExceptions, - InputRow row, - AtomicInteger numEntries, - TimeAndDims key, - ThreadLocal rowContainer, - Supplier rowSupplier - ) throws IndexSizeExceededException - { - - final Integer priorIdex = getFacts().get(key); - - Aggregator[] aggs; - - if (null != priorIdex) { - aggs = indexedMap.get(priorIdex); - } else { - aggs = new Aggregator[metrics.length]; - - for (int i = 0; i < metrics.length; i++) { - final AggregatorFactory agg = metrics[i]; - aggs[i] = agg.factorize( - makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics) - ); - } - Integer rowIndex; - - do { - rowIndex = indexIncrement.incrementAndGet(); - } while (null != indexedMap.putIfAbsent(rowIndex, aggs)); - - - // Last ditch sanity checks - if (numEntries.get() >= maxRowCount && !getFacts().containsKey(key)) { - throw new IndexSizeExceededException("Maximum number of rows reached"); - } - final Integer prev = getFacts().putIfAbsent(key, rowIndex); - if (null == prev) { - numEntries.incrementAndGet(); - } else { - // We lost a race - aggs = indexedMap.get(prev); - // Free up the misfire - indexedMap.remove(rowIndex); - // This is expected to occur ~80% of the time in the worst scenarios - } - } - - rowContainer.set(row); - - for (Aggregator agg : aggs) { - synchronized (agg) { - try { - agg.aggregate(); - } - catch (ParseException e) { - // "aggregate" can throw ParseExceptions if a selector expects something but gets something else. - if (reportParseExceptions) { - throw e; - } - } - } - } - - rowContainer.set(null); - - - return numEntries.get(); - } - } - @Parameterized.Parameters public static Collection getParameters() { return ImmutableList.of( - new Object[]{OnheapIncrementalIndex.class}, - new Object[]{MapIncrementalIndex.class} + new Object[]{OnheapIncrementalIndex.class} ); }