From 471fa3c62e3c40350e7bb06c2f7b3bb66daf7c09 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 20 Jul 2017 17:43:37 +0900 Subject: [PATCH 01/13] Array-based aggregation --- docs/content/querying/groupbyquery.md | 4 + ...singSketchMergeAggregatorFactoryTest.java} | 8 +- .../query/groupby/GroupByQueryConfig.java | 12 + ...er.java => AbstractBufferHashGrouper.java} | 25 +- .../epinephelinae/BufferArrayGrouper.java | 268 ++++++++++++++++++ ...ferGrouper.java => BufferHashGrouper.java} | 6 +- .../epinephelinae/ByteBufferHashTable.java | 2 +- .../ByteBufferMinMaxOffsetHeap.java | 4 +- .../epinephelinae/GroupByQueryEngineV2.java | 215 +++++++++----- .../query/groupby/epinephelinae/Groupers.java | 16 ++ ...per.java => LimitedBufferHashGrouper.java} | 6 +- .../epinephelinae/SpillingGrouper.java | 6 +- ...ngStringGroupByColumnSelectorStrategy.java | 24 ++ .../DoubleGroupByColumnSelectorStrategy.java | 12 + .../FloatGroupByColumnSelectorStrategy.java | 12 + .../column/GroupByColumnSelectorStrategy.java | 20 +- .../LongGroupByColumnSelectorStrategy.java | 12 + .../StringGroupByColumnSelectorStrategy.java | 16 ++ .../groupby/strategy/GroupByStrategyV2.java | 2 +- .../query/groupby/GroupByQueryRunnerTest.java | 4 +- ...erTest.java => BufferHashGrouperTest.java} | 8 +- ...java => LimitedBufferHashGrouperTest.java} | 12 +- 22 files changed, 571 insertions(+), 123 deletions(-) rename extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/{BufferGrouperUsingSketchMergeAggregatorFactoryTest.java => BufferHashGrouperUsingSketchMergeAggregatorFactoryTest.java} (92%) rename processing/src/main/java/io/druid/query/groupby/epinephelinae/{AbstractBufferGrouper.java => AbstractBufferHashGrouper.java} (87%) create mode 100644 processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java rename processing/src/main/java/io/druid/query/groupby/epinephelinae/{BufferGrouper.java => BufferHashGrouper.java} (97%) rename processing/src/main/java/io/druid/query/groupby/epinephelinae/{LimitedBufferGrouper.java => LimitedBufferHashGrouper.java} (99%) rename processing/src/test/java/io/druid/query/groupby/epinephelinae/{BufferGrouperTest.java => BufferHashGrouperTest.java} (97%) rename processing/src/test/java/io/druid/query/groupby/epinephelinae/{LimitedBufferGrouperTest.java => LimitedBufferHashGrouperTest.java} (93%) diff --git a/docs/content/querying/groupbyquery.md b/docs/content/querying/groupbyquery.md index 32bbbf42492c..fc323217ec06 100644 --- a/docs/content/querying/groupbyquery.md +++ b/docs/content/querying/groupbyquery.md @@ -153,6 +153,9 @@ threads. You can adjust this as necessary to balance concurrency and memory usag historical nodes. - groupBy v1 supports using [chunkPeriod](query-context.html) to parallelize merging on the broker, whereas groupBy v2 ignores chunkPeriod. +- groupBy v2 supports both array-based aggregation and hash-based aggregation. The array-based aggregation is used only +when the grouping key is a single indexed string column. In array-based aggregation, the dictionary-encoded value is used +as the index, so the aggregated values in the array can be accessed directly without finding buckets based on hashing. #### Memory tuning and resource limits @@ -246,6 +249,7 @@ When using the "v2" strategy, the following query context parameters apply: |`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.| |`sortByDimsFirst`|Sort the results first by dimension values and then by timestamp.| |`forcePushDownLimit`|When all fields in the orderby are part of the grouping key, the broker will push limit application down to the historical nodes. When the sorting order uses fields that are not in the grouping key, applying this optimization can result in approximate results with unknown accuracy, so this optimization is disabled by default in that case. Enabling this context flag turns on limit push down for limit/orderbys that contain non-grouping key columns.| +|`forceHashAggregation`|Force to use hash-based aggregation.| When using the "v1" strategy, the following query context parameters apply: diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperUsingSketchMergeAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferHashGrouperUsingSketchMergeAggregatorFactoryTest.java similarity index 92% rename from extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperUsingSketchMergeAggregatorFactoryTest.java rename to extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferHashGrouperUsingSketchMergeAggregatorFactoryTest.java index c70b60e42282..ef4cf90a8de0 100644 --- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferGrouperUsingSketchMergeAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/BufferHashGrouperUsingSketchMergeAggregatorFactoryTest.java @@ -27,7 +27,7 @@ import io.druid.data.input.MapBasedRow; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; -import io.druid.query.groupby.epinephelinae.BufferGrouper; +import io.druid.query.groupby.epinephelinae.BufferHashGrouper; import io.druid.query.groupby.epinephelinae.Grouper; import io.druid.query.groupby.epinephelinae.GrouperTestUtil; import io.druid.query.groupby.epinephelinae.TestColumnSelectorFactory; @@ -36,15 +36,15 @@ import java.nio.ByteBuffer; -public class BufferGrouperUsingSketchMergeAggregatorFactoryTest +public class BufferHashGrouperUsingSketchMergeAggregatorFactoryTest { - private static BufferGrouper makeGrouper( + private static BufferHashGrouper makeGrouper( TestColumnSelectorFactory columnSelectorFactory, int bufferSize, int initialBuckets ) { - final BufferGrouper grouper = new BufferGrouper<>( + final BufferHashGrouper grouper = new BufferHashGrouper<>( Suppliers.ofInstance(ByteBuffer.allocate(bufferSize)), GrouperTestUtil.intKeySerde(), columnSelectorFactory, diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java index 0e1516b1c71c..230acbd0a073 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java @@ -36,6 +36,7 @@ public class GroupByQueryConfig private static final String CTX_KEY_BUFFER_GROUPER_MAX_SIZE = "bufferGrouperMaxSize"; private static final String CTX_KEY_MAX_ON_DISK_STORAGE = "maxOnDiskStorage"; private static final String CTX_KEY_MAX_MERGING_DICTIONARY_SIZE = "maxMergingDictionarySize"; + private static final String CTX_KEY_FORCE_HASH_AGGREGATION = "forceHashAggregation"; @JsonProperty private String defaultStrategy = GroupByStrategySelector.STRATEGY_V2; @@ -70,6 +71,9 @@ public class GroupByQueryConfig @JsonProperty private boolean forcePushDownLimit = false; + @JsonProperty + private boolean forceHashAggregation = false; + public String getDefaultStrategy() { return defaultStrategy; @@ -134,6 +138,11 @@ public boolean isForcePushDownLimit() { return forcePushDownLimit; } + + public boolean isForceHashAggregation() + { + return forceHashAggregation; + } public GroupByQueryConfig withOverrides(final GroupByQuery query) { @@ -169,6 +178,7 @@ public GroupByQueryConfig withOverrides(final GroupByQuery query) getMaxMergingDictionarySize() ); newConfig.forcePushDownLimit = query.getContextBoolean(CTX_KEY_FORCE_LIMIT_PUSH_DOWN, isForcePushDownLimit()); + newConfig.forceHashAggregation = query.getContextBoolean(CTX_KEY_FORCE_HASH_AGGREGATION, isForceHashAggregation()); return newConfig; } @@ -185,6 +195,8 @@ public String toString() ", bufferGrouperInitialBuckets=" + bufferGrouperInitialBuckets + ", maxMergingDictionarySize=" + maxMergingDictionarySize + ", maxOnDiskStorage=" + maxOnDiskStorage + + ", forcePushDownLimit=" + forcePushDownLimit + + ", forceHashAggregation=" + forceHashAggregation + '}'; } } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/AbstractBufferGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java similarity index 87% rename from processing/src/main/java/io/druid/query/groupby/epinephelinae/AbstractBufferGrouper.java rename to processing/src/main/java/io/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java index 7d847b2b06e4..9f924aa21c39 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/AbstractBufferGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java @@ -28,21 +28,10 @@ import java.nio.ByteBuffer; -public abstract class AbstractBufferGrouper implements Grouper +public abstract class AbstractBufferHashGrouper implements Grouper { - private static final AggregateResult DICTIONARY_FULL = AggregateResult.failure( - "Not enough dictionary space to execute this query. Try increasing " - + "druid.query.groupBy.maxMergingDictionarySize or enable disk spilling by setting " - + "druid.query.groupBy.maxOnDiskStorage to a positive number." - ); - private static final AggregateResult HASHTABLE_FULL = AggregateResult.failure( - "Not enough aggregation table space to execute this query. Try increasing " - + "druid.processing.buffer.sizeBytes or enable disk spilling by setting " - + "druid.query.groupBy.maxOnDiskStorage to a positive number." - ); - protected static final int HASH_SIZE = Ints.BYTES; - protected static final Logger log = new Logger(AbstractBufferGrouper.class); + protected static final Logger log = new Logger(AbstractBufferHashGrouper.class); protected final Supplier bufferSupplier; protected final KeySerde keySerde; @@ -61,7 +50,7 @@ public abstract class AbstractBufferGrouper implements Grouper protected ByteBufferHashTable hashTable; protected ByteBuffer hashTableBuffer; // buffer for the entire hash table (total space, not individual growth) - public AbstractBufferGrouper( + public AbstractBufferHashGrouper( final Supplier bufferSupplier, final KeySerde keySerde, final AggregatorFactory[] aggregatorFactories, @@ -77,7 +66,7 @@ public AbstractBufferGrouper( } /** - * Called when a new bucket is used for an entry in the hash table. An implementing BufferGrouper class + * Called when a new bucket is used for an entry in the hash table. An implementing BufferHashGrouper class * can use this to update its own state, e.g. tracking bucket offsets in a structure outside of the hash table. * * @param bucketOffset offset of the new bucket, within the buffer returned by hashTable.getTableBuffer() @@ -95,7 +84,7 @@ public AbstractBufferGrouper( public abstract boolean canSkipAggregate(boolean bucketWasUsed, int bucketOffset); /** - * Called after a row is aggregated. An implementing BufferGrouper class can use this to update + * Called after a row is aggregated. An implementing BufferHashGrouper class can use this to update * its own state, e.g. reading the new aggregated values for the row's key and acting on that information. * * @param bucketOffset Offset of the bucket containing the row that was aggregated, @@ -134,7 +123,7 @@ public AggregateResult aggregate(KeyType key, int keyHash) if (keyBuffer == null) { // This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will // be correct. - return DICTIONARY_FULL; + return Groupers.DICTIONARY_FULL; } if (keyBuffer.remaining() != keySize) { @@ -150,7 +139,7 @@ public AggregateResult aggregate(KeyType key, int keyHash) if (bucket < 0) { // This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will // be correct. - return HASHTABLE_FULL; + return Groupers.BUFFER_OVERFLOW; } final int bucketStartOffset = hashTable.getOffsetForBucket(bucket); diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java new file mode 100644 index 000000000000..568583d76a8e --- /dev/null +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java @@ -0,0 +1,268 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.groupby.epinephelinae; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.primitives.Ints; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.logger.Logger; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.segment.ColumnSelectorFactory; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * A grouper for array-based aggregation. The array consists of records. The memory format of the record is like below. + * + * +---------------+----------+--------------------+--------------------+-----+ + * | used flag (4) | key (4) | agg_1 (fixed size) | agg_2 (fixed size) | ... | + * +---------------+----------+--------------------+--------------------+-----+ + */ +public class BufferArrayGrouper implements Grouper +{ + private static final Logger LOG = new Logger(BufferArrayGrouper.class); + private static final int USED_FLAG_SIZE = Ints.BYTES; + + private final Supplier bufferSupplier; + private final KeySerde keySerde; + private final int keySize; // key(int) + used flag(int) + private final BufferAggregator[] aggregators; + private final int[] aggregatorOffsets; + private final int cardinality; + private final int recordSize; // keySize + size of all aggregated values + + private boolean initialized = false; + private ByteBuffer keyBuffer; + + public BufferArrayGrouper( + final Supplier bufferSupplier, + final KeySerde keySerde, + final ColumnSelectorFactory columnSelectorFactory, + final AggregatorFactory[] aggregatorFactories, + final int cardinality + ) + { + Preconditions.checkNotNull(aggregatorFactories, "aggregatorFactories"); + Preconditions.checkArgument(cardinality > 0, "Cardinality must a non-zero positive number"); + + this.bufferSupplier = Preconditions.checkNotNull(bufferSupplier, "bufferSupplier"); + this.keySerde = Preconditions.checkNotNull(keySerde, "keySerde"); + this.keySize = keySerde.keySize() + USED_FLAG_SIZE; + this.aggregators = new BufferAggregator[aggregatorFactories.length]; + this.aggregatorOffsets = new int[aggregatorFactories.length]; + this.cardinality = cardinality; + + int offset = 0; + for (int i = 0; i < aggregatorFactories.length; i++) { + aggregators[i] = aggregatorFactories[i].factorizeBuffered(columnSelectorFactory); + aggregatorOffsets[i] = offset; + offset += aggregatorFactories[i].getMaxIntermediateSize(); + } + recordSize = keySize + offset; + } + + @Override + public void init() + { + if (!initialized) { + keyBuffer = bufferSupplier.get(); + + reset(); + + initialized = true; + } + } + + @Override + public boolean isInitialized() + { + return initialized; + } + + @Override + public AggregateResult aggregate(KeyType key, int dimIndex) + { + final ByteBuffer fromKey = keySerde.toByteBuffer(key); + if (fromKey == null) { + // This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will + // be correct. + return Groupers.DICTIONARY_FULL; + } + + if (fromKey.remaining() != keySerde.keySize()) { + throw new IAE( + "keySerde.toByteBuffer(key).remaining[%s] != keySerde.keySize[%s], buffer was the wrong size?!", + fromKey.remaining(), + keySerde.keySize() + ); + } + + final int recordOffset = dimIndex * recordSize; + + if (recordOffset + recordSize > keyBuffer.capacity()) { + // This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will + // be correct. + return Groupers.BUFFER_OVERFLOW; + } + + if (!isUsedKey(keyBuffer, recordOffset)) { + this.keyBuffer.position(recordOffset); + this.keyBuffer.putInt(Groupers.getUsedFlag(dimIndex)); + this.keyBuffer.put(fromKey); + } + + final int baseOffset = recordOffset + keySize; + for (int i = 0; i < aggregators.length; i++) { + aggregators[i].aggregate(keyBuffer, baseOffset + aggregatorOffsets[i]); + } + + return AggregateResult.ok(); + } + + @Override + public AggregateResult aggregate(KeyType key) + { + // BufferArrayGrouper is used only for dictionary-indexed single-value string dimensions. + // Here, the key contains the dictionary-encoded value of the grouping key + // and we use it as the index for the aggregation array. + final ByteBuffer fromKey = keySerde.toByteBuffer(key); + final int dimIndex = fromKey.getInt(); + fromKey.rewind(); + return aggregate(key, dimIndex); + } + + private static boolean isUsedKey(ByteBuffer buffer, int pos) + { + return (buffer.get(pos) & 0x80) == 0x80; + } + + @Override + public void reset() + { + for (int i = 0; i < cardinality; i++) { + keyBuffer.putInt(i * recordSize, 0); + final int baseOffset = i * recordSize + keySize; + for (int j = 0; j < aggregators.length; ++j) { + aggregators[j].init(keyBuffer, baseOffset + aggregatorOffsets[j]); + } + } + } + + @Override + public void close() + { + for (BufferAggregator aggregator : aggregators) { + try { + aggregator.close(); + } + catch (Exception e) { + LOG.warn(e, "Could not close aggregator [%s], skipping.", aggregator); + } + } + } + + @Override + public Iterator> iterator(boolean sorted) + { + return sorted ? sortedIterator() : plainIterator(); + } + + private Iterator> sortedIterator() + { + // Sorted iterator is currently not used because there is no way to get grouping key's cardinality when merging + // partial aggregation result in brokers and even data nodes (historicals and realtimes). + // However, it should be used in the future. + final BufferComparator comparator = keySerde.bufferComparator(); + final List wrappedOffsets = IntStream.range(0, cardinality).boxed().collect(Collectors.toList()); + wrappedOffsets.sort( + (lhs, rhs) -> comparator.compare(keyBuffer, keyBuffer, lhs + USED_FLAG_SIZE, rhs + USED_FLAG_SIZE) + ); + + return new ResultIterator(wrappedOffsets); + } + + private Iterator> plainIterator() + { + return new ResultIterator( + IntStream.range(0, cardinality).boxed().collect(Collectors.toList()) + ); + } + + private class ResultIterator implements Iterator> + { + private static final int NOT_FOUND = -1; + + private final Iterator keyIndexIterator; + private int cur; + private boolean needFindNext; + + ResultIterator(Collection keyOffsets) + { + keyIndexIterator = keyOffsets.iterator(); + cur = NOT_FOUND; + needFindNext = true; + } + + private int findNextKeyIndex() + { + while (keyIndexIterator.hasNext()) { + final int index = keyIndexIterator.next(); + if (isUsedKey(keyBuffer, index * recordSize)) { + return index; + } + } + return NOT_FOUND; + } + + @Override + public boolean hasNext() + { + if (needFindNext) { + cur = findNextKeyIndex(); + needFindNext = false; + } + return cur > NOT_FOUND; + } + + @Override + public Entry next() + { + if (cur == NOT_FOUND) { + throw new NoSuchElementException(); + } + + needFindNext = true; + final int baseOffset = cur * recordSize + keySize; + final Object[] values = new Object[aggregators.length]; + for (int i = 0; i < aggregators.length; i++) { + values[i] = aggregators[i].get(keyBuffer, baseOffset + aggregatorOffsets[i]); + } + return new Entry<>(keySerde.fromByteBuffer(keyBuffer, cur * recordSize + USED_FLAG_SIZE), values); + } + } +} diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferHashGrouper.java similarity index 97% rename from processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java rename to processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferHashGrouper.java index cb738ed4d0cd..4b709e3a0dba 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferHashGrouper.java @@ -35,9 +35,9 @@ import java.util.List; import java.util.NoSuchElementException; -public class BufferGrouper extends AbstractBufferGrouper +public class BufferHashGrouper extends AbstractBufferHashGrouper { - private static final Logger log = new Logger(BufferGrouper.class); + private static final Logger log = new Logger(BufferHashGrouper.class); private static final int MIN_INITIAL_BUCKETS = 4; private static final int DEFAULT_INITIAL_BUCKETS = 1024; private static final float DEFAULT_MAX_LOAD_FACTOR = 0.7f; @@ -51,7 +51,7 @@ public class BufferGrouper extends AbstractBufferGrouper private ByteBuffer offsetListBuffer; private ByteBufferIntList offsetList; - public BufferGrouper( + public BufferHashGrouper( final Supplier bufferSupplier, final KeySerde keySerde, final ColumnSelectorFactory columnSelectorFactory, diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ByteBufferHashTable.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ByteBufferHashTable.java index cd83b229c355..4723f831dba2 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ByteBufferHashTable.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ByteBufferHashTable.java @@ -238,7 +238,7 @@ protected void initializeNewBucketKey( { int offset = bucket * bucketSizeWithHash; tableBuffer.position(offset); - tableBuffer.putInt(keyHash | 0x80000000); + tableBuffer.putInt(Groupers.getUsedFlag(keyHash)); tableBuffer.put(keyBuffer); size++; diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java index ea203a660514..d352097de2d6 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ByteBufferMinMaxOffsetHeap.java @@ -41,7 +41,7 @@ public class ByteBufferMinMaxOffsetHeap private final Comparator maxComparator; private final ByteBuffer buf; private final int limit; - private final LimitedBufferGrouper.BufferGrouperOffsetHeapIndexUpdater heapIndexUpdater; + private final LimitedBufferHashGrouper.BufferGrouperOffsetHeapIndexUpdater heapIndexUpdater; private int heapSize; @@ -49,7 +49,7 @@ public ByteBufferMinMaxOffsetHeap( ByteBuffer buf, int limit, Comparator minComparator, - LimitedBufferGrouper.BufferGrouperOffsetHeapIndexUpdater heapIndexUpdater + LimitedBufferHashGrouper.BufferGrouperOffsetHeapIndexUpdater heapIndexUpdater ) { this.buf = buf; diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index e14d91729eef..89b0723b0163 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -113,6 +113,13 @@ public static Sequence process( null ); + final boolean allSingleValueDims = query.getDimensions().stream() + .noneMatch(dimension -> { + final ColumnCapabilities columnCapabilities = + storageAdapter.getColumnCapabilities(dimension.getDimension()); + return columnCapabilities == null || + columnCapabilities.hasMultipleValues(); + }); final ResourceHolder bufferHolder = intermediateResultsBufferPool.take(); @@ -150,7 +157,9 @@ public GroupByEngineIterator make() cursor, bufferHolder.get(), fudgeTimestamp, - createGroupBySelectorPlus(selectorPlus) + createGroupBySelectorPlus(selectorPlus), + storageAdapter::getDimensionCardinality, + allSingleValueDims ); } @@ -220,6 +229,8 @@ private static class GroupByEngineIterator implements Iterator, Closeable private int stackp = Integer.MIN_VALUE; private boolean currentRowWasPartiallyAggregated = false; private CloseableGrouperIterator delegate = null; + private final Function cardinalityFunction; // dimension name -> cardinality + private final boolean allSingleValueDims; public GroupByEngineIterator( final GroupByQuery query, @@ -227,7 +238,9 @@ public GroupByEngineIterator( final Cursor cursor, final ByteBuffer buffer, final DateTime fudgeTimestamp, - final GroupByColumnSelectorPlus[] dims + final GroupByColumnSelectorPlus[] dims, + final Function cardinalityFunction, + final boolean allSingleValueDims ) { final int dimCount = query.getDimensions().size(); @@ -244,38 +257,151 @@ public GroupByEngineIterator( // Time is the same for every row in the cursor this.timestamp = fudgeTimestamp != null ? fudgeTimestamp : cursor.getTime(); + this.cardinalityFunction = cardinalityFunction; + this.allSingleValueDims = allSingleValueDims; + } + + private CloseableGrouperIterator initNewDelegate() + { + final Grouper grouper = newGrouper(); + grouper.init(); + + if (allSingleValueDims) { + aggregateSingleValueDims(grouper); + } else { + aggregateMultiValueDims(grouper); + } + + return new CloseableGrouperIterator<>( + grouper, + false, + entry -> { + Map theMap = Maps.newLinkedHashMap(); + + // Add dimensions. + for (GroupByColumnSelectorPlus selectorPlus : dims) { + selectorPlus.getColumnSelectorStrategy().processValueFromGroupingKey( + selectorPlus, + entry.getKey(), + theMap + ); + } + + convertRowTypesToOutputTypes(query.getDimensions(), theMap); + + // Add aggregations. + for (int i = 0; i < entry.getValues().length; i++) { + theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]); + } + + return new MapBasedRow(timestamp, theMap); + }, + grouper + ); } @Override public Row next() { - if (delegate != null && delegate.hasNext()) { - return delegate.next(); + if (delegate == null || !delegate.hasNext()) { + throw new NoSuchElementException(); } - if (cursor.isDone()) { - throw new NoSuchElementException(); + return delegate.next(); + } + + @Override + public boolean hasNext() + { + if (delegate != null && delegate.hasNext()) { + return true; + } else { + if (!cursor.isDone()) { + if (delegate != null) { + delegate.close(); + } + delegate = initNewDelegate(); + return true; + } else { + return false; + } } + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } - // Make a new delegate iterator + @Override + public void close() + { if (delegate != null) { delegate.close(); - delegate = null; } + } - final Grouper grouper = new BufferGrouper<>( + private Grouper newGrouper() + { + final AggregatorFactory[] aggregatorFactories = query.getAggregatorSpecs() + .toArray( + new AggregatorFactory[query.getAggregatorSpecs().size()] + ); + + if (!querySpecificConfig.isForceHashAggregation()) { + if (dims.length == 1) { + final ColumnCapabilities columnCapabilities = cursor.getColumnCapabilities(dims[0].getName()); + + @SuppressWarnings("ConstantConditions") + final int cardinality = cardinalityFunction.apply(dims[0].getName()); + + // Choose array-based aggregation if the grouping key is a single string dimension of a known cardinality + if ((columnCapabilities == null || columnCapabilities.getType().equals(ValueType.STRING)) && + cardinality > 0) { + return new BufferArrayGrouper<>( + Suppliers.ofInstance(buffer), + keySerde, + cursor, + aggregatorFactories, + cardinality + ); + } + } + } + + return new BufferHashGrouper<>( Suppliers.ofInstance(buffer), keySerde, cursor, - query.getAggregatorSpecs() - .toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]), + aggregatorFactories, querySpecificConfig.getBufferGrouperMaxSize(), querySpecificConfig.getBufferGrouperMaxLoadFactor(), querySpecificConfig.getBufferGrouperInitialBuckets() ); - grouper.init(); + } -outer: + private void aggregateSingleValueDims(Grouper grouper) + { + while (!cursor.isDone()) { + for (int i = 0; i < dims.length; i++) { + final GroupByColumnSelectorStrategy strategy = dims[i].getColumnSelectorStrategy(); + strategy.writeToKeyBuffer( + dims[i].getKeyBufferPosition(), + strategy.getOnlyValue(dims[i].getSelector()), + keyBuffer + ); + } + keyBuffer.rewind(); + if (!grouper.aggregate(keyBuffer).isOk()) { + return; + } + cursor.advance(); + } + } + + private void aggregateMultiValueDims(Grouper grouper) + { while (!cursor.isDone()) { if (!currentRowWasPartiallyAggregated) { // Set up stack, valuess, and first grouping in keyBuffer for this row @@ -307,7 +433,7 @@ public Row next() if (!grouper.aggregate(keyBuffer).isOk()) { // Buffer full while aggregating; break out and resume later currentRowWasPartiallyAggregated = true; - break outer; + return; } doAggregate = false; } @@ -344,67 +470,6 @@ public Row next() cursor.advance(); currentRowWasPartiallyAggregated = false; } - - delegate = new CloseableGrouperIterator<>( - grouper, - false, - new Function, Row>() - { - @Override - public Row apply(final Grouper.Entry entry) - { - Map theMap = Maps.newLinkedHashMap(); - - // Add dimensions. - for (GroupByColumnSelectorPlus selectorPlus : dims) { - selectorPlus.getColumnSelectorStrategy().processValueFromGroupingKey( - selectorPlus, - entry.getKey(), - theMap - ); - } - - convertRowTypesToOutputTypes(query.getDimensions(), theMap); - - // Add aggregations. - for (int i = 0; i < entry.getValues().length; i++) { - theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]); - } - - return new MapBasedRow(timestamp, theMap); - } - }, - new Closeable() - { - @Override - public void close() throws IOException - { - grouper.close(); - } - } - ); - - return delegate.next(); - } - - @Override - public boolean hasNext() - { - return (delegate != null && delegate.hasNext()) || !cursor.isDone(); - } - - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } - - @Override - public void close() - { - if (delegate != null) { - delegate.close(); - } } } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Groupers.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Groupers.java index 2324f0f3779d..b7dba3e92b5c 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Groupers.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Groupers.java @@ -31,6 +31,17 @@ private Groupers() // No instantiation } + static final AggregateResult DICTIONARY_FULL = AggregateResult.failure( + "Not enough dictionary space to execute this query. Try increasing " + + "druid.query.groupBy.maxMergingDictionarySize or enable disk spilling by setting " + + "druid.query.groupBy.maxOnDiskStorage to a positive number." + ); + static final AggregateResult BUFFER_OVERFLOW = AggregateResult.failure( + "Not enough aggregation buffer space to execute this query. Try increasing " + + "druid.processing.buffer.sizeBytes or enable disk spilling by setting " + + "druid.query.groupBy.maxOnDiskStorage to a positive number." + ); + private static final int C1 = 0xcc9e2d51; private static final int C2 = 0x1b873593; @@ -55,6 +66,11 @@ public static int hash(final Object obj) } + static int getUsedFlag(int keyHash) + { + return keyHash | 0x80000000; + } + public static Iterator> mergeIterators( final Iterable>> iterators, final Comparator> keyTypeComparator diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/LimitedBufferGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java similarity index 99% rename from processing/src/main/java/io/druid/query/groupby/epinephelinae/LimitedBufferGrouper.java rename to processing/src/main/java/io/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java index dd6442835bab..0b10cc293430 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/LimitedBufferGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java @@ -35,7 +35,7 @@ import java.util.List; import java.util.NoSuchElementException; -public class LimitedBufferGrouper extends AbstractBufferGrouper +public class LimitedBufferHashGrouper extends AbstractBufferHashGrouper { private static final int MIN_INITIAL_BUCKETS = 4; private static final int DEFAULT_INITIAL_BUCKETS = 1024; @@ -51,7 +51,7 @@ public class LimitedBufferGrouper extends AbstractBufferGrouper extends AbstractBufferGrouper bufferSupplier, final Grouper.KeySerde keySerde, final ColumnSelectorFactory columnSelectorFactory, diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java index 41d8bdba178b..e8f1de3d0ea9 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -46,7 +46,7 @@ import java.util.List; /** - * Grouper based around a single underlying {@link BufferGrouper}. Not thread-safe. + * Grouper based around a single underlying {@link BufferHashGrouper}. Not thread-safe. * * When the underlying grouper is full, its contents are sorted and written to temporary files using "spillMapper". */ @@ -88,7 +88,7 @@ public SpillingGrouper( this.keyObjComparator = keySerdeFactory.objectComparator(false); this.defaultOrderKeyObjComparator = keySerdeFactory.objectComparator(true); if (limitSpec != null) { - this.grouper = new LimitedBufferGrouper<>( + this.grouper = new LimitedBufferHashGrouper<>( bufferSupplier, keySerde, columnSelectorFactory, @@ -100,7 +100,7 @@ public SpillingGrouper( sortHasNonGroupingFields ); } else { - this.grouper = new BufferGrouper<>( + this.grouper = new BufferHashGrouper<>( bufferSupplier, keySerde, columnSelectorFactory, diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/DictionaryBuildingStringGroupByColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/DictionaryBuildingStringGroupByColumnSelectorStrategy.java index 528f432baecf..503f80b6bd2e 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/DictionaryBuildingStringGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/DictionaryBuildingStringGroupByColumnSelectorStrategy.java @@ -19,6 +19,7 @@ package io.druid.query.groupby.epinephelinae.column; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import io.druid.segment.ColumnValueSelector; import io.druid.segment.DimensionSelector; @@ -83,4 +84,27 @@ public void initColumnValues(ColumnValueSelector selector, int columnIndex, Obje } valuess[columnIndex] = ArrayBasedIndexedInts.of(newIds); } + + @Override + public Object getOnlyValue(ColumnValueSelector selector) + { + final DimensionSelector dimSelector = (DimensionSelector) selector; + final IndexedInts row = dimSelector.getRow(); + + Preconditions.checkState(row.size() < 2, "Not supported for multi-value dimensions"); + + if (row.size() == 0) { + return GROUP_BY_MISSING_VALUE; + } + + final String value = dimSelector.lookupName(row.get(0)); + final int dictId = reverseDictionary.getInt(value); + if (dictId < 0) { + dictionary.add(value); + reverseDictionary.put(value, nextId); + return nextId++; + } else { + return dictId; + } + } } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/DoubleGroupByColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/DoubleGroupByColumnSelectorStrategy.java index 3979f53dd2db..847fb092e4c7 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/DoubleGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/DoubleGroupByColumnSelectorStrategy.java @@ -67,4 +67,16 @@ public boolean checkRowIndexAndAddValueToGroupingKey( // this method handles row values after the first in a multivalued row, so just return false return false; } + + @Override + public Object getOnlyValue(ColumnValueSelector selector) + { + return ((DoubleColumnSelector) selector).get(); + } + + @Override + public void writeToKeyBuffer(int keyBufferPosition, Object obj, ByteBuffer keyBuffer) + { + keyBuffer.putDouble(keyBufferPosition, (Double) obj); + } } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/FloatGroupByColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/FloatGroupByColumnSelectorStrategy.java index 3e63be4d59d6..f29338a9b188 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/FloatGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/FloatGroupByColumnSelectorStrategy.java @@ -50,6 +50,18 @@ public void initColumnValues(ColumnValueSelector selector, int columnIndex, Obje valuess[columnIndex] = ((FloatColumnSelector) selector).get(); } + @Override + public Object getOnlyValue(ColumnValueSelector selector) + { + return ((FloatColumnSelector) selector).get(); + } + + @Override + public void writeToKeyBuffer(int keyBufferPosition, Object obj, ByteBuffer keyBuffer) + { + keyBuffer.putFloat(keyBufferPosition, (Float) obj); + } + @Override public void initGroupingKeyColumnValue( int keyBufferPosition, int columnIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java index 7a11258af7e8..4b9f54cf4ad8 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java @@ -65,7 +65,7 @@ void processValueFromGroupingKey( ); /** - * Retrieve a row object from the ColumnSelectorPlus and put it in valuess at columnIndex. + * Retrieve a row object from the {@link ColumnValueSelector} and put it in valuess at columnIndex. * * @param selector Value selector for a column. * @param columnIndex Index of the column within the row values array @@ -101,4 +101,22 @@ void processValueFromGroupingKey( * @return true if rowValIdx < size of rowObj, false otherwise */ boolean checkRowIndexAndAddValueToGroupingKey(int keyBufferPosition, Object rowObj, int rowValIdx, ByteBuffer keyBuffer); + + /** + * Retrieve a single object using the {@link ColumnValueSelector}. The reading column must have a single value. + * + * @param selector Value selector for a column + * + * @return an object retrieved from the column + */ + Object getOnlyValue(ColumnValueSelector selector); + + /** + * Write a given object to the keyBuffer at keyBufferPosition. + * + * @param keyBufferPosition starting offset for this column's value within the grouping key + * @param obj row value object retrieved from {@link #getOnlyValue(ColumnValueSelector)} + * @param keyBuffer grouping key + */ + void writeToKeyBuffer(int keyBufferPosition, Object obj, ByteBuffer keyBuffer); } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/LongGroupByColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/LongGroupByColumnSelectorStrategy.java index 2e5fed321941..178828076c69 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/LongGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/LongGroupByColumnSelectorStrategy.java @@ -50,6 +50,18 @@ public void initColumnValues(ColumnValueSelector selector, int columnIndex, Obje valuess[columnIndex] = ((LongColumnSelector) selector).get(); } + @Override + public Object getOnlyValue(ColumnValueSelector selector) + { + return ((LongColumnSelector) selector).get(); + } + + @Override + public void writeToKeyBuffer(int keyBufferPosition, Object obj, ByteBuffer keyBuffer) + { + keyBuffer.putLong(keyBufferPosition, (Long) obj); + } + @Override public void initGroupingKeyColumnValue( int keyBufferPosition, int columnIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategy.java index c531437b2b16..ae738724e771 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategy.java @@ -19,6 +19,7 @@ package io.druid.query.groupby.epinephelinae.column; +import com.google.common.base.Preconditions; import com.google.common.primitives.Ints; import io.druid.segment.ColumnValueSelector; import io.druid.segment.DimensionSelector; @@ -61,6 +62,21 @@ public void initColumnValues(ColumnValueSelector selector, int columnIndex, Obje valuess[columnIndex] = row; } + @Override + public Object getOnlyValue(ColumnValueSelector selector) + { + final DimensionSelector dimSelector = (DimensionSelector) selector; + final IndexedInts row = dimSelector.getRow(); + Preconditions.checkState(row.size() < 2, "Not supported for multi-value dimensions"); + return row.size() == 1 ? row.get(0) : GROUP_BY_MISSING_VALUE; + } + + @Override + public void writeToKeyBuffer(int keyBufferPosition, Object obj, ByteBuffer keyBuffer) + { + keyBuffer.putInt(keyBufferPosition, (int) obj); + } + @Override public void initGroupingKeyColumnValue(int keyBufferPosition, int columnIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack) { diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index add5387e75bf..6381fdebd323 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -287,7 +287,7 @@ public Row apply(final Row row) } ); - // Don't apply limit here for inner results, that will be pushed down to the BufferGrouper + // Don't apply limit here for inner results, that will be pushed down to the BufferHashGrouper if (query.getContextBoolean(CTX_KEY_OUTERMOST, true)) { return query.postProcess(rowSequence); } else { diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index b32fd977a114..5109a7e354eb 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -1401,7 +1401,7 @@ public void testGroupByMaxOnDiskStorageContextOverride() List expectedResults = null; if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { expectedException.expect(ResourceLimitExceededException.class); - expectedException.expectMessage("Not enough aggregation table space to execute this query"); + expectedException.expectMessage("Not enough aggregation buffer space to execute this query"); } else { expectedResults = Arrays.asList( GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L), @@ -1579,7 +1579,7 @@ public void testSubqueryWithOuterMaxOnDiskStorageContextOverride() GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); } else { expectedException.expect(ResourceLimitExceededException.class); - expectedException.expectMessage("Not enough aggregation table space to execute this query"); + expectedException.expectMessage("Not enough aggregation buffer space to execute this query"); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); } } diff --git a/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferGrouperTest.java b/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferHashGrouperTest.java similarity index 97% rename from processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferGrouperTest.java rename to processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferHashGrouperTest.java index e7ca10b7171d..e28ed1aebc3a 100644 --- a/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferGrouperTest.java +++ b/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferHashGrouperTest.java @@ -43,7 +43,7 @@ import java.util.Comparator; import java.util.List; -public class BufferGrouperTest +public class BufferHashGrouperTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -52,7 +52,7 @@ public class BufferGrouperTest public void testSimple() { final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); - final Grouper grouper = new BufferGrouper<>( + final Grouper grouper = new BufferHashGrouper<>( Suppliers.ofInstance(ByteBuffer.allocate(1000)), GrouperTestUtil.intKeySerde(), columnSelectorFactory, @@ -182,7 +182,7 @@ public void testNoGrowing() Assert.assertEquals(expected, Lists.newArrayList(grouper.iterator(true))); } - private BufferGrouper makeGrouper( + private BufferHashGrouper makeGrouper( TestColumnSelectorFactory columnSelectorFactory, int bufferSize, int initialBuckets @@ -197,7 +197,7 @@ private BufferGrouper makeGrouper( throw Throwables.propagate(e); } - final BufferGrouper grouper = new BufferGrouper<>( + final BufferHashGrouper grouper = new BufferHashGrouper<>( Suppliers.ofInstance(buffer), GrouperTestUtil.intKeySerde(), columnSelectorFactory, diff --git a/processing/src/test/java/io/druid/query/groupby/epinephelinae/LimitedBufferGrouperTest.java b/processing/src/test/java/io/druid/query/groupby/epinephelinae/LimitedBufferHashGrouperTest.java similarity index 93% rename from processing/src/test/java/io/druid/query/groupby/epinephelinae/LimitedBufferGrouperTest.java rename to processing/src/test/java/io/druid/query/groupby/epinephelinae/LimitedBufferHashGrouperTest.java index 7cd746379ee0..cdab2215801e 100644 --- a/processing/src/test/java/io/druid/query/groupby/epinephelinae/LimitedBufferGrouperTest.java +++ b/processing/src/test/java/io/druid/query/groupby/epinephelinae/LimitedBufferHashGrouperTest.java @@ -35,7 +35,7 @@ import java.nio.ByteBuffer; import java.util.List; -public class LimitedBufferGrouperTest +public class LimitedBufferHashGrouperTest { @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -46,7 +46,7 @@ public void testLimitAndBufferSwapping() final int limit = 100; final int keyBase = 100000; final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); - final LimitedBufferGrouper grouper = makeGrouper(columnSelectorFactory, 20000, 2, limit); + final LimitedBufferHashGrouper grouper = makeGrouper(columnSelectorFactory, 20000, 2, limit); final int numRows = 1000; columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L))); @@ -100,7 +100,7 @@ public void testBufferTooSmall() { expectedException.expect(IAE.class); final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); - final LimitedBufferGrouper grouper = makeGrouper(columnSelectorFactory, 10, 2, 100); + final LimitedBufferHashGrouper grouper = makeGrouper(columnSelectorFactory, 10, 2, 100); } @Test @@ -109,7 +109,7 @@ public void testMinBufferSize() final int limit = 100; final int keyBase = 100000; final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); - final LimitedBufferGrouper grouper = makeGrouper(columnSelectorFactory, 11716, 2, limit); + final LimitedBufferHashGrouper grouper = makeGrouper(columnSelectorFactory, 11716, 2, limit); final int numRows = 1000; columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L))); @@ -146,14 +146,14 @@ public void testMinBufferSize() Assert.assertEquals(expected, Lists.newArrayList(grouper.iterator(true))); } - private static LimitedBufferGrouper makeGrouper( + private static LimitedBufferHashGrouper makeGrouper( TestColumnSelectorFactory columnSelectorFactory, int bufferSize, int initialBuckets, int limit ) { - LimitedBufferGrouper grouper = new LimitedBufferGrouper<>( + LimitedBufferHashGrouper grouper = new LimitedBufferHashGrouper<>( Suppliers.ofInstance(ByteBuffer.allocate(bufferSize)), GrouperTestUtil.intKeySerde(), columnSelectorFactory, From 400357ca78e54b8144a35ee253357caf334e897a Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 20 Jul 2017 19:04:35 +0900 Subject: [PATCH 02/13] Fix handling missing grouping key --- .../groupby/epinephelinae/BufferArrayGrouper.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java index 568583d76a8e..9af2c568ca62 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java @@ -26,6 +26,7 @@ import io.druid.java.util.common.logger.Logger; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.groupby.epinephelinae.column.StringGroupByColumnSelectorStrategy; import io.druid.segment.ColumnSelectorFactory; import java.nio.ByteBuffer; @@ -37,7 +38,9 @@ import java.util.stream.IntStream; /** - * A grouper for array-based aggregation. The array consists of records. The memory format of the record is like below. + * A grouper for array-based aggregation. The array consists of records. The first record is to store + * {@link StringGroupByColumnSelectorStrategy#GROUP_BY_MISSING_VALUE}. + * The memory format of the record is like below. * * +---------------+----------+--------------------+--------------------+-----+ * | used flag (4) | key (4) | agg_1 (fixed size) | agg_2 (fixed size) | ... | @@ -151,7 +154,7 @@ public AggregateResult aggregate(KeyType key) // Here, the key contains the dictionary-encoded value of the grouping key // and we use it as the index for the aggregation array. final ByteBuffer fromKey = keySerde.toByteBuffer(key); - final int dimIndex = fromKey.getInt(); + final int dimIndex = fromKey.getInt() + 1; // the first index is for missing value fromKey.rewind(); return aggregate(key, dimIndex); } @@ -164,7 +167,8 @@ private static boolean isUsedKey(ByteBuffer buffer, int pos) @Override public void reset() { - for (int i = 0; i < cardinality; i++) { + keyBuffer.putInt(0, 0); // for missing value + for (int i = 1; i < cardinality + 1; i++) { keyBuffer.putInt(i * recordSize, 0); final int baseOffset = i * recordSize + keySize; for (int j = 0; j < aggregators.length; ++j) { @@ -198,7 +202,7 @@ private Iterator> sortedIterator() // partial aggregation result in brokers and even data nodes (historicals and realtimes). // However, it should be used in the future. final BufferComparator comparator = keySerde.bufferComparator(); - final List wrappedOffsets = IntStream.range(0, cardinality).boxed().collect(Collectors.toList()); + final List wrappedOffsets = IntStream.range(0, cardinality + 1).boxed().collect(Collectors.toList()); wrappedOffsets.sort( (lhs, rhs) -> comparator.compare(keyBuffer, keyBuffer, lhs + USED_FLAG_SIZE, rhs + USED_FLAG_SIZE) ); @@ -209,7 +213,7 @@ private Iterator> sortedIterator() private Iterator> plainIterator() { return new ResultIterator( - IntStream.range(0, cardinality).boxed().collect(Collectors.toList()) + IntStream.range(0, cardinality + 1).boxed().collect(Collectors.toList()) ); } From 04bd4daa2c1132f04102c986548b476f3e103fb4 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 20 Jul 2017 22:43:36 +0900 Subject: [PATCH 03/13] Handle invalid offset --- .../epinephelinae/AbstractBufferHashGrouper.java | 2 +- .../groupby/epinephelinae/BufferArrayGrouper.java | 14 +++++++++++--- .../query/groupby/epinephelinae/Groupers.java | 2 +- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java index 9f924aa21c39..002d13b86b35 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java @@ -139,7 +139,7 @@ public AggregateResult aggregate(KeyType key, int keyHash) if (bucket < 0) { // This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will // be correct. - return Groupers.BUFFER_OVERFLOW; + return Groupers.HASH_TABLE_FULL; } final int bucketStartOffset = hashTable.getOffsetForBucket(bucket); diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java index 9af2c568ca62..3de9f1a50a60 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java @@ -23,6 +23,7 @@ import com.google.common.base.Supplier; import com.google.common.primitives.Ints; import io.druid.java.util.common.IAE; +import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.BufferAggregator; @@ -110,6 +111,8 @@ public boolean isInitialized() @Override public AggregateResult aggregate(KeyType key, int dimIndex) { + Preconditions.checkArgument(dimIndex > -1, "Invalid dimIndex[%d]", dimIndex); + final ByteBuffer fromKey = keySerde.toByteBuffer(key); if (fromKey == null) { // This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will @@ -128,9 +131,14 @@ public AggregateResult aggregate(KeyType key, int dimIndex) final int recordOffset = dimIndex * recordSize; if (recordOffset + recordSize > keyBuffer.capacity()) { - // This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will - // be correct. - return Groupers.BUFFER_OVERFLOW; + // This error cannot be recoverd, and the query must fail + throw new ISE( + "A record of size [%d] cannot be written to the array buffer at offset[%d] " + + "because it exceeds the buffer capacity[%d]. Try increasing druid.processing.buffer.sizeBytes", + recordSize, + recordOffset, + keyBuffer.capacity() + ); } if (!isUsedKey(keyBuffer, recordOffset)) { diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Groupers.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Groupers.java index b7dba3e92b5c..b9f9b547c7c7 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Groupers.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Groupers.java @@ -36,7 +36,7 @@ private Groupers() + "druid.query.groupBy.maxMergingDictionarySize or enable disk spilling by setting " + "druid.query.groupBy.maxOnDiskStorage to a positive number." ); - static final AggregateResult BUFFER_OVERFLOW = AggregateResult.failure( + static final AggregateResult HASH_TABLE_FULL = AggregateResult.failure( "Not enough aggregation buffer space to execute this query. Try increasing " + "druid.processing.buffer.sizeBytes or enable disk spilling by setting " + "druid.query.groupBy.maxOnDiskStorage to a positive number." From e694bbc1647d24fd1ec46feaed36b0def24ccf4d Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 20 Jul 2017 23:09:39 +0900 Subject: [PATCH 04/13] Fix compilation --- .../druid/query/groupby/epinephelinae/BufferArrayGrouper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java index 3de9f1a50a60..629a8bc9218a 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java @@ -111,7 +111,7 @@ public boolean isInitialized() @Override public AggregateResult aggregate(KeyType key, int dimIndex) { - Preconditions.checkArgument(dimIndex > -1, "Invalid dimIndex[%d]", dimIndex); + Preconditions.checkArgument(dimIndex > -1, "Invalid dimIndex[%s]", dimIndex); final ByteBuffer fromKey = keySerde.toByteBuffer(key); if (fromKey == null) { From 7d3b8cd7e46b445ad9603d2d16e566a12770205e Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 20 Jul 2017 23:25:07 +0900 Subject: [PATCH 05/13] Add cardinality check --- .../epinephelinae/BufferArrayGrouper.java | 39 +++++++++++-------- .../epinephelinae/GroupByQueryEngineV2.java | 24 ++++++++---- 2 files changed, 39 insertions(+), 24 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java index 629a8bc9218a..cae526d7389e 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java @@ -61,7 +61,12 @@ public class BufferArrayGrouper implements Grouper private final int recordSize; // keySize + size of all aggregated values private boolean initialized = false; - private ByteBuffer keyBuffer; + private ByteBuffer buffer; + + static int keySize(KeySerde keySerde) + { + return keySerde.keySize() + USED_FLAG_SIZE; + } public BufferArrayGrouper( final Supplier bufferSupplier, @@ -76,7 +81,7 @@ public BufferArrayGrouper( this.bufferSupplier = Preconditions.checkNotNull(bufferSupplier, "bufferSupplier"); this.keySerde = Preconditions.checkNotNull(keySerde, "keySerde"); - this.keySize = keySerde.keySize() + USED_FLAG_SIZE; + this.keySize = keySize(keySerde); this.aggregators = new BufferAggregator[aggregatorFactories.length]; this.aggregatorOffsets = new int[aggregatorFactories.length]; this.cardinality = cardinality; @@ -94,7 +99,7 @@ public BufferArrayGrouper( public void init() { if (!initialized) { - keyBuffer = bufferSupplier.get(); + buffer = bufferSupplier.get(); reset(); @@ -130,26 +135,26 @@ public AggregateResult aggregate(KeyType key, int dimIndex) final int recordOffset = dimIndex * recordSize; - if (recordOffset + recordSize > keyBuffer.capacity()) { + if (recordOffset + recordSize > buffer.capacity()) { // This error cannot be recoverd, and the query must fail throw new ISE( "A record of size [%d] cannot be written to the array buffer at offset[%d] " + "because it exceeds the buffer capacity[%d]. Try increasing druid.processing.buffer.sizeBytes", recordSize, recordOffset, - keyBuffer.capacity() + buffer.capacity() ); } - if (!isUsedKey(keyBuffer, recordOffset)) { - this.keyBuffer.position(recordOffset); - this.keyBuffer.putInt(Groupers.getUsedFlag(dimIndex)); - this.keyBuffer.put(fromKey); + if (!isUsedKey(buffer, recordOffset)) { + this.buffer.position(recordOffset); + this.buffer.putInt(Groupers.getUsedFlag(dimIndex)); + this.buffer.put(fromKey); } final int baseOffset = recordOffset + keySize; for (int i = 0; i < aggregators.length; i++) { - aggregators[i].aggregate(keyBuffer, baseOffset + aggregatorOffsets[i]); + aggregators[i].aggregate(buffer, baseOffset + aggregatorOffsets[i]); } return AggregateResult.ok(); @@ -175,12 +180,12 @@ private static boolean isUsedKey(ByteBuffer buffer, int pos) @Override public void reset() { - keyBuffer.putInt(0, 0); // for missing value + buffer.putInt(0, 0); // for missing value for (int i = 1; i < cardinality + 1; i++) { - keyBuffer.putInt(i * recordSize, 0); + buffer.putInt(i * recordSize, 0); final int baseOffset = i * recordSize + keySize; for (int j = 0; j < aggregators.length; ++j) { - aggregators[j].init(keyBuffer, baseOffset + aggregatorOffsets[j]); + aggregators[j].init(buffer, baseOffset + aggregatorOffsets[j]); } } } @@ -212,7 +217,7 @@ private Iterator> sortedIterator() final BufferComparator comparator = keySerde.bufferComparator(); final List wrappedOffsets = IntStream.range(0, cardinality + 1).boxed().collect(Collectors.toList()); wrappedOffsets.sort( - (lhs, rhs) -> comparator.compare(keyBuffer, keyBuffer, lhs + USED_FLAG_SIZE, rhs + USED_FLAG_SIZE) + (lhs, rhs) -> comparator.compare(buffer, buffer, lhs + USED_FLAG_SIZE, rhs + USED_FLAG_SIZE) ); return new ResultIterator(wrappedOffsets); @@ -244,7 +249,7 @@ private int findNextKeyIndex() { while (keyIndexIterator.hasNext()) { final int index = keyIndexIterator.next(); - if (isUsedKey(keyBuffer, index * recordSize)) { + if (isUsedKey(buffer, index * recordSize)) { return index; } } @@ -272,9 +277,9 @@ public Entry next() final int baseOffset = cur * recordSize + keySize; final Object[] values = new Object[aggregators.length]; for (int i = 0; i < aggregators.length; i++) { - values[i] = aggregators[i].get(keyBuffer, baseOffset + aggregatorOffsets[i]); + values[i] = aggregators[i].get(buffer, baseOffset + aggregatorOffsets[i]); } - return new Entry<>(keySerde.fromByteBuffer(keyBuffer, cur * recordSize + USED_FLAG_SIZE), values); + return new Entry<>(keySerde.fromByteBuffer(buffer, cur * recordSize + USED_FLAG_SIZE), values); } } } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 89b0723b0163..60a440db6261 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -61,6 +61,7 @@ import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -359,13 +360,22 @@ private Grouper newGrouper() // Choose array-based aggregation if the grouping key is a single string dimension of a known cardinality if ((columnCapabilities == null || columnCapabilities.getType().equals(ValueType.STRING)) && cardinality > 0) { - return new BufferArrayGrouper<>( - Suppliers.ofInstance(buffer), - keySerde, - cursor, - aggregatorFactories, - cardinality - ); + final int keySize = BufferArrayGrouper.keySize(keySerde); + final int aggValuesSize = Arrays.stream(aggregatorFactories) + .mapToInt(AggregatorFactory::getMaxIntermediateSize) + .sum(); + final int recordSize = keySize + aggValuesSize; + + // Check that all keys and aggregated values can be contained the buffer + if (cardinality * recordSize < buffer.capacity()) { + return new BufferArrayGrouper<>( + Suppliers.ofInstance(buffer), + keySerde, + cursor, + aggregatorFactories, + cardinality + ); + } } } } From ec417b77c996739e08280129acc1bb1f264c6cfb Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 21 Jul 2017 08:55:21 +0900 Subject: [PATCH 06/13] Fix cardinality check --- .../druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 60a440db6261..9ebd62ae441d 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -367,7 +367,7 @@ private Grouper newGrouper() final int recordSize = keySize + aggValuesSize; // Check that all keys and aggregated values can be contained the buffer - if (cardinality * recordSize < buffer.capacity()) { + if ((cardinality + 1 ) * recordSize < buffer.capacity()) { return new BufferArrayGrouper<>( Suppliers.ofInstance(buffer), keySerde, From 1f70823b79732ea91dc8aaf8ed0baa9e45bb6b5a Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 26 Jul 2017 15:57:13 +0900 Subject: [PATCH 07/13] Address comments --- .../benchmark/query/GroupByBenchmark.java | 28 ++ .../epinephelinae/BufferArrayGrouper.java | 262 +++++++++--------- .../epinephelinae/GroupByQueryEngineV2.java | 40 ++- .../query/groupby/epinephelinae/Grouper.java | 5 +- .../epinephelinae/BufferArrayGrouperTest.java | 92 ++++++ 5 files changed, 272 insertions(+), 155 deletions(-) create mode 100644 processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouperTest.java diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java index 09f8a1fd2f88..7720cb1358b8 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java @@ -24,6 +24,7 @@ import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -53,10 +54,13 @@ import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryToolChest; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.DoubleMinAggregatorFactory; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; +import io.druid.query.filter.BoundDimFilter; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryEngine; @@ -238,6 +242,30 @@ private void setupQueries() basicQueries.put("nested", queryA); } + + { // basic.filter + final QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec( + Collections.singletonList(basicSchema.getDataInterval()) + ); + // Use multiple aggregators to see how the number of aggregators impact to the query performance + List queryAggs = ImmutableList.of( + new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"), + new LongSumAggregatorFactory("rows", "rows"), + new DoubleSumAggregatorFactory("sumFloatNormal", "sumFloatNormal"), + new DoubleMinAggregatorFactory("minFloatZipf", "minFloatZipf") + ); + GroupByQuery queryA = GroupByQuery + .builder() + .setDataSource("blah") + .setQuerySegmentSpec(intervalSpec) + .setDimensions(ImmutableList.of(new DefaultDimensionSpec("dimUniform", null))) + .setAggregatorSpecs(queryAggs) + .setGranularity(Granularity.fromString(queryGranularity)) + .setDimFilter(new BoundDimFilter("dimUniform", "0", "100", true, true, null, null, null)) + .build(); + + basicQueries.put("filter", queryA); + } SCHEMA_QUERY_MAP.put("basic", basicQueries); // simple one column schema, for testing performance difference between querying on numeric values as Strings and diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java index cae526d7389e..e0d8c1837dd1 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java @@ -21,7 +21,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; -import com.google.common.primitives.Ints; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; @@ -31,41 +30,59 @@ import io.druid.segment.ColumnSelectorFactory; import java.nio.ByteBuffer; -import java.util.Collection; +import java.util.Arrays; import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.stream.Collectors; import java.util.stream.IntStream; /** - * A grouper for array-based aggregation. The array consists of records. The first record is to store - * {@link StringGroupByColumnSelectorStrategy#GROUP_BY_MISSING_VALUE}. - * The memory format of the record is like below. - * - * +---------------+----------+--------------------+--------------------+-----+ - * | used flag (4) | key (4) | agg_1 (fixed size) | agg_2 (fixed size) | ... | - * +---------------+----------+--------------------+--------------------+-----+ + * A buffer grouper for array-based aggregation. This grouper stores aggregated values in the buffer using the grouping + * key as the index. To do so, the grouping key always has the integer type. + *

+ * The buffer is divided into 3 separate regions, i.e., key buffer, used flag buffer, and value buffer. The key buffer + * is used to temporaily store a single key when deserializing the key using {@link Grouper.KeySerde} in + * {@link #iterator(boolean)}. The used flag buffer is a bit set to represent which keys are valid. If a bit of an + * index is set, that key is valid. Finally, the value buffer is used to store aggregated values. The first index is + * reserved for {@link StringGroupByColumnSelectorStrategy#GROUP_BY_MISSING_VALUE}. + *

+ * This grouper is available only when the grouping key is a single indexed dimension of a known cardinality because it + * directly uses the dimension value as the index for array access. Since the cardinality for the grouping key across + * different segments cannot be currently retrieved, this grouper can be used only when performing per-segment query + * execution. + *

+ * Since the index directly represents the grouping key, the grouping key type is always the integer. However, the + * {@link KeyType} parameter is preserved because this grouper will be used for merging per-segment aggregation + * results with a proper cardinality computation in the future. */ public class BufferArrayGrouper implements Grouper { private static final Logger LOG = new Logger(BufferArrayGrouper.class); - private static final int USED_FLAG_SIZE = Ints.BYTES; private final Supplier bufferSupplier; private final KeySerde keySerde; - private final int keySize; // key(int) + used flag(int) private final BufferAggregator[] aggregators; private final int[] aggregatorOffsets; - private final int cardinality; - private final int recordSize; // keySize + size of all aggregated values + private final int cardinalityWithMissingValue; + private final int recordSize; // size of all aggregated values private boolean initialized = false; - private ByteBuffer buffer; - - static int keySize(KeySerde keySerde) + private ByteBuffer keyBuffer; + private ByteBuffer usedFlagBuffer; + private ByteBuffer valBuffer; + + static int requiredBufferCapacity( + KeySerde keySerde, + int cardinality, + AggregatorFactory[] aggregatorFactories + ) { - return keySerde.keySize() + USED_FLAG_SIZE; + final int cardinalityWithMissingValue = cardinality + 1; + final int recordSize = Arrays.stream(aggregatorFactories) + .mapToInt(AggregatorFactory::getMaxIntermediateSize) + .sum(); + + return keySerde.keySize() + // key size + (int) Math.ceil((double) cardinalityWithMissingValue / 8) + // total used flags size + cardinalityWithMissingValue * recordSize; // total values size } public BufferArrayGrouper( @@ -81,10 +98,9 @@ public BufferArrayGrouper( this.bufferSupplier = Preconditions.checkNotNull(bufferSupplier, "bufferSupplier"); this.keySerde = Preconditions.checkNotNull(keySerde, "keySerde"); - this.keySize = keySize(keySerde); this.aggregators = new BufferAggregator[aggregatorFactories.length]; this.aggregatorOffsets = new int[aggregatorFactories.length]; - this.cardinality = cardinality; + this.cardinalityWithMissingValue = cardinality + 1; int offset = 0; for (int i = 0; i < aggregatorFactories.length; i++) { @@ -92,14 +108,27 @@ public BufferArrayGrouper( aggregatorOffsets[i] = offset; offset += aggregatorFactories[i].getMaxIntermediateSize(); } - recordSize = keySize + offset; + recordSize = offset; } @Override public void init() { if (!initialized) { - buffer = bufferSupplier.get(); + final ByteBuffer buffer = bufferSupplier.get().duplicate(); + + buffer.position(0); + buffer.limit(keySerde.keySize()); + keyBuffer = buffer.slice(); + + final int usedBufferEnd = keySerde.keySize() + (int) Math.ceil((double) cardinalityWithMissingValue / 8); + buffer.position(keySerde.keySize()); + buffer.limit(usedBufferEnd); + usedFlagBuffer = buffer.slice(); + + buffer.position(usedBufferEnd); + buffer.limit(buffer.capacity()); + valBuffer = buffer.slice(); reset(); @@ -114,79 +143,108 @@ public boolean isInitialized() } @Override - public AggregateResult aggregate(KeyType key, int dimIndex) + public AggregateResult aggregate(KeyType key) { - Preconditions.checkArgument(dimIndex > -1, "Invalid dimIndex[%s]", dimIndex); + // BufferArrayGrouper is used only for dictionary-indexed single-value string dimensions. + // Here, the key contains the dictionary-encoded value of the grouping key + // and we use it as the index for the aggregation array. - final ByteBuffer fromKey = keySerde.toByteBuffer(key); + final ByteBuffer fromKey = checkAndGetKeyBuffer(keySerde, key); if (fromKey == null) { // This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will // be correct. return Groupers.DICTIONARY_FULL; } - if (fromKey.remaining() != keySerde.keySize()) { - throw new IAE( - "keySerde.toByteBuffer(key).remaining[%s] != keySerde.keySize[%s], buffer was the wrong size?!", - fromKey.remaining(), - keySerde.keySize() - ); + // The first index is reserved for missing value which is represented by -1. + final int dimIndex = fromKey.getInt() + 1; + fromKey.rewind(); + return aggregate(key, dimIndex); + } + + @Override + public AggregateResult aggregate(KeyType key, int dimIndex) + { + Preconditions.checkArgument( + dimIndex >= 0 && dimIndex < cardinalityWithMissingValue, + "Invalid dimIndex[%s]", + dimIndex + ); + + final ByteBuffer fromKey = checkAndGetKeyBuffer(keySerde, key); + if (fromKey == null) { + // This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will + // be correct. + return Groupers.DICTIONARY_FULL; } final int recordOffset = dimIndex * recordSize; - if (recordOffset + recordSize > buffer.capacity()) { + if (recordOffset + recordSize > valBuffer.capacity()) { // This error cannot be recoverd, and the query must fail throw new ISE( "A record of size [%d] cannot be written to the array buffer at offset[%d] " + "because it exceeds the buffer capacity[%d]. Try increasing druid.processing.buffer.sizeBytes", recordSize, recordOffset, - buffer.capacity() + valBuffer.capacity() ); } - if (!isUsedKey(buffer, recordOffset)) { - this.buffer.position(recordOffset); - this.buffer.putInt(Groupers.getUsedFlag(dimIndex)); - this.buffer.put(fromKey); + if (!isUsedSlot(dimIndex)) { + initializeSlot(dimIndex); } - final int baseOffset = recordOffset + keySize; for (int i = 0; i < aggregators.length; i++) { - aggregators[i].aggregate(buffer, baseOffset + aggregatorOffsets[i]); + aggregators[i].aggregate(valBuffer, recordOffset + aggregatorOffsets[i]); } return AggregateResult.ok(); } - @Override - public AggregateResult aggregate(KeyType key) + private ByteBuffer checkAndGetKeyBuffer(KeySerde keySerde, KeyType key) { - // BufferArrayGrouper is used only for dictionary-indexed single-value string dimensions. - // Here, the key contains the dictionary-encoded value of the grouping key - // and we use it as the index for the aggregation array. final ByteBuffer fromKey = keySerde.toByteBuffer(key); - final int dimIndex = fromKey.getInt() + 1; // the first index is for missing value - fromKey.rewind(); - return aggregate(key, dimIndex); + if (fromKey == null) { + return null; + } + + if (fromKey.remaining() != keySerde.keySize()) { + throw new IAE( + "keySerde.toByteBuffer(key).remaining[%s] != keySerde.keySize[%s], buffer was the wrong size?!", + fromKey.remaining(), + keySerde.keySize() + ); + } + return fromKey; } - private static boolean isUsedKey(ByteBuffer buffer, int pos) + private void initializeSlot(int dimIndex) { - return (buffer.get(pos) & 0x80) == 0x80; + final int index = dimIndex / 8; + final int extraIndex = dimIndex % 8; + usedFlagBuffer.put(index, (byte) (usedFlagBuffer.get(index) | 1 << extraIndex)); + + final int recordOffset = dimIndex * recordSize; + for (int i = 0; i < aggregators.length; i++) { + aggregators[i].init(valBuffer, recordOffset + aggregatorOffsets[i]); + } + } + + private boolean isUsedSlot(int dimIndex) + { + final int index = dimIndex / 8; + final int extraIndex = dimIndex % 8; + final int usedByte = 1 << extraIndex; + return (usedFlagBuffer.get(index) & usedByte) == usedByte; } @Override public void reset() { - buffer.putInt(0, 0); // for missing value - for (int i = 1; i < cardinality + 1; i++) { - buffer.putInt(i * recordSize, 0); - final int baseOffset = i * recordSize + keySize; - for (int j = 0; j < aggregators.length; ++j) { - aggregators[j].init(buffer, baseOffset + aggregatorOffsets[j]); - } + final int usedBufferCapacity = usedFlagBuffer.capacity(); + for (int i = 0; i < usedBufferCapacity; i++) { + usedFlagBuffer.put(i, (byte) 0); } } @@ -206,80 +264,18 @@ public void close() @Override public Iterator> iterator(boolean sorted) { - return sorted ? sortedIterator() : plainIterator(); - } - - private Iterator> sortedIterator() - { - // Sorted iterator is currently not used because there is no way to get grouping key's cardinality when merging - // partial aggregation result in brokers and even data nodes (historicals and realtimes). - // However, it should be used in the future. - final BufferComparator comparator = keySerde.bufferComparator(); - final List wrappedOffsets = IntStream.range(0, cardinality + 1).boxed().collect(Collectors.toList()); - wrappedOffsets.sort( - (lhs, rhs) -> comparator.compare(buffer, buffer, lhs + USED_FLAG_SIZE, rhs + USED_FLAG_SIZE) - ); - - return new ResultIterator(wrappedOffsets); - } - - private Iterator> plainIterator() - { - return new ResultIterator( - IntStream.range(0, cardinality + 1).boxed().collect(Collectors.toList()) - ); - } - - private class ResultIterator implements Iterator> - { - private static final int NOT_FOUND = -1; - - private final Iterator keyIndexIterator; - private int cur; - private boolean needFindNext; - - ResultIterator(Collection keyOffsets) - { - keyIndexIterator = keyOffsets.iterator(); - cur = NOT_FOUND; - needFindNext = true; - } - - private int findNextKeyIndex() - { - while (keyIndexIterator.hasNext()) { - final int index = keyIndexIterator.next(); - if (isUsedKey(buffer, index * recordSize)) { - return index; - } - } - return NOT_FOUND; - } - - @Override - public boolean hasNext() - { - if (needFindNext) { - cur = findNextKeyIndex(); - needFindNext = false; - } - return cur > NOT_FOUND; - } - - @Override - public Entry next() - { - if (cur == NOT_FOUND) { - throw new NoSuchElementException(); - } - - needFindNext = true; - final int baseOffset = cur * recordSize + keySize; - final Object[] values = new Object[aggregators.length]; - for (int i = 0; i < aggregators.length; i++) { - values[i] = aggregators[i].get(buffer, baseOffset + aggregatorOffsets[i]); - } - return new Entry<>(keySerde.fromByteBuffer(buffer, cur * recordSize + USED_FLAG_SIZE), values); - } + // result is always natually sorted by keys + return IntStream.range(0, cardinalityWithMissingValue) + .filter(this::isUsedSlot) + .mapToObj(index -> { + keyBuffer.putInt(0, index - 1); // Restore key values from the index + + final Object[] values = new Object[aggregators.length]; + final int recordOffset = index * recordSize; + for (int i = 0; i < aggregators.length; i++) { + values[i] = aggregators[i].get(valBuffer, recordOffset + aggregatorOffsets[i]); + } + return new Entry<>(keySerde.fromByteBuffer(keyBuffer, 0), values); + }).iterator(); } } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 9ebd62ae441d..ff7e3460f6c1 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -61,7 +61,6 @@ import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -114,13 +113,13 @@ public static Sequence process( null ); - final boolean allSingleValueDims = query.getDimensions().stream() - .noneMatch(dimension -> { - final ColumnCapabilities columnCapabilities = - storageAdapter.getColumnCapabilities(dimension.getDimension()); - return columnCapabilities == null || - columnCapabilities.hasMultipleValues(); - }); + final boolean allSingleValueDims = query + .getDimensions() + .stream() + .noneMatch(dimension -> { + final ColumnCapabilities columnCapabilities = storageAdapter.getColumnCapabilities(dimension.getDimension()); + return columnCapabilities == null || columnCapabilities.hasMultipleValues(); + }); final ResourceHolder bufferHolder = intermediateResultsBufferPool.take(); @@ -345,29 +344,22 @@ public void close() private Grouper newGrouper() { - final AggregatorFactory[] aggregatorFactories = query.getAggregatorSpecs() - .toArray( - new AggregatorFactory[query.getAggregatorSpecs().size()] - ); + final AggregatorFactory[] aggregatorFactories = query + .getAggregatorSpecs() + .toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]); if (!querySpecificConfig.isForceHashAggregation()) { if (dims.length == 1) { final ColumnCapabilities columnCapabilities = cursor.getColumnCapabilities(dims[0].getName()); - - @SuppressWarnings("ConstantConditions") - final int cardinality = cardinalityFunction.apply(dims[0].getName()); + final int cardinality = computeCardinality(cardinalityFunction, dims[0]); // Choose array-based aggregation if the grouping key is a single string dimension of a known cardinality if ((columnCapabilities == null || columnCapabilities.getType().equals(ValueType.STRING)) && cardinality > 0) { - final int keySize = BufferArrayGrouper.keySize(keySerde); - final int aggValuesSize = Arrays.stream(aggregatorFactories) - .mapToInt(AggregatorFactory::getMaxIntermediateSize) - .sum(); - final int recordSize = keySize + aggValuesSize; + final int requiredBufferCapacity = BufferArrayGrouper.requiredBufferCapacity(keySerde, cardinality, aggregatorFactories); // Check that all keys and aggregated values can be contained the buffer - if ((cardinality + 1 ) * recordSize < buffer.capacity()) { + if (requiredBufferCapacity <= buffer.capacity()) { return new BufferArrayGrouper<>( Suppliers.ofInstance(buffer), keySerde, @@ -391,6 +383,12 @@ private Grouper newGrouper() ); } + @SuppressWarnings("ConstantConditions") + private static int computeCardinality(Function cardinalityFunction, GroupByColumnSelectorPlus dim) + { + return cardinalityFunction.apply(dim.getName()); + } + private void aggregateSingleValueDims(Grouper grouper) { while (!cursor.isDone()) { diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java index 2f6f795fd091..103c1dd5f68c 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java @@ -87,7 +87,7 @@ public interface Grouper extends Closeable void close(); /** - * Iterate through entries. If a comparator is provided, do a sorted iteration. + * Iterate through entries. *

* Once this method is called, writes are no longer safe. After you are done with the iterator returned by this * method, you should either call {@link #close()} (if you are done with the Grouper), {@link #reset()} (if you @@ -96,6 +96,9 @@ public interface Grouper extends Closeable * If "sorted" is true then the iterator will return sorted results. It will use KeyType's natural ordering on * deserialized objects, and will use the {@link KeySerde#comparator()} on serialized objects. Woe be unto you * if these comparators are not equivalent. + *

+ * Callers must process and discard the returned {@link Entry}s immediately because some implementations can reuse the + * key objects. * * @param sorted return sorted results * diff --git a/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouperTest.java b/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouperTest.java new file mode 100644 index 000000000000..2b8baebf2004 --- /dev/null +++ b/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouperTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.groupby.epinephelinae; + +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Ordering; +import com.google.common.primitives.Ints; +import io.druid.data.input.MapBasedRow; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.groupby.epinephelinae.Grouper.Entry; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Comparator; +import java.util.List; + +public class BufferArrayGrouperTest +{ + @Test + public void testAggregate() + { + final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); + final Grouper grouper = newGrouper(columnSelectorFactory, 1024); + + columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L))); + grouper.aggregate(12); + grouper.aggregate(6); + grouper.aggregate(10); + grouper.aggregate(6); + grouper.aggregate(12); + grouper.aggregate(6); + + final List> expected = ImmutableList.of( + new Grouper.Entry<>(6, new Object[]{30L, 3L}), + new Grouper.Entry<>(10, new Object[]{10L, 1L}), + new Grouper.Entry<>(12, new Object[]{20L, 2L}) + ); + final List> unsortedEntries = Lists.newArrayList(grouper.iterator(false)); + final List> sortedEntries = Lists.newArrayList(grouper.iterator(true)); + + Assert.assertEquals(expected, sortedEntries); + Assert.assertEquals( + expected, + Ordering.from((Comparator>) (o1, o2) -> Ints.compare(o1.getKey(), o2.getKey())) + .sortedCopy(unsortedEntries) + ); + } + + private BufferArrayGrouper newGrouper( + TestColumnSelectorFactory columnSelectorFactory, + int bufferSize + ) + { + final ByteBuffer buffer = ByteBuffer.allocate(bufferSize); + + final BufferArrayGrouper grouper = new BufferArrayGrouper<>( + Suppliers.ofInstance(buffer), + GrouperTestUtil.intKeySerde(), + columnSelectorFactory, + new AggregatorFactory[]{ + new LongSumAggregatorFactory("valueSum", "value"), + new CountAggregatorFactory("count") + }, + 1000 + ); + grouper.init(); + return grouper; + } +} From e0f2663527bec0a14304b9a37a5fdffb79ec1995 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 31 Jul 2017 23:32:46 +0900 Subject: [PATCH 08/13] Address comments --- .../AbstractBufferHashGrouper.java | 6 - .../epinephelinae/BufferArrayGrouper.java | 172 +++++---- .../epinephelinae/ConcurrentGrouper.java | 6 - .../epinephelinae/GroupByQueryEngineV2.java | 345 ++++++++++++++---- .../query/groupby/epinephelinae/Grouper.java | 15 +- .../epinephelinae/SpillingGrouper.java | 6 - .../column/GroupByColumnSelectorStrategy.java | 2 + .../StringGroupByColumnSelectorStrategy.java | 2 - .../epinephelinae/BufferArrayGrouperTest.java | 9 +- 9 files changed, 367 insertions(+), 196 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java index 002d13b86b35..c8ee101e9cd5 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java @@ -170,12 +170,6 @@ public AggregateResult aggregate(KeyType key, int keyHash) return AggregateResult.ok(); } - @Override - public AggregateResult aggregate(final KeyType key) - { - return aggregate(key, Groupers.hash(key)); - } - @Override public void close() { diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java index e0d8c1837dd1..92259186d431 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java @@ -21,56 +21,48 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; -import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.BufferAggregator; -import io.druid.query.groupby.epinephelinae.column.StringGroupByColumnSelectorStrategy; +import io.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorStrategy; import io.druid.segment.ColumnSelectorFactory; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; -import java.util.stream.IntStream; +import java.util.NoSuchElementException; +import java.util.function.Function; /** * A buffer grouper for array-based aggregation. This grouper stores aggregated values in the buffer using the grouping - * key as the index. To do so, the grouping key always has the integer type. + * key as the index. *

- * The buffer is divided into 3 separate regions, i.e., key buffer, used flag buffer, and value buffer. The key buffer - * is used to temporaily store a single key when deserializing the key using {@link Grouper.KeySerde} in - * {@link #iterator(boolean)}. The used flag buffer is a bit set to represent which keys are valid. If a bit of an - * index is set, that key is valid. Finally, the value buffer is used to store aggregated values. The first index is - * reserved for {@link StringGroupByColumnSelectorStrategy#GROUP_BY_MISSING_VALUE}. + * The buffer is divided into 2 separate regions, i.e., used flag buffer and value buffer. The used flag buffer is a + * bit set to represent which keys are valid. If a bit of an index is set, that key is valid. Finally, the value + * buffer is used to store aggregated values. The first index is reserved for + * {@link GroupByColumnSelectorStrategy#GROUP_BY_MISSING_VALUE}. *

* This grouper is available only when the grouping key is a single indexed dimension of a known cardinality because it * directly uses the dimension value as the index for array access. Since the cardinality for the grouping key across * different segments cannot be currently retrieved, this grouper can be used only when performing per-segment query * execution. - *

- * Since the index directly represents the grouping key, the grouping key type is always the integer. However, the - * {@link KeyType} parameter is preserved because this grouper will be used for merging per-segment aggregation - * results with a proper cardinality computation in the future. */ -public class BufferArrayGrouper implements Grouper +public class BufferArrayGrouper implements Grouper { private static final Logger LOG = new Logger(BufferArrayGrouper.class); private final Supplier bufferSupplier; - private final KeySerde keySerde; private final BufferAggregator[] aggregators; private final int[] aggregatorOffsets; private final int cardinalityWithMissingValue; private final int recordSize; // size of all aggregated values private boolean initialized = false; - private ByteBuffer keyBuffer; private ByteBuffer usedFlagBuffer; private ByteBuffer valBuffer; - static int requiredBufferCapacity( - KeySerde keySerde, + static int requiredBufferCapacity( int cardinality, AggregatorFactory[] aggregatorFactories ) @@ -80,14 +72,18 @@ static int requiredBufferCapacity( .mapToInt(AggregatorFactory::getMaxIntermediateSize) .sum(); - return keySerde.keySize() + // key size - (int) Math.ceil((double) cardinalityWithMissingValue / 8) + // total used flags size - cardinalityWithMissingValue * recordSize; // total values size + return Integer.BYTES + // key size + getUserBufferCapacity(cardinalityWithMissingValue) + // total used flags size + cardinalityWithMissingValue * recordSize; // total values size + } + + private static int getUserBufferCapacity(int cardinalityWithMissingValue) + { + return (int) Math.ceil((double) cardinalityWithMissingValue / 8); } public BufferArrayGrouper( final Supplier bufferSupplier, - final KeySerde keySerde, final ColumnSelectorFactory columnSelectorFactory, final AggregatorFactory[] aggregatorFactories, final int cardinality @@ -97,7 +93,6 @@ public BufferArrayGrouper( Preconditions.checkArgument(cardinality > 0, "Cardinality must a non-zero positive number"); this.bufferSupplier = Preconditions.checkNotNull(bufferSupplier, "bufferSupplier"); - this.keySerde = Preconditions.checkNotNull(keySerde, "keySerde"); this.aggregators = new BufferAggregator[aggregatorFactories.length]; this.aggregatorOffsets = new int[aggregatorFactories.length]; this.cardinalityWithMissingValue = cardinality + 1; @@ -117,12 +112,8 @@ public void init() if (!initialized) { final ByteBuffer buffer = bufferSupplier.get().duplicate(); + final int usedBufferEnd = getUserBufferCapacity(cardinalityWithMissingValue); buffer.position(0); - buffer.limit(keySerde.keySize()); - keyBuffer = buffer.slice(); - - final int usedBufferEnd = keySerde.keySize() + (int) Math.ceil((double) cardinalityWithMissingValue / 8); - buffer.position(keySerde.keySize()); buffer.limit(usedBufferEnd); usedFlagBuffer = buffer.slice(); @@ -143,27 +134,7 @@ public boolean isInitialized() } @Override - public AggregateResult aggregate(KeyType key) - { - // BufferArrayGrouper is used only for dictionary-indexed single-value string dimensions. - // Here, the key contains the dictionary-encoded value of the grouping key - // and we use it as the index for the aggregation array. - - final ByteBuffer fromKey = checkAndGetKeyBuffer(keySerde, key); - if (fromKey == null) { - // This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will - // be correct. - return Groupers.DICTIONARY_FULL; - } - - // The first index is reserved for missing value which is represented by -1. - final int dimIndex = fromKey.getInt() + 1; - fromKey.rewind(); - return aggregate(key, dimIndex); - } - - @Override - public AggregateResult aggregate(KeyType key, int dimIndex) + public AggregateResult aggregate(Integer key, int dimIndex) { Preconditions.checkArgument( dimIndex >= 0 && dimIndex < cardinalityWithMissingValue, @@ -171,12 +142,7 @@ public AggregateResult aggregate(KeyType key, int dimIndex) dimIndex ); - final ByteBuffer fromKey = checkAndGetKeyBuffer(keySerde, key); - if (fromKey == null) { - // This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will - // be correct. - return Groupers.DICTIONARY_FULL; - } + Preconditions.checkNotNull(key); final int recordOffset = dimIndex * recordSize; @@ -202,23 +168,6 @@ public AggregateResult aggregate(KeyType key, int dimIndex) return AggregateResult.ok(); } - private ByteBuffer checkAndGetKeyBuffer(KeySerde keySerde, KeyType key) - { - final ByteBuffer fromKey = keySerde.toByteBuffer(key); - if (fromKey == null) { - return null; - } - - if (fromKey.remaining() != keySerde.keySize()) { - throw new IAE( - "keySerde.toByteBuffer(key).remaining[%s] != keySerde.keySize[%s], buffer was the wrong size?!", - fromKey.remaining(), - keySerde.keySize() - ); - } - return fromKey; - } - private void initializeSlot(int dimIndex) { final int index = dimIndex / 8; @@ -243,11 +192,23 @@ private boolean isUsedSlot(int dimIndex) public void reset() { final int usedBufferCapacity = usedFlagBuffer.capacity(); - for (int i = 0; i < usedBufferCapacity; i++) { + + final int n = usedBufferCapacity / 8; + for (int i = 0; i < n; i += 8) { + usedFlagBuffer.putLong(i, 0L); + } + + for (int i = n; i < usedBufferCapacity; i++) { usedFlagBuffer.put(i, (byte) 0); } } + @Override + public Function hashFunction() + { + return key -> key + 1; + } + @Override public void close() { @@ -262,20 +223,57 @@ public void close() } @Override - public Iterator> iterator(boolean sorted) + public Iterator> iterator(boolean sorted) { - // result is always natually sorted by keys - return IntStream.range(0, cardinalityWithMissingValue) - .filter(this::isUsedSlot) - .mapToObj(index -> { - keyBuffer.putInt(0, index - 1); // Restore key values from the index - - final Object[] values = new Object[aggregators.length]; - final int recordOffset = index * recordSize; - for (int i = 0; i < aggregators.length; i++) { - values[i] = aggregators[i].get(valBuffer, recordOffset + aggregatorOffsets[i]); - } - return new Entry<>(keySerde.fromByteBuffer(keyBuffer, 0), values); - }).iterator(); + if (sorted) { + throw new UnsupportedOperationException("sorted iterator is not supported yet"); + } + + return new Iterator>() + { + int cur = -1; + boolean findNext = false; + + { + cur = findNext(); + } + + @Override + public boolean hasNext() + { + if (findNext) { + cur = findNext(); + findNext = false; + } + return cur != -1; + } + + private int findNext() + { + for (int i = cur + 1; i < cardinalityWithMissingValue; i++) { + if (isUsedSlot(i)) { + return i; + } + } + return -1; + } + + @Override + public Entry next() + { + if (cur == -1) { + throw new NoSuchElementException(); + } + + findNext = true; + + final Object[] values = new Object[aggregators.length]; + final int recordOffset = cur * recordSize; + for (int i = 0; i < aggregators.length; i++) { + values[i] = aggregators[i].get(valBuffer, recordOffset + aggregatorOffsets[i]); + } + return new Entry<>(cur - 1, values); + } + }; } } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java index 71d3ff6f1543..8bd1b597c41a 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java @@ -187,12 +187,6 @@ public AggregateResult aggregate(KeyType key, int keyHash) } } - @Override - public AggregateResult aggregate(KeyType key) - { - return aggregate(key, Groupers.hash(key)); - } - @Override public void reset() { diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index ff7e3460f6c1..409c62a43c6a 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -20,6 +20,7 @@ package io.druid.query.groupby.epinephelinae; import com.google.common.base.Function; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Suppliers; import com.google.common.collect.Maps; @@ -54,6 +55,7 @@ import io.druid.segment.StorageAdapter; import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.column.ValueType; +import io.druid.segment.data.IndexedInts; import io.druid.segment.filter.Filters; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -116,9 +118,9 @@ public static Sequence process( final boolean allSingleValueDims = query .getDimensions() .stream() - .noneMatch(dimension -> { + .allMatch(dimension -> { final ColumnCapabilities columnCapabilities = storageAdapter.getColumnCapabilities(dimension.getDimension()); - return columnCapabilities == null || columnCapabilities.hasMultipleValues(); + return columnCapabilities != null && !columnCapabilities.hasMultipleValues(); }); final ResourceHolder bufferHolder = intermediateResultsBufferPool.take(); @@ -141,24 +143,71 @@ public static Sequence process( public Sequence apply(final Cursor cursor) { return new BaseSequence<>( - new BaseSequence.IteratorMaker() + new BaseSequence.IteratorMaker>() { @Override public GroupByEngineIterator make() { - ColumnSelectorPlus[] selectorPlus = DimensionHandlerUtils.createColumnSelectorPluses( - STRATEGY_FACTORY, - query.getDimensions(), - cursor - ); - return new GroupByEngineIterator( + ColumnSelectorPlus[] selectorPlus = DimensionHandlerUtils + .createColumnSelectorPluses( + STRATEGY_FACTORY, + query.getDimensions(), + cursor + ); + GroupByColumnSelectorPlus[] dims = createGroupBySelectorPlus(selectorPlus); + + final ByteBuffer buffer = bufferHolder.get(); + + // Check array-based aggregation is applicable + if (!config.isForceHashAggregation()) { + final ColumnCapabilities columnCapabilities; + final int cardinality; + if (dims.length == 0) { + columnCapabilities = null; + cardinality = 1; + } else if (dims.length == 1) { + columnCapabilities = cursor.getColumnCapabilities(dims[0].getName()); + cardinality = storageAdapter.getDimensionCardinality(dims[0].getName()); + } else { + columnCapabilities = null; + cardinality = -1; // ArrayAggregateIterator is not available + } + + // Choose array-based aggregation if the grouping key is a single string dimension of a + // known cardinality + if ((columnCapabilities == null || columnCapabilities.getType().equals(ValueType.STRING)) + && cardinality > 0) { + final AggregatorFactory[] aggregatorFactories = query + .getAggregatorSpecs() + .toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]); + final int requiredBufferCapacity = BufferArrayGrouper.requiredBufferCapacity( + cardinality, + aggregatorFactories + ); + + // Check that all keys and aggregated values can be contained the buffer + if (requiredBufferCapacity <= buffer.capacity()) { + return new ArrayAggregateIterator( + query, + config, + cursor, + buffer, + fudgeTimestamp, + dims, + allSingleValueDims, + cardinality + ); + } + } + } + + return new HashAggregateIterator( query, config, cursor, - bufferHolder.get(), + buffer, fudgeTimestamp, - createGroupBySelectorPlus(selectorPlus), - storageAdapter::getDimensionCardinality, + dims, allSingleValueDims ); } @@ -213,24 +262,18 @@ public GroupByColumnSelectorStrategy makeColumnSelectorStrategy( } } - private static class GroupByEngineIterator implements Iterator, Closeable + private abstract static class GroupByEngineIterator implements Iterator, Closeable { - private final GroupByQuery query; - private final GroupByQueryConfig querySpecificConfig; - private final Cursor cursor; - private final ByteBuffer buffer; - private final Grouper.KeySerde keySerde; - private final DateTime timestamp; - private final ByteBuffer keyBuffer; - private final int[] stack; - private final Object[] valuess; - private final GroupByColumnSelectorPlus[] dims; + protected final GroupByQuery query; + protected final GroupByQueryConfig querySpecificConfig; + protected final Cursor cursor; + protected final ByteBuffer buffer; + protected final Grouper.KeySerde keySerde; + protected final GroupByColumnSelectorPlus[] dims; + protected final DateTime timestamp; - private int stackp = Integer.MIN_VALUE; - private boolean currentRowWasPartiallyAggregated = false; - private CloseableGrouperIterator delegate = null; - private final Function cardinalityFunction; // dimension name -> cardinality - private final boolean allSingleValueDims; + protected CloseableGrouperIterator delegate = null; + protected final boolean allSingleValueDims; public GroupByEngineIterator( final GroupByQuery query, @@ -239,31 +282,24 @@ public GroupByEngineIterator( final ByteBuffer buffer, final DateTime fudgeTimestamp, final GroupByColumnSelectorPlus[] dims, - final Function cardinalityFunction, final boolean allSingleValueDims ) { - final int dimCount = query.getDimensions().size(); - this.query = query; this.querySpecificConfig = config.withOverrides(query); this.cursor = cursor; this.buffer = buffer; this.keySerde = new GroupByEngineKeySerde(dims); - this.keyBuffer = ByteBuffer.allocate(keySerde.keySize()); this.dims = dims; - this.stack = new int[dimCount]; - this.valuess = new Object[dimCount]; // Time is the same for every row in the cursor this.timestamp = fudgeTimestamp != null ? fudgeTimestamp : cursor.getTime(); - this.cardinalityFunction = cardinalityFunction; this.allSingleValueDims = allSingleValueDims; } - private CloseableGrouperIterator initNewDelegate() + private CloseableGrouperIterator initNewDelegate() { - final Grouper grouper = newGrouper(); + final Grouper grouper = newGrouper(); grouper.init(); if (allSingleValueDims) { @@ -279,13 +315,7 @@ private CloseableGrouperIterator initNewDelegate() Map theMap = Maps.newLinkedHashMap(); // Add dimensions. - for (GroupByColumnSelectorPlus selectorPlus : dims) { - selectorPlus.getColumnSelectorStrategy().processValueFromGroupingKey( - selectorPlus, - entry.getKey(), - theMap - ); - } + putToMap(entry.getKey(), theMap); convertRowTypesToOutputTypes(query.getDimensions(), theMap); @@ -342,54 +372,81 @@ public void close() } } - private Grouper newGrouper() + /** + * Create a new grouper. + */ + protected abstract Grouper newGrouper(); + + /** + * Grouping dimensions are all single-valued, and thus the given grouper don't have to worry about multi-valued + * dimensions. + */ + protected abstract void aggregateSingleValueDims(Grouper grouper); + + /** + * Grouping dimensions can be multi-valued, and thus the given grouper should handle them properly during + * aggregation. + */ + protected abstract void aggregateMultiValueDims(Grouper grouper); + + /** + * Add the key to the result map. Some pre-processing like deserialization might be done for the key before + * adding to the map. + */ + protected abstract void putToMap(KeyType key, Map map); + + protected int getSingleValue(IndexedInts indexedInts) { - final AggregatorFactory[] aggregatorFactories = query - .getAggregatorSpecs() - .toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]); - - if (!querySpecificConfig.isForceHashAggregation()) { - if (dims.length == 1) { - final ColumnCapabilities columnCapabilities = cursor.getColumnCapabilities(dims[0].getName()); - final int cardinality = computeCardinality(cardinalityFunction, dims[0]); - - // Choose array-based aggregation if the grouping key is a single string dimension of a known cardinality - if ((columnCapabilities == null || columnCapabilities.getType().equals(ValueType.STRING)) && - cardinality > 0) { - final int requiredBufferCapacity = BufferArrayGrouper.requiredBufferCapacity(keySerde, cardinality, aggregatorFactories); - - // Check that all keys and aggregated values can be contained the buffer - if (requiredBufferCapacity <= buffer.capacity()) { - return new BufferArrayGrouper<>( - Suppliers.ofInstance(buffer), - keySerde, - cursor, - aggregatorFactories, - cardinality - ); - } - } - } - } + Preconditions.checkArgument(indexedInts.size() < 2, "should be single value"); + return indexedInts.size() == 1 ? indexedInts.get(0) : GroupByColumnSelectorStrategy.GROUP_BY_MISSING_VALUE; + } + + } + + private static class HashAggregateIterator extends GroupByEngineIterator + { + private final int[] stack; + private final Object[] valuess; + private final ByteBuffer keyBuffer; + + private int stackp = Integer.MIN_VALUE; + protected boolean currentRowWasPartiallyAggregated = false; + + public HashAggregateIterator( + GroupByQuery query, + GroupByQueryConfig config, + Cursor cursor, + ByteBuffer buffer, + DateTime fudgeTimestamp, + GroupByColumnSelectorPlus[] dims, + boolean allSingleValueDims + ) + { + super(query, config, cursor, buffer, fudgeTimestamp, dims, allSingleValueDims); + final int dimCount = query.getDimensions().size(); + stack = new int[dimCount]; + valuess = new Object[dimCount]; + keyBuffer = ByteBuffer.allocate(keySerde.keySize()); + } + + @Override + protected Grouper newGrouper() + { return new BufferHashGrouper<>( Suppliers.ofInstance(buffer), keySerde, cursor, - aggregatorFactories, + query.getAggregatorSpecs() + .toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]), querySpecificConfig.getBufferGrouperMaxSize(), querySpecificConfig.getBufferGrouperMaxLoadFactor(), querySpecificConfig.getBufferGrouperInitialBuckets() ); } - @SuppressWarnings("ConstantConditions") - private static int computeCardinality(Function cardinalityFunction, GroupByColumnSelectorPlus dim) - { - return cardinalityFunction.apply(dim.getName()); - } - - private void aggregateSingleValueDims(Grouper grouper) + @Override + protected void aggregateSingleValueDims(Grouper grouper) { while (!cursor.isDone()) { for (int i = 0; i < dims.length; i++) { @@ -401,6 +458,7 @@ private void aggregateSingleValueDims(Grouper grouper) ); } keyBuffer.rewind(); + if (!grouper.aggregate(keyBuffer).isOk()) { return; } @@ -408,7 +466,8 @@ private void aggregateSingleValueDims(Grouper grouper) } } - private void aggregateMultiValueDims(Grouper grouper) + @Override + protected void aggregateMultiValueDims(Grouper grouper) { while (!cursor.isDone()) { if (!currentRowWasPartiallyAggregated) { @@ -479,6 +538,130 @@ private void aggregateMultiValueDims(Grouper grouper) currentRowWasPartiallyAggregated = false; } } + + @Override + protected void putToMap(ByteBuffer key, Map map) + { + for (GroupByColumnSelectorPlus selectorPlus : dims) { + selectorPlus.getColumnSelectorStrategy().processValueFromGroupingKey( + selectorPlus, + key, + map + ); + } + } + } + + private static class ArrayAggregateIterator extends GroupByEngineIterator + { + private final int cardinality; + private final GroupByColumnSelectorPlus dim; + + private IndexedInts multiValues; + private int nextValIndex; + + public ArrayAggregateIterator( + GroupByQuery query, + GroupByQueryConfig config, + Cursor cursor, + ByteBuffer buffer, + DateTime fudgeTimestamp, + GroupByColumnSelectorPlus[] dims, + boolean allSingleValueDims, + int cardinality + ) + { + super(query, config, cursor, buffer, fudgeTimestamp, dims, allSingleValueDims); + this.cardinality = cardinality; + if (dims.length == 1) { + this.dim = dims[0]; + } else if (dims.length == 0) { + this.dim = null; + } else { + throw new IAE("Group key should be a single dimension"); + } + } + + @Override + protected Grouper newGrouper() + { + return new BufferArrayGrouper( + Suppliers.ofInstance(buffer), + cursor, + query.getAggregatorSpecs() + .toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]), + cardinality + ); + } + + @Override + protected void aggregateSingleValueDims(Grouper grouper) + { + while (!cursor.isDone()) { + final int key; + if (dim != null) { + // dim is always an indexed string dimension + final IndexedInts indexedInts = ((DimensionSelector) dim.getSelector()).getRow(); + key = getSingleValue(indexedInts); + } else { + key = 0; + } + if (!grouper.aggregate(key).isOk()) { + return; + } + cursor.advance(); + } + } + + @Override + protected void aggregateMultiValueDims(Grouper grouper) + { + if (dim == null) { + throw new ISE("dim must exist"); + } + + if (multiValues == null) { + // dim is always an indexed string dimension + multiValues = ((DimensionSelector) dim.getSelector()).getRow(); + nextValIndex = 0; + } + + while (!cursor.isDone()) { + if (multiValues.size() == 0) { + if (!grouper.aggregate(GroupByColumnSelectorStrategy.GROUP_BY_MISSING_VALUE).isOk()) { + return; + } + } else { + for (; nextValIndex < multiValues.size(); nextValIndex++) { + if (!grouper.aggregate(multiValues.get(nextValIndex)).isOk()) { + return; + } + } + } + + cursor.advance(); + if (!cursor.isDone()) { + // dim is always an indexed string dimension + multiValues = ((DimensionSelector) dim.getSelector()).getRow(); + nextValIndex = multiValues.size() == 0 ? -1 : 0; + } + } + } + + @Override + protected void putToMap(Integer key, Map map) + { + if (dim != null) { + if (key != -1) { + map.put( + dim.getOutputName(), + ((DimensionSelector) dim.getSelector()).lookupName(key) + ); + } else { + map.put(dim.getOutputName(), ""); + } + } + } } private static void convertRowTypesToOutputTypes(List dimensionSpecs, Map rowMap) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java index 103c1dd5f68c..6ca737251187 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; import io.druid.query.aggregation.AggregatorFactory; import java.io.Closeable; @@ -28,6 +29,7 @@ import java.util.Arrays; import java.util.Comparator; import java.util.Iterator; +import java.util.function.Function; /** * Groupers aggregate metrics from rows that they typically get from a ColumnSelectorFactory, under @@ -59,7 +61,7 @@ public interface Grouper extends Closeable * some are not. * * @param key key object - * @param keyHash result of {@link Groupers#hash(Object)} on the key + * @param keyHash result of {@link #hashFunction()} on the key * * @return result that is ok if the row was aggregated, not ok if a resource limit was hit */ @@ -73,13 +75,22 @@ public interface Grouper extends Closeable * * @return result that is ok if the row was aggregated, not ok if a resource limit was hit */ - AggregateResult aggregate(KeyType key); + default AggregateResult aggregate(KeyType key) + { + Preconditions.checkNotNull(key, "key"); + return aggregate(key, hashFunction().apply(key)); + } /** * Reset the grouper to its initial state. */ void reset(); + default Function hashFunction() + { + return Groupers::hash; + } + /** * Close the grouper and release associated resources. */ diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java index e8f1de3d0ea9..60e618def1a9 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -153,12 +153,6 @@ public AggregateResult aggregate(KeyType key, int keyHash) } } - @Override - public AggregateResult aggregate(KeyType key) - { - return aggregate(key, Groupers.hash(key)); - } - @Override public void reset() { diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java index 4b9f54cf4ad8..ceda42753424 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/GroupByColumnSelectorStrategy.java @@ -33,6 +33,8 @@ */ public interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy { + int GROUP_BY_MISSING_VALUE = -1; + /** * Return the size, in bytes, of this dimension's values in the grouping key. * diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategy.java index ae738724e771..9cabaa5aa1c7 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/StringGroupByColumnSelectorStrategy.java @@ -30,8 +30,6 @@ public class StringGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy { - private static final int GROUP_BY_MISSING_VALUE = -1; - @Override public int getGroupingKeySize() { diff --git a/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouperTest.java b/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouperTest.java index 2b8baebf2004..83a16a7a45f5 100644 --- a/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouperTest.java +++ b/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouperTest.java @@ -58,10 +58,8 @@ public void testAggregate() new Grouper.Entry<>(10, new Object[]{10L, 1L}), new Grouper.Entry<>(12, new Object[]{20L, 2L}) ); - final List> unsortedEntries = Lists.newArrayList(grouper.iterator(false)); - final List> sortedEntries = Lists.newArrayList(grouper.iterator(true)); + final List> unsortedEntries = Lists.newArrayList(grouper.iterator(false)); - Assert.assertEquals(expected, sortedEntries); Assert.assertEquals( expected, Ordering.from((Comparator>) (o1, o2) -> Ints.compare(o1.getKey(), o2.getKey())) @@ -69,16 +67,15 @@ public void testAggregate() ); } - private BufferArrayGrouper newGrouper( + private BufferArrayGrouper newGrouper( TestColumnSelectorFactory columnSelectorFactory, int bufferSize ) { final ByteBuffer buffer = ByteBuffer.allocate(bufferSize); - final BufferArrayGrouper grouper = new BufferArrayGrouper<>( + final BufferArrayGrouper grouper = new BufferArrayGrouper( Suppliers.ofInstance(buffer), - GrouperTestUtil.intKeySerde(), columnSelectorFactory, new AggregatorFactory[]{ new LongSumAggregatorFactory("valueSum", "value"), From 7d50647253e64341b26087777034e4bda1dfadbf Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 2 Aug 2017 10:56:32 +0900 Subject: [PATCH 09/13] Address comments --- .../AbstractBufferHashGrouper.java | 1 + .../epinephelinae/BufferArrayGrouper.java | 54 +++---- .../epinephelinae/GroupByQueryEngineV2.java | 135 ++++++++++-------- .../query/groupby/epinephelinae/Grouper.java | 6 +- 4 files changed, 112 insertions(+), 84 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java index c8ee101e9cd5..ef0d57b7331d 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java @@ -51,6 +51,7 @@ public abstract class AbstractBufferHashGrouper implements Grouper bufferSupplier, final KeySerde keySerde, final AggregatorFactory[] aggregatorFactories, diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java index 92259186d431..6778e2672e28 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java @@ -32,7 +32,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.NoSuchElementException; -import java.util.function.Function; +import java.util.function.ToIntFunction; /** * A buffer grouper for array-based aggregation. This grouper stores aggregated values in the buffer using the grouping @@ -72,17 +72,21 @@ static int requiredBufferCapacity( .mapToInt(AggregatorFactory::getMaxIntermediateSize) .sum(); - return Integer.BYTES + // key size - getUserBufferCapacity(cardinalityWithMissingValue) + // total used flags size - cardinalityWithMissingValue * recordSize; // total values size + return Integer.BYTES + // grouping key size + getUsedFlagBufferCapacity(cardinalityWithMissingValue) + // total used flags size + cardinalityWithMissingValue * recordSize; // total values size } - private static int getUserBufferCapacity(int cardinalityWithMissingValue) + /** + * Compute the number of bytes to store all used flag bits. + */ + private static int getUsedFlagBufferCapacity(int cardinalityWithMissingValue) { - return (int) Math.ceil((double) cardinalityWithMissingValue / 8); + return (cardinalityWithMissingValue + 7) / 8; } public BufferArrayGrouper( + // the buffer returned from the below supplier can have dirty bits and should be cleared during initialization final Supplier bufferSupplier, final ColumnSelectorFactory columnSelectorFactory, final AggregatorFactory[] aggregatorFactories, @@ -110,14 +114,14 @@ public BufferArrayGrouper( public void init() { if (!initialized) { - final ByteBuffer buffer = bufferSupplier.get().duplicate(); + final ByteBuffer buffer = bufferSupplier.get(); - final int usedBufferEnd = getUserBufferCapacity(cardinalityWithMissingValue); + final int usedFlagBufferEnd = getUsedFlagBufferCapacity(cardinalityWithMissingValue); buffer.position(0); - buffer.limit(usedBufferEnd); + buffer.limit(usedFlagBufferEnd); usedFlagBuffer = buffer.slice(); - buffer.position(usedBufferEnd); + buffer.position(usedFlagBufferEnd); buffer.limit(buffer.capacity()); valBuffer = buffer.slice(); @@ -170,9 +174,9 @@ public AggregateResult aggregate(Integer key, int dimIndex) private void initializeSlot(int dimIndex) { - final int index = dimIndex / 8; - final int extraIndex = dimIndex % 8; - usedFlagBuffer.put(index, (byte) (usedFlagBuffer.get(index) | 1 << extraIndex)); + final int index = dimIndex / Byte.SIZE; + final int extraIndex = dimIndex % Byte.SIZE; + usedFlagBuffer.put(index, (byte) (usedFlagBuffer.get(index) | (1 << extraIndex))); final int recordOffset = dimIndex * recordSize; for (int i = 0; i < aggregators.length; i++) { @@ -182,29 +186,31 @@ private void initializeSlot(int dimIndex) private boolean isUsedSlot(int dimIndex) { - final int index = dimIndex / 8; - final int extraIndex = dimIndex % 8; - final int usedByte = 1 << extraIndex; - return (usedFlagBuffer.get(index) & usedByte) == usedByte; + final int index = dimIndex / Byte.SIZE; + final int extraIndex = dimIndex % Byte.SIZE; + final int usedFlagByte = 1 << extraIndex; + return (usedFlagBuffer.get(index) & usedFlagByte) != 0; } @Override public void reset() { - final int usedBufferCapacity = usedFlagBuffer.capacity(); + // Clear the entire usedFlagBuffer + final int usedFlagBufferCapacity = usedFlagBuffer.capacity(); - final int n = usedBufferCapacity / 8; - for (int i = 0; i < n; i += 8) { + // putLong() instead of put() can boost the performance of clearing the buffer + final int n = usedFlagBufferCapacity / Long.BYTES; + for (int i = 0; i < n; i += Long.BYTES) { usedFlagBuffer.putLong(i, 0L); } - for (int i = n; i < usedBufferCapacity; i++) { + for (int i = n; i < usedFlagBufferCapacity; i++) { usedFlagBuffer.put(i, (byte) 0); } } @Override - public Function hashFunction() + public ToIntFunction hashFunction() { return key -> key + 1; } @@ -245,7 +251,7 @@ public boolean hasNext() cur = findNext(); findNext = false; } - return cur != -1; + return cur >= 0; } private int findNext() @@ -261,7 +267,7 @@ private int findNext() @Override public Entry next() { - if (cur == -1) { + if (cur < 0) { throw new NoSuchElementException(); } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 409c62a43c6a..bc8312271048 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -60,6 +60,7 @@ import org.joda.time.DateTime; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; @@ -159,57 +160,29 @@ public GroupByEngineIterator make() final ByteBuffer buffer = bufferHolder.get(); // Check array-based aggregation is applicable - if (!config.isForceHashAggregation()) { - final ColumnCapabilities columnCapabilities; - final int cardinality; - if (dims.length == 0) { - columnCapabilities = null; - cardinality = 1; - } else if (dims.length == 1) { - columnCapabilities = cursor.getColumnCapabilities(dims[0].getName()); - cardinality = storageAdapter.getDimensionCardinality(dims[0].getName()); - } else { - columnCapabilities = null; - cardinality = -1; // ArrayAggregateIterator is not available - } - - // Choose array-based aggregation if the grouping key is a single string dimension of a - // known cardinality - if ((columnCapabilities == null || columnCapabilities.getType().equals(ValueType.STRING)) - && cardinality > 0) { - final AggregatorFactory[] aggregatorFactories = query - .getAggregatorSpecs() - .toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]); - final int requiredBufferCapacity = BufferArrayGrouper.requiredBufferCapacity( - cardinality, - aggregatorFactories - ); - - // Check that all keys and aggregated values can be contained the buffer - if (requiredBufferCapacity <= buffer.capacity()) { - return new ArrayAggregateIterator( - query, + if (isArrayAggregateApplicable(config, query, dims, storageAdapter, buffer)) { + return new ArrayAggregateIterator( + query, config, cursor, buffer, fudgeTimestamp, dims, allSingleValueDims, - cardinality - ); - } - } + // There must be 0 or 1 dimension if isArrayAggregateApplicable() is true + dims.length == 0 ? 1 : storageAdapter.getDimensionCardinality(dims[0].getName()) + ); + } else { + return new HashAggregateIterator( + query, + config, + cursor, + buffer, + fudgeTimestamp, + dims, + allSingleValueDims + ); } - - return new HashAggregateIterator( - query, - config, - cursor, - buffer, - fudgeTimestamp, - dims, - allSingleValueDims - ); } @Override @@ -234,6 +207,52 @@ public void close() throws IOException ); } + private static boolean isArrayAggregateApplicable( + GroupByQueryConfig config, + GroupByQuery query, + GroupByColumnSelectorPlus[] dims, + StorageAdapter storageAdapter, + ByteBuffer buffer + ) + { + if (!config.isForceHashAggregation()) { + final ColumnCapabilities columnCapabilities; + final int cardinality; + + // Find cardinality + if (dims.length == 0) { + columnCapabilities = null; + cardinality = 1; + } else if (dims.length == 1) { + columnCapabilities = storageAdapter.getColumnCapabilities(dims[0].getName()); + cardinality = storageAdapter.getDimensionCardinality(dims[0].getName()); + } else { + columnCapabilities = null; + cardinality = -1; // ArrayAggregateIterator is not available + } + + // Choose array-based aggregation if the grouping key is a single string dimension of a + // known cardinality + if ((columnCapabilities == null || columnCapabilities.getType().equals(ValueType.STRING)) + && cardinality > 0) { + final AggregatorFactory[] aggregatorFactories = query + .getAggregatorSpecs() + .toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]); + final int requiredBufferCapacity = BufferArrayGrouper.requiredBufferCapacity( + cardinality, + aggregatorFactories + ); + + // Check that all keys and aggregated values can be contained the buffer + if (requiredBufferCapacity <= buffer.capacity()) { + return true; + } + } + } + + return false; + } + private static class GroupByStrategyFactory implements ColumnSelectorStrategyFactory { @Override @@ -409,7 +428,7 @@ private static class HashAggregateIterator extends GroupByEngineIterator grouper) while (!cursor.isDone()) { if (!currentRowWasPartiallyAggregated) { // Set up stack, valuess, and first grouping in keyBuffer for this row - stackp = stack.length - 1; + stackPointer = stack.length - 1; for (int i = 0; i < dims.length; i++) { GroupByColumnSelectorStrategy strategy = dims[i].getColumnSelectorStrategy(); @@ -493,7 +512,7 @@ protected void aggregateMultiValueDims(Grouper grouper) // Aggregate groupings for this row boolean doAggregate = true; - while (stackp >= -1) { + while (stackPointer >= -1) { // Aggregate additional grouping for this row if (doAggregate) { keyBuffer.rewind(); @@ -505,17 +524,17 @@ protected void aggregateMultiValueDims(Grouper grouper) doAggregate = false; } - if (stackp >= 0) { - doAggregate = dims[stackp].getColumnSelectorStrategy().checkRowIndexAndAddValueToGroupingKey( - dims[stackp].getKeyBufferPosition(), - valuess[stackp], - stack[stackp], + if (stackPointer >= 0) { + doAggregate = dims[stackPointer].getColumnSelectorStrategy().checkRowIndexAndAddValueToGroupingKey( + dims[stackPointer].getKeyBufferPosition(), + valuess[stackPointer], + stack[stackPointer], keyBuffer ); if (doAggregate) { - stack[stackp]++; - for (int i = stackp + 1; i < stack.length; i++) { + stack[stackPointer]++; + for (int i = stackPointer + 1; i < stack.length; i++) { dims[i].getColumnSelectorStrategy().initGroupingKeyColumnValue( dims[i].getKeyBufferPosition(), i, @@ -524,12 +543,12 @@ protected void aggregateMultiValueDims(Grouper grouper) stack ); } - stackp = stack.length - 1; + stackPointer = stack.length - 1; } else { - stackp--; + stackPointer--; } } else { - stackp--; + stackPointer--; } } @@ -555,6 +574,8 @@ protected void putToMap(ByteBuffer key, Map map) private static class ArrayAggregateIterator extends GroupByEngineIterator { private final int cardinality; + + @Nullable private final GroupByColumnSelectorPlus dim; private IndexedInts multiValues; diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java index 6ca737251187..2ca544485eb2 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java @@ -29,7 +29,7 @@ import java.util.Arrays; import java.util.Comparator; import java.util.Iterator; -import java.util.function.Function; +import java.util.function.ToIntFunction; /** * Groupers aggregate metrics from rows that they typically get from a ColumnSelectorFactory, under @@ -78,7 +78,7 @@ public interface Grouper extends Closeable default AggregateResult aggregate(KeyType key) { Preconditions.checkNotNull(key, "key"); - return aggregate(key, hashFunction().apply(key)); + return aggregate(key, hashFunction().applyAsInt(key)); } /** @@ -86,7 +86,7 @@ default AggregateResult aggregate(KeyType key) */ void reset(); - default Function hashFunction() + default ToIntFunction hashFunction() { return Groupers::hash; } From 218001b03db46e51b742a52f0648527ebebc6e1b Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 3 Aug 2017 02:43:58 +0900 Subject: [PATCH 10/13] Address comments --- .../epinephelinae/BufferArrayGrouper.java | 5 +- .../epinephelinae/GroupByQueryEngineV2.java | 62 ++++++++++--------- 2 files changed, 34 insertions(+), 33 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java index 6778e2672e28..9068bddc721f 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java @@ -72,8 +72,7 @@ static int requiredBufferCapacity( .mapToInt(AggregatorFactory::getMaxIntermediateSize) .sum(); - return Integer.BYTES + // grouping key size - getUsedFlagBufferCapacity(cardinalityWithMissingValue) + // total used flags size + return getUsedFlagBufferCapacity(cardinalityWithMissingValue) + // total used flags size cardinalityWithMissingValue * recordSize; // total values size } @@ -199,7 +198,7 @@ public void reset() final int usedFlagBufferCapacity = usedFlagBuffer.capacity(); // putLong() instead of put() can boost the performance of clearing the buffer - final int n = usedFlagBufferCapacity / Long.BYTES; + final int n = (usedFlagBufferCapacity / Long.BYTES) * Long.BYTES; for (int i = 0; i < n; i += Long.BYTES) { usedFlagBuffer.putLong(i, 0L); } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index bc8312271048..cd8242972393 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -215,38 +215,40 @@ private static boolean isArrayAggregateApplicable( ByteBuffer buffer ) { - if (!config.isForceHashAggregation()) { - final ColumnCapabilities columnCapabilities; - final int cardinality; - - // Find cardinality - if (dims.length == 0) { - columnCapabilities = null; - cardinality = 1; - } else if (dims.length == 1) { - columnCapabilities = storageAdapter.getColumnCapabilities(dims[0].getName()); - cardinality = storageAdapter.getDimensionCardinality(dims[0].getName()); - } else { - columnCapabilities = null; - cardinality = -1; // ArrayAggregateIterator is not available - } + if (config.isForceHashAggregation()) { + return false; + } - // Choose array-based aggregation if the grouping key is a single string dimension of a - // known cardinality - if ((columnCapabilities == null || columnCapabilities.getType().equals(ValueType.STRING)) - && cardinality > 0) { - final AggregatorFactory[] aggregatorFactories = query - .getAggregatorSpecs() - .toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]); - final int requiredBufferCapacity = BufferArrayGrouper.requiredBufferCapacity( - cardinality, - aggregatorFactories - ); + final ColumnCapabilities columnCapabilities; + final int cardinality; + + // Find cardinality + if (dims.length == 0) { + columnCapabilities = null; + cardinality = 1; + } else if (dims.length == 1) { + columnCapabilities = storageAdapter.getColumnCapabilities(dims[0].getName()); + cardinality = storageAdapter.getDimensionCardinality(dims[0].getName()); + } else { + columnCapabilities = null; + cardinality = -1; // ArrayAggregateIterator is not available + } - // Check that all keys and aggregated values can be contained the buffer - if (requiredBufferCapacity <= buffer.capacity()) { - return true; - } + // Choose array-based aggregation if the grouping key is a single string dimension of a + // known cardinality + if ((columnCapabilities == null || columnCapabilities.getType().equals(ValueType.STRING)) + && cardinality > 0) { + final AggregatorFactory[] aggregatorFactories = query + .getAggregatorSpecs() + .toArray(new AggregatorFactory[query.getAggregatorSpecs().size()]); + final int requiredBufferCapacity = BufferArrayGrouper.requiredBufferCapacity( + cardinality, + aggregatorFactories + ); + + // Check that all keys and aggregated values can be contained the buffer + if (requiredBufferCapacity <= buffer.capacity()) { + return true; } } From 4a55a3519fb0280e3c2991254715e11ba75142b4 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 3 Aug 2017 09:25:08 +0900 Subject: [PATCH 11/13] Cleanup GroupByQueryEngineV2.process --- .../util/common/guava/ConcatSequence.java | 4 +- .../java/util/common/guava/Sequence.java | 6 + .../java/util/common/guava/Sequences.java | 2 +- .../epinephelinae/GroupByQueryEngineV2.java | 113 +++++++----------- 4 files changed, 55 insertions(+), 70 deletions(-) diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/ConcatSequence.java b/java-util/src/main/java/io/druid/java/util/common/guava/ConcatSequence.java index 7479b7c0a952..efbab0213909 100644 --- a/java-util/src/main/java/io/druid/java/util/common/guava/ConcatSequence.java +++ b/java-util/src/main/java/io/druid/java/util/common/guava/ConcatSequence.java @@ -31,10 +31,10 @@ public class ConcatSequence implements Sequence private final Sequence> baseSequences; public ConcatSequence( - Sequence> baseSequences + Sequence> baseSequences ) { - this.baseSequences = baseSequences; + this.baseSequences = (Sequence>) baseSequences; } @Override diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/Sequence.java b/java-util/src/main/java/io/druid/java/util/common/guava/Sequence.java index b3851e706c17..8a0cc69769a0 100644 --- a/java-util/src/main/java/io/druid/java/util/common/guava/Sequence.java +++ b/java-util/src/main/java/io/druid/java/util/common/guava/Sequence.java @@ -21,6 +21,7 @@ import com.google.common.collect.Ordering; +import java.io.Closeable; import java.util.concurrent.Executor; import java.util.function.Function; @@ -80,4 +81,9 @@ default Sequence withEffect(Runnable effect, Executor effectExecutor) { return Sequences.withEffect(this, effect, effectExecutor); } + + default Sequence withBaggage(Closeable baggage) + { + return Sequences.withBaggage(this, baggage); + } } diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/Sequences.java b/java-util/src/main/java/io/druid/java/util/common/guava/Sequences.java index ccbe990c1ba8..8d9a77904a85 100644 --- a/java-util/src/main/java/io/druid/java/util/common/guava/Sequences.java +++ b/java-util/src/main/java/io/druid/java/util/common/guava/Sequences.java @@ -75,7 +75,7 @@ public static Sequence concat(Iterable> sequences) return concat(Sequences.simple(sequences)); } - public static Sequence concat(Sequence> sequences) + public static Sequence concat(Sequence> sequences) { return new ConcatSequence<>(sequences); } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 9010460b86bb..3734825dc584 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -19,7 +19,6 @@ package io.druid.query.groupby.epinephelinae; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Suppliers; @@ -31,7 +30,6 @@ import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.BaseSequence; -import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.ColumnSelectorPlus; @@ -62,7 +60,6 @@ import javax.annotation.Nullable; import java.io.Closeable; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; @@ -135,75 +132,57 @@ public static Sequence process( : new DateTime(Long.parseLong(fudgeTimestampString)); return Sequences.concat( - Sequences.withBaggage( - Sequences.map( - cursors, - new Function>() + cursors.map( + cursor -> new BaseSequence<>( + new BaseSequence.IteratorMaker>() { @Override - public Sequence apply(final Cursor cursor) + public GroupByEngineIterator make() { - return new BaseSequence<>( - new BaseSequence.IteratorMaker>() - { - @Override - public GroupByEngineIterator make() - { - ColumnSelectorPlus[] selectorPlus = DimensionHandlerUtils - .createColumnSelectorPluses( - STRATEGY_FACTORY, - query.getDimensions(), - cursor - ); - GroupByColumnSelectorPlus[] dims = createGroupBySelectorPlus(selectorPlus); - - final ByteBuffer buffer = bufferHolder.get(); - - // Check array-based aggregation is applicable - if (isArrayAggregateApplicable(config, query, dims, storageAdapter, buffer)) { - return new ArrayAggregateIterator( - query, - config, - cursor, - buffer, - fudgeTimestamp, - dims, - allSingleValueDims, - // There must be 0 or 1 dimension if isArrayAggregateApplicable() is true - dims.length == 0 ? 1 : storageAdapter.getDimensionCardinality(dims[0].getName()) - ); - } else { - return new HashAggregateIterator( - query, - config, - cursor, - buffer, - fudgeTimestamp, - dims, - allSingleValueDims - ); - } - } - - @Override - public void cleanup(GroupByEngineIterator iterFromMake) - { - iterFromMake.close(); - } - } - ); + ColumnSelectorPlus[] selectorPlus = DimensionHandlerUtils + .createColumnSelectorPluses( + STRATEGY_FACTORY, + query.getDimensions(), + cursor + ); + GroupByColumnSelectorPlus[] dims = createGroupBySelectorPlus(selectorPlus); + + final ByteBuffer buffer = bufferHolder.get(); + + // Check array-based aggregation is applicable + if (isArrayAggregateApplicable(config, query, dims, storageAdapter, buffer)) { + return new ArrayAggregateIterator( + query, + config, + cursor, + buffer, + fudgeTimestamp, + dims, + allSingleValueDims, + // There must be 0 or 1 dimension if isArrayAggregateApplicable() is true + dims.length == 0 ? 1 : storageAdapter.getDimensionCardinality(dims[0].getName()) + ); + } else { + return new HashAggregateIterator( + query, + config, + cursor, + buffer, + fudgeTimestamp, + dims, + allSingleValueDims + ); + } + } + + @Override + public void cleanup(GroupByEngineIterator iterFromMake) + { + iterFromMake.close(); } } - ), - new Closeable() - { - @Override - public void close() throws IOException - { - CloseQuietly.close(bufferHolder); - } - } - ) + ) + ).withBaggage(bufferHolder) ); } From e6a2c3f73f6de9a3cb4c62d7a126ea9630eb222a Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 3 Aug 2017 09:39:29 +0900 Subject: [PATCH 12/13] Change to Byte.SIZE --- .../druid/query/groupby/epinephelinae/BufferArrayGrouper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java index 9068bddc721f..a17761090c14 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java @@ -81,7 +81,7 @@ static int requiredBufferCapacity( */ private static int getUsedFlagBufferCapacity(int cardinalityWithMissingValue) { - return (cardinalityWithMissingValue + 7) / 8; + return (cardinalityWithMissingValue + Byte.SIZE - 1) / Byte.SIZE; } public BufferArrayGrouper( From f1186d0b2f92f1cf6d079c13234eacfc3e22f25c Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 4 Aug 2017 00:00:16 +0900 Subject: [PATCH 13/13] Add flatMap --- .../java/util/common/guava/Sequence.java | 7 ++ .../epinephelinae/GroupByQueryEngineV2.java | 103 +++++++++--------- 2 files changed, 57 insertions(+), 53 deletions(-) diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/Sequence.java b/java-util/src/main/java/io/druid/java/util/common/guava/Sequence.java index 8a0cc69769a0..20fab62adc20 100644 --- a/java-util/src/main/java/io/druid/java/util/common/guava/Sequence.java +++ b/java-util/src/main/java/io/druid/java/util/common/guava/Sequence.java @@ -69,6 +69,13 @@ default Sequence map(Function mapper) return new MappedSequence<>(this, mapper); } + default Sequence flatMap( + Function> mapper + ) + { + return new ConcatSequence<>(this.map(mapper)); + } + default Sequence flatMerge( Function> mapper, Ordering ordering diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 3734825dc584..417e7d47b1e1 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -31,7 +31,6 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.Sequence; -import io.druid.java.util.common.guava.Sequences; import io.druid.query.ColumnSelectorPlus; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.dimension.ColumnSelectorStrategyFactory; @@ -131,59 +130,57 @@ public static Sequence process( ? null : new DateTime(Long.parseLong(fudgeTimestampString)); - return Sequences.concat( - cursors.map( - cursor -> new BaseSequence<>( - new BaseSequence.IteratorMaker>() - { - @Override - public GroupByEngineIterator make() - { - ColumnSelectorPlus[] selectorPlus = DimensionHandlerUtils - .createColumnSelectorPluses( - STRATEGY_FACTORY, - query.getDimensions(), - cursor - ); - GroupByColumnSelectorPlus[] dims = createGroupBySelectorPlus(selectorPlus); - - final ByteBuffer buffer = bufferHolder.get(); - - // Check array-based aggregation is applicable - if (isArrayAggregateApplicable(config, query, dims, storageAdapter, buffer)) { - return new ArrayAggregateIterator( - query, - config, - cursor, - buffer, - fudgeTimestamp, - dims, - allSingleValueDims, - // There must be 0 or 1 dimension if isArrayAggregateApplicable() is true - dims.length == 0 ? 1 : storageAdapter.getDimensionCardinality(dims[0].getName()) - ); - } else { - return new HashAggregateIterator( - query, - config, - cursor, - buffer, - fudgeTimestamp, - dims, - allSingleValueDims - ); - } - } - - @Override - public void cleanup(GroupByEngineIterator iterFromMake) - { - iterFromMake.close(); - } + return cursors.flatMap( + cursor -> new BaseSequence<>( + new BaseSequence.IteratorMaker>() + { + @Override + public GroupByEngineIterator make() + { + ColumnSelectorPlus[] selectorPlus = DimensionHandlerUtils + .createColumnSelectorPluses( + STRATEGY_FACTORY, + query.getDimensions(), + cursor + ); + GroupByColumnSelectorPlus[] dims = createGroupBySelectorPlus(selectorPlus); + + final ByteBuffer buffer = bufferHolder.get(); + + // Check array-based aggregation is applicable + if (isArrayAggregateApplicable(config, query, dims, storageAdapter, buffer)) { + return new ArrayAggregateIterator( + query, + config, + cursor, + buffer, + fudgeTimestamp, + dims, + allSingleValueDims, + // There must be 0 or 1 dimension if isArrayAggregateApplicable() is true + dims.length == 0 ? 1 : storageAdapter.getDimensionCardinality(dims[0].getName()) + ); + } else { + return new HashAggregateIterator( + query, + config, + cursor, + buffer, + fudgeTimestamp, + dims, + allSingleValueDims + ); } - ) - ).withBaggage(bufferHolder) - ); + } + + @Override + public void cleanup(GroupByEngineIterator iterFromMake) + { + iterFromMake.close(); + } + } + ) + ).withBaggage(bufferHolder); } private static boolean isArrayAggregateApplicable(