diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java
index 3828750c1ffb..1bec5a07fa86 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java
@@ -34,6 +34,10 @@ public class GroupByQueryConfig
   @JsonProperty
   private int maxResults = 500000;
 
+  // max off-heap size in bytes
+  @JsonProperty
+  private long maxOffheapSize = 1L << 34;
+
   public boolean isSingleThreaded()
   {
     return singleThreaded;
@@ -53,4 +57,14 @@ public int getMaxResults()
   {
     return maxResults;
   }
+
+  public void setMaxOffheapSize(long maxOffheapSize)
+  {
+    this.maxOffheapSize = maxOffheapSize;
+  }
+
+  public long getMaxOffheapSize()
+  {
+    return maxOffheapSize;
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java
index 637dce3f077d..2ae226c4646a 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java
@@ -43,6 +43,7 @@ public class GroupByQueryHelper
 {
   private static final String CTX_KEY_MAX_RESULTS = "maxResults";
+  private static final String CTX_KEY_MAX_OFF_HEAP_SIZE = "groupByMaxOffheapSize";
   public final static String CTX_KEY_SORT_RESULTS = "sortResults";
 
   public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> createIndexAccumulatorPair(
@@ -94,8 +95,7 @@ public String apply(DimensionSpec input)
           false,
           true,
           sortResults,
-          Math.min(query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), config.getMaxResults()),
-          bufferPool
+          Math.min(query.getContextValue(CTX_KEY_MAX_OFF_HEAP_SIZE, config.getMaxOffheapSize()), config.getMaxOffheapSize())
       );
     } else {
       index = new OnheapIncrementalIndex(
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
index 9707fada524e..d749ad804eb3 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
@@ -355,7 +355,7 @@ public int lookupId(String name)
 
   private final Map<String, DimensionDesc> dimensionDescs;
   private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
-  private final List<DimDim> dimValues;
+  protected final List<DimDim> dimValues; // looks like this needs a configuration
 
   private final Ordering<Comparable> ordering = Ordering.natural().nullsFirst();
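Up to this point the patch wires a byte-denominated off-heap limit through the groupBy path: GroupByQueryConfig gains maxOffheapSize (default 1L << 34, i.e. 16 GiB), and GroupByQueryHelper clamps the optional "groupByMaxOffheapSize" query-context value against that ceiling before constructing the off-heap index. A minimal sketch of that clamping, with illustrative values that are not from the patch:

    // A per-query context override can only tighten the configured ceiling, never raise it.
    long configured  = 1L << 34;                          // config default: 16 GiB
    long fromContext = 1L << 35;                          // oversized override requested: 32 GiB
    long effective   = Math.min(fromContext, configured); // clamped back to 16 GiB

The rest of the patch replaces the StupidPool-backed OffheapIncrementalIndex with a MapDB-backed implementation that charges every allocation against this single budget.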
diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java
index 35fe24260244..5dc896e00357 100644
--- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java
+++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java
@@ -26,23 +26,35 @@
 import com.metamx.common.ISE;
 import com.metamx.common.logger.Logger;
 import com.metamx.common.parsers.ParseException;
-import io.druid.collections.ResourceHolder;
-import io.druid.collections.StupidPool;
 import io.druid.data.input.InputRow;
 import io.druid.granularity.QueryGranularity;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.BufferAggregator;
 import io.druid.segment.ColumnSelectorFactory;
-
+import org.apache.commons.io.FileUtils;
+import org.mapdb.BTreeKeySerializer;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.MappedFileVolWrapper;
+import org.mapdb.Serializer;
+import org.mapdb.StoreDirect;
+import org.mapdb.Volume;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  */
@@ -50,16 +62,16 @@ public class OffheapIncrementalIndex extends IncrementalIndex
 {
   private static final Logger log = new Logger(OffheapIncrementalIndex.class);
 
-  private final StupidPool<ByteBuffer> bufferPool;
+  private final DiskBBFactory aggBBFactory;
 
-  private final List<ResourceHolder<ByteBuffer>> aggBuffers = new ArrayList<>();
+  private final List<ByteBuffer> aggBuffers = new ArrayList<>();
   private final List<int[]> indexAndOffsets = new ArrayList<>();
 
+  private final DB factsDb;
   private final ConcurrentMap<TimeAndDims, Integer> facts;
 
-  private final AtomicInteger indexIncrement = new AtomicInteger(0);
-
-  protected final int maxRowCount;
+  private final AtomicInteger indexIncrement = new AtomicInteger(0);
 
   private volatile Map<String, ColumnSelectorFactory> selectors;
@@ -71,37 +83,63 @@ public class OffheapIncrementalIndex extends IncrementalIndex
 
   private String outOfRowsReason = null;
 
+  private File tmpDirLocation;
+
   public OffheapIncrementalIndex(
       IncrementalIndexSchema incrementalIndexSchema,
       boolean deserializeComplexMetrics,
       boolean reportParseExceptions,
       boolean sortFacts,
-      int maxRowCount,
-      StupidPool<ByteBuffer> bufferPool
+      long maxOffheapSize
   )
   {
     super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, sortFacts);
 
-    this.maxRowCount = maxRowCount;
-    this.bufferPool = bufferPool;
-
-    if (sortFacts) {
-      this.facts = new ConcurrentSkipListMap<>(dimsComparator());
-    } else {
-      this.facts = new ConcurrentHashMap<>();
-    }
+    try {
+      tmpDirLocation = Files.createTempDirectory("druid-offheap-index").toFile();
 
-    //check that stupid pool gives buffers that can hold at least one row's aggregators
-    ResourceHolder<ByteBuffer> bb = bufferPool.take();
-    if (bb.get().capacity() < aggsTotalSize) {
-      RuntimeException ex = new IAE("bufferPool buffers capacity must be >= [%s]", aggsTotalSize);
-      try {
-        bb.close();
-      } catch (IOException ioe) {
-        ex.addSuppressed(ioe);
+      File mapDbFile = File.createTempFile("mapdb", "", tmpDirLocation);
+      File aggsBBFile = File.createTempFile("aggs", "", tmpDirLocation);
+
+      AtomicLong availableTotalSpace = new AtomicLong(maxOffheapSize);
+
+      final DBMaker dbMaker = new MyDBMaker(mapDbFile, 24, 27, availableTotalSpace)
+          .deleteFilesAfterClose()
+          .commitFileSyncDisable()
+          .cacheSize(1024)
+          .transactionDisable()
+          .mmapFileEnable();
+
+      this.factsDb = dbMaker.make();
+
+      if (sortFacts) {
+        TimeAndDimsBTreeKeySerializer timeAndDimsSerializer = new TimeAndDimsBTreeKeySerializer(this);
+        this.facts = factsDb.createTreeMap("__facts" + UUID.randomUUID())
+                            .keySerializer(timeAndDimsSerializer)
+                            .comparator(timeAndDimsSerializer.getComparator())
+                            .valueSerializer(Serializer.INTEGER)
+                            .make();
+      } else {
+        this.facts = factsDb.createHashMap("__facts" + UUID.randomUUID())
+                            .keySerializer(TimeAndDimsSerializer.INSTANCE)
+                            .valueSerializer(Serializer.INTEGER)
+                            .make();
       }
-      throw ex;
+
+      this.aggBBFactory = new DiskBBFactory(aggsBBFile, 27, availableTotalSpace);
+
+      // check that agg buffers can hold at least one row's aggregators
+      ByteBuffer bb = this.aggBBFactory.allocateBB();
+      if (bb.capacity() < aggsTotalSize) {
+        throw new IAE("agg buffer capacity must be >= [%s]", aggsTotalSize);
+      }
+      aggBuffers.add(bb);
+
+    }
+    catch (Exception ex) {
+      FileUtils.deleteQuietly(tmpDirLocation);
+      throw Throwables.propagate(ex);
+    }
-    aggBuffers.add(bb);
   }
 
   public OffheapIncrementalIndex(
@@ -111,8 +149,7 @@ public OffheapIncrementalIndex(
       boolean deserializeComplexMetrics,
       boolean reportParseExceptions,
       boolean sortFacts,
-      int maxRowCount,
-      StupidPool<ByteBuffer> bufferPool
+      long maxOffheapSize
   )
   {
     this(
@@ -123,8 +160,7 @@ public OffheapIncrementalIndex(
         deserializeComplexMetrics,
         reportParseExceptions,
         sortFacts,
-        maxRowCount,
-        bufferPool
+        maxOffheapSize
     );
   }
 
@@ -132,8 +168,7 @@ public OffheapIncrementalIndex(
       long minTimestamp,
      QueryGranularity gran,
      final AggregatorFactory[] metrics,
-      int maxRowCount,
-      StupidPool<ByteBuffer> bufferPool
+      long maxOffheapSize
   )
   {
     this(
@@ -144,8 +179,7 @@ public OffheapIncrementalIndex(
         true,
         true,
         true,
-        maxRowCount,
-        bufferPool
+        maxOffheapSize
     );
   }
 
@@ -217,7 +251,7 @@ protected Integer addToFacts(
       final int[] indexAndOffset = indexAndOffsets.get(priorIndex);
       bufferIndex = indexAndOffset[0];
       bufferOffset = indexAndOffset[1];
-      aggBuffer = aggBuffers.get(bufferIndex).get();
+      aggBuffer = aggBuffers.get(bufferIndex);
     } else {
       if (metrics.length > 0 && getAggs()[0] == null) {
         // note: creation of Aggregators is done lazily when at least one row from input is available
@@ -233,7 +267,7 @@ protected Integer addToFacts(
       }
 
       bufferIndex = aggBuffers.size() - 1;
-      ByteBuffer lastBuffer = aggBuffers.isEmpty() ? null : aggBuffers.get(aggBuffers.size() - 1).get();
+      ByteBuffer lastBuffer = aggBuffers.isEmpty() ? null : aggBuffers.get(aggBuffers.size() - 1);
       int[] lastAggregatorsIndexAndOffset = indexAndOffsets.isEmpty() ?
                                             null : indexAndOffsets.get(indexAndOffsets.size() - 1);
@@ -247,28 +281,36 @@
           lastBuffer.capacity() - bufferOffset >= aggsTotalSize) {
         aggBuffer = lastBuffer;
       } else {
-        ResourceHolder<ByteBuffer> bb = bufferPool.take();
+        ByteBuffer bb;
+        try {
+          bb = aggBBFactory.allocateBB();
+        } catch (Exception ex) {
+          throw new IndexSizeExceededException(ex, "failed to allocate buffer for aggregation");
+        }
         aggBuffers.add(bb);
         bufferIndex = aggBuffers.size() - 1;
         bufferOffset = 0;
-        aggBuffer = bb.get();
+        aggBuffer = bb;
       }
 
       for (int i = 0; i < metrics.length; i++) {
         getAggs()[i].init(aggBuffer, bufferOffset + aggOffsetInBuffer[i]);
       }
 
-      // Last ditch sanity checks
-      if (numEntries.get() >= maxRowCount && !facts.containsKey(key)) {
-        throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount);
-      }
-
       final Integer rowIndex = indexIncrement.getAndIncrement();
 
       // note that indexAndOffsets must be updated before facts, because as soon as we update facts
       // concurrent readers get hold of it and might ask for newly added row
       indexAndOffsets.add(new int[]{bufferIndex, bufferOffset});
-      final Integer prev = facts.putIfAbsent(key, rowIndex);
+
+      Integer prev = null;
+      try {
+        prev = facts.putIfAbsent(key, rowIndex);
+      } catch (Exception ex) {
+        indexAndOffsets.remove(indexAndOffsets.size() - 1);
+        throw new IndexSizeExceededException(ex, "failed to allocate buffer for storing facts");
+      }
+
       if (null == prev) {
         numEntries.incrementAndGet();
       } else {
@@ -302,11 +344,9 @@ protected Integer addToFacts(
   @Override
   public boolean canAppendRow()
   {
-    final boolean canAdd = size() < maxRowCount;
-    if (!canAdd) {
-      outOfRowsReason = String.format("Maximum number of rows [%d] reached", maxRowCount);
-    }
-    return canAdd;
+    // For off-heap, it is not possible to implement this check reliably. Currently the off-heap
+    // index is only used for groupBy merging, where this method is not called.
+    return true;
   }
 
   @Override
@@ -325,7 +365,7 @@ protected BufferAggregator[] getAggsForRow(int rowOffset)
   protected Object getAggVal(BufferAggregator agg, int rowOffset, int aggPosition)
   {
     int[] indexAndOffset = indexAndOffsets.get(rowOffset);
-    ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get();
+    ByteBuffer bb = aggBuffers.get(indexAndOffset[0]);
     return agg.get(bb, indexAndOffset[1] + aggOffsetInBuffer[aggPosition]);
   }
 
@@ -334,7 +374,7 @@ public float getMetricFloatValue(int rowOffset, int aggOffset)
   {
     BufferAggregator agg = getAggs()[aggOffset];
     int[] indexAndOffset = indexAndOffsets.get(rowOffset);
-    ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get();
+    ByteBuffer bb = aggBuffers.get(indexAndOffset[0]);
     return agg.getFloat(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]);
   }
 
@@ -343,7 +383,7 @@ public long getMetricLongValue(int rowOffset, int aggOffset)
   {
     BufferAggregator agg = getAggs()[aggOffset];
     int[] indexAndOffset = indexAndOffsets.get(rowOffset);
-    ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get();
+    ByteBuffer bb = aggBuffers.get(indexAndOffset[0]);
     return agg.getLong(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]);
   }
 
@@ -352,7 +392,7 @@ public Object getMetricObjectValue(int rowOffset, int aggOffset)
   {
     BufferAggregator agg = getAggs()[aggOffset];
     int[] indexAndOffset = indexAndOffsets.get(rowOffset);
-    ByteBuffer bb = aggBuffers.get(indexAndOffset[0]).get();
+    ByteBuffer bb = aggBuffers.get(indexAndOffset[0]);
     return agg.get(bb, indexAndOffset[1] + aggOffsetInBuffer[aggOffset]);
   }
 
@@ -362,29 +402,223 @@ public Object getMetricObjectValue(int rowOffset, int aggOffset)
   @Override
   public void close()
   {
-    super.close();
-    facts.clear();
+    // note that it is important to tear down the MapDB structures first, because they depend on
+    // dimLookups via the comparator, which must still be available during this cleanup.
+    RuntimeException ex = null;
+    try {
+      factsDb.close();
+    } catch (Exception e) {
+      ex = Throwables.propagate(e);
+    }
+    indexAndOffsets.clear();
+    aggBuffers.clear();
+
+    try {
+      FileUtils.deleteDirectory(tmpDirLocation);
+    } catch (IOException ioe) {
+      if (ex == null) {
+        ex = Throwables.propagate(ioe);
+      } else {
+        ex.addSuppressed(ioe);
+      }
+    }
     if (selectors != null) {
       selectors.clear();
     }
-    RuntimeException ex = null;
-    for (ResourceHolder<ByteBuffer> buffHolder : aggBuffers) {
-      try {
-        buffHolder.close();
-      } catch(IOException ioe) {
-        if (ex == null) {
-          ex = Throwables.propagate(ioe);
-        } else {
-          ex.addSuppressed(ioe);
-        }
-      }
-    }
-    aggBuffers.clear();
-    if (ex != null) {
-      throw ex;
-    }
+    super.close();
+
+    if (ex != null) {
+      throw ex;
+    }
   }
+
+  private static class TimeAndDimsSerializer implements Serializer<TimeAndDims>, Serializable
+  {
+    private static final int[] EMPTY_INT_ARRAY = new int[0];
+
+    public static final TimeAndDimsSerializer INSTANCE = new TimeAndDimsSerializer();
+
+    private TimeAndDimsSerializer()
+    {
+    }
+
+    @Override
+    public void serialize(DataOutput out, TimeAndDims value) throws IOException
+    {
+      out.writeLong(value.getTimestamp());
+      out.writeInt(value.getDims().length);
+      for (int[] dims : value.getDims()) {
+        if (dims == null) {
+          writeArr(EMPTY_INT_ARRAY, out);
+        } else {
+          writeArr(dims, out);
+        }
+      }
+    }
+
+    @Override
+    public TimeAndDims deserialize(DataInput in, int available) throws IOException
+    {
+      final long timeStamp = in.readLong();
+      final int[][] dims = new int[in.readInt()][];
+
+      for (int j = 0; j < dims.length; j++) {
+        dims[j] = readArr(in);
+      }
+
+      return new TimeAndDims(timeStamp, dims);
+    }
+
+    @Override
+    public int fixedSize()
+    {
+      return -1;
+    }
+
+    private void writeArr(int[] value, DataOutput out) throws IOException
+    {
+      out.writeInt(value.length);
+      for (int v : value) {
+        out.writeInt(v);
+      }
+    }
+
+    private int[] readArr(DataInput in) throws IOException
+    {
+      int len = in.readInt();
+      if (len == 0) {
+        return EMPTY_INT_ARRAY;
+      } else {
+        int[] result = new int[len];
+        for (int i = 0; i < len; i++) {
+          result[i] = in.readInt();
+        }
+        return result;
+      }
+    }
+  }
+
+  private static class TimeAndDimsBTreeKeySerializer extends BTreeKeySerializer<TimeAndDims> implements Serializable
+  {
+    private final Comparator<TimeAndDims> comparator;
+
+    TimeAndDimsBTreeKeySerializer(final OffheapIncrementalIndex offheapIncrementalIndex)
+    {
+      this.comparator = new OffHeapTimeAndDimsComparator(
+          new Supplier<List<DimDim>>()
+          {
+            @Override
+            public List<DimDim> get()
+            {
+              return offheapIncrementalIndex.dimValues;
+            }
+          }
+      );
+    }
+
+    @Override
+    public void serialize(DataOutput out, int start, int end, Object[] keys) throws IOException
+    {
+      for (int i = start; i < end; i++) {
+        TimeAndDims timeAndDim = (TimeAndDims) keys[i];
+        TimeAndDimsSerializer.INSTANCE.serialize(out, timeAndDim);
+      }
+    }
+
+    @Override
+    public Object[] deserialize(DataInput in, int start, int end, int size) throws IOException
+    {
+      Object[] ret = new Object[size];
+      for (int i = start; i < end; i++) {
+        ret[i] = TimeAndDimsSerializer.INSTANCE.deserialize(in, 0);
+      }
+
+      return ret;
+    }
+
+    @Override
+    public Comparator<TimeAndDims> getComparator()
+    {
+      return comparator;
+    }
+  }
+
+  private static class OffHeapTimeAndDimsComparator implements Comparator<TimeAndDims>, Serializable
+  {
+    private transient Supplier<List<DimDim>> dimDimsSupplier;
+
+    OffHeapTimeAndDimsComparator(Supplier<List<DimDim>> dimDimsSupplier)
+    {
+      this.dimDimsSupplier = dimDimsSupplier;
+    }
+
+    @Override
+    public int compare(TimeAndDims o1, TimeAndDims o2)
+    {
+      return new TimeAndDimsComp(dimDimsSupplier.get()).compare(o1, o2);
+    }
+  }
+
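The three classes above give MapDB what it needs to keep TimeAndDims keys off-heap: a value serializer, a BTree key serializer for the sorted-facts case, and a comparator that resolves dictionary ids through the index's dimValues lookups. The key layout is a long timestamp, the dimension count, then each dimension as a length-prefixed run of ints; a null dimension is written as a zero-length array, which means it deserializes back as empty rather than null. A hedged round-trip sketch of that layout (standalone illustration, not patch code):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Writes one key the way TimeAndDimsSerializer would: time, dim count, then each dim.
    static byte[] writeKey() throws IOException
    {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeLong(1442016000000L);    // timestamp
      out.writeInt(2);                  // two dimensions
      out.writeInt(1); out.writeInt(7); // dim 0: one value, dictionary id 7
      out.writeInt(0);                  // dim 1: null, stored as a zero-length array
      return bytes.toByteArray();
    }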
+  private static class DiskBBFactory
+  {
+    private final long increments;
+    private final MappedFileVolWrapper vol;
+
+    private long offset = 0;
+
+    public DiskBBFactory(File file, int chunkShift, AtomicLong availableSpace)
+    {
+      vol = new MappedFileVolWrapper(file, chunkShift, availableSpace);
+      increments = 1L << chunkShift;
+    }

[...]

+  @Override
+  public ByteBuffer makeNewBuffer(long offset)
+  {
+    if (sizeAvailable.addAndGet(-chunkSize) > 0) {
+      return delegate.makeNewBuffer(offset);
+    } else {
+      sizeAvailable.addAndGet(chunkSize);
+      throw new RuntimeException("No more space left for allocating buffers.");
+    }
+  }
+
+  @Override
+  public void truncate(long size)
+  {
+    delegate.truncate(size);
+  }
+
+  @Override
+  public void close()
+  {
+    delegate.close();
+  }
+
+  @Override
+  public void sync()
+  {
+  }
+
+  @Override
+  public void deleteFile()
+  {
+    delegate.deleteFile();
+  }
+
+  @Override
+  public File getFile()
+  {
+    return delegate.getFile();
+  }
+}
diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java
index 841e682d0ab4..07970f85256a 100644
--- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java
+++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java
@@ -19,7 +19,6 @@
 
 package io.druid.segment.data;
 
-import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -33,7 +32,6 @@
 import com.metamx.common.guava.Accumulator;
 import com.metamx.common.guava.Sequence;
 import com.metamx.common.guava.Sequences;
-import io.druid.collections.StupidPool;
 import io.druid.data.input.MapBasedInputRow;
 import io.druid.data.input.Row;
 import io.druid.data.input.impl.DimensionsSpec;
@@ -70,7 +68,6 @@
 import org.junit.runners.Parameterized;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -130,17 +127,7 @@ public IncrementalIndex createIndex(AggregatorFactory[] factories)
           public IncrementalIndex createIndex(AggregatorFactory[] factories)
           {
             return new OffheapIncrementalIndex(
-                0L, QueryGranularity.NONE, factories, 1000000,
-                new StupidPool<ByteBuffer>(
-                    new Supplier<ByteBuffer>()
-                    {
-                      @Override
-                      public ByteBuffer get()
-                      {
-                        return ByteBuffer.allocate(256 * 1024);
-                      }
-                    }
-                )
+                0L, QueryGranularity.NONE, factories, 1L << 32
             );
           }
         }
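Both test files swap the old row cap (1000000) plus 256 KiB pooled heap buffers for a flat 4 GiB byte budget (1L << 32). The budget mechanics sit in DiskBBFactory and the Volume wrapper shown earlier: chunks are mapped from a file in 1 << 27 (128 MiB) increments, and each mapping atomically reserves its size from the AtomicLong shared with the MapDB facts store. A sketch of that reservation logic under those assumptions (illustrative, not patch code):

    import java.util.concurrent.atomic.AtomicLong;

    // Mirrors the wrapper's makeNewBuffer accounting: reserve a chunk or roll back.
    static boolean reserveChunk(AtomicLong sizeAvailable, long chunkSize)
    {
      if (sizeAvailable.addAndGet(-chunkSize) > 0) {
        return true;                      // safe to map another chunk
      }
      sizeAvailable.addAndGet(chunkSize); // undo the failed reservation
      return false;                       // the index reports this as IndexSizeExceededException
    }

    // e.g. reserveChunk(new AtomicLong(1L << 32), 1L << 27) succeeds until ~4 GiB is reserved

Because facts and aggregator buffers draw from one counter, the limit bounds the total disk-backed footprint, which is why maxRowCount and the canAppendRow() check were removed.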
diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java
index 9d37eef772a9..d1519141ab53 100644
--- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java
+++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java
@@ -19,12 +19,10 @@
 
 package io.druid.segment.incremental;
 
-import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.metamx.common.ISE;
-import io.druid.collections.StupidPool;
 import io.druid.data.input.MapBasedInputRow;
 import io.druid.data.input.Row;
 import io.druid.data.input.impl.DimensionsSpec;
@@ -45,7 +43,6 @@
 import org.junit.runners.Parameterized;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -116,16 +113,7 @@ public IncrementalIndex createIndex()
           public IncrementalIndex createIndex()
           {
             return new OffheapIncrementalIndex(
-                schema, true, true, sortFacts, 1000000, new StupidPool<ByteBuffer>(
-                    new Supplier<ByteBuffer>()
-                    {
-                      @Override
-                      public ByteBuffer get()
-                      {
-                        return ByteBuffer.allocate(256 * 1024);
-                      }
-                    }
-                )
+                schema, true, true, sortFacts, 1L << 32
             );
           }
         }
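With the pool and row cap gone, constructing the off-heap index takes just a byte budget. A hedged usage sketch (the aggregator and sizes are illustrative, not from the patch):

    import io.druid.granularity.QueryGranularity;
    import io.druid.query.aggregation.AggregatorFactory;
    import io.druid.query.aggregation.CountAggregatorFactory;
    import io.druid.segment.incremental.OffheapIncrementalIndex;

    OffheapIncrementalIndex index = new OffheapIncrementalIndex(
        0L,                                                        // minTimestamp
        QueryGranularity.NONE,
        new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
        1L << 32                                                   // total off-heap budget: 4 GiB
    );
    try {
      // accumulate rows, run the groupBy merge, iterate results ...
    } finally {
      index.close(); // closes MapDB, drops the mmap chunks, and deletes the temp directory
    }

Unlike the pooled version, close() now releases disk-backed state (temp files and mapped chunks), so callers must guarantee it runs even on error paths.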