diff --git a/common/src/main/java/io/druid/collections/BlockingPool.java b/common/src/main/java/io/druid/collections/BlockingPool.java
index ebe834890348..81cedc4e1da8 100644
--- a/common/src/main/java/io/druid/collections/BlockingPool.java
+++ b/common/src/main/java/io/druid/collections/BlockingPool.java
@@ -19,6 +19,7 @@
package io.druid.collections;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
@@ -78,4 +79,10 @@ public void close() throws IOException
}
);
}
+
+ @VisibleForTesting
+ protected int getQueueSize()
+ {
+ return objects.size();
+ }
}
diff --git a/docs/content/querying/groupbyquery.md b/docs/content/querying/groupbyquery.md
index a33da89f3219..d455c27e3902 100644
--- a/docs/content/querying/groupbyquery.md
+++ b/docs/content/querying/groupbyquery.md
@@ -157,10 +157,10 @@ inner query's results stream with off-heap fact map and on-heap string dictionar
strategy perform the outer query on the broker in a single-threaded fashion.
Note that groupBys require a separate merge buffer on the broker for each layer beyond the first layer of the groupBy.
-With the v2 groupBy strategy, this can potentially lead to deadlocks for groupBys nested beyond two layers, since the
-merge buffers are limited in number and are acquired one-by-one and not as a complete set. At this time we recommend
-that you avoid deeply-nested groupBys with the v2 strategy. Doubly-nested groupBys (groupBy -> groupBy -> table) are
-safe and do not suffer from this issue.
+These merge buffers are immediately released once they are no longer used during query processing. However, deeply
+nested groupBys (with two or more groupBy layers beyond the first one) can potentially lead to deadlocks since the
+merge buffers are limited in number and are acquired one-by-one instead of as a complete set. At this time, we recommend
+that you avoid running too many concurrent executions of deeply nested groupBys with the v2 strategy.
#### Server configuration
@@ -185,7 +185,8 @@ When using the "v2" strategy, the following runtime properties apply:
Additionally, the "v2" strategy uses merging buffers for merging. It is currently the only query implementation that
does so. By default, Druid is configured without any merging buffer pool, so to use the "v2" strategy you must also
-set `druid.processing.numMergeBuffers` to some non-zero number.
+set `druid.processing.numMergeBuffers` to some non-zero number. Furthermore, if you want to execute deeply nested groupBys,
+you must set `druid.processing.numMergeBuffers` to at least 2.
This may require allocating more direct memory. The amount of direct memory needed by Druid is at least
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can
diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md
index 1bec7880f7cd..36d0f7da739c 100644
--- a/docs/content/querying/sql.md
+++ b/docs/content/querying/sql.md
@@ -117,11 +117,10 @@ SELECT COUNT(*) FROM (SELECT DISTINCT col FROM data_source)
```
Note that groupBys require a separate merge buffer on the broker for each layer beyond the first layer of the groupBy.
-With the v2 groupBy strategy, this can potentially lead to deadlocks for groupBys nested beyond two layers, since the
-merge buffers are limited in number and are acquired one-by-one and not as a complete set. At this time we recommend
-that you avoid deeply-nested groupBys with the v2 strategy. Doubly-nested groupBys (groupBy -> groupBy -> table) are
-safe and do not suffer from this issue. If you like, you can forbid deeper nesting by setting
-`druid.sql.planner.maxQueryCount = 2`.
+These merge buffers are immediately released once they are no longer used during query processing. However, deeply
+nested groupBys (with two or more groupBy layers beyond the first one) can potentially lead to deadlocks since the
+merge buffers are limited in number and are acquired one-by-one instead of as a complete set. At this time, we recommend
+that you avoid running too many concurrent executions of deeply nested groupBys with the v2 strategy.
#### Semi-joins
diff --git a/java-util/src/main/java/io/druid/java/util/common/guava/Sequence.java b/java-util/src/main/java/io/druid/java/util/common/guava/Sequence.java
index 4b287e937bdd..4cd28d4d6c32 100644
--- a/java-util/src/main/java/io/druid/java/util/common/guava/Sequence.java
+++ b/java-util/src/main/java/io/druid/java/util/common/guava/Sequence.java
@@ -20,20 +20,41 @@
package io.druid.java.util.common.guava;
/**
- * A Sequence represents an iterable sequence of elements. Unlike normal Iterators however, it doesn't expose
+ * A Sequence represents an iterable sequence of elements. Unlike normal Iterators however, it doesn't expose
* a way for you to extract values from it, instead you provide it with a worker (an Accumulator) and that defines
* what happens with the data.
- *
- * This inversion of control is in place to allow the Sequence to do resource management. It can enforce that close()
- * methods get called and other resources get cleaned up whenever processing is complete. Without this inversion
+ *
+ * This inversion of control is in place to allow the Sequence to do resource management. It can enforce that close()
+ * methods get called and other resources get cleaned up whenever processing is complete. Without this inversion
* it is very easy to unintentionally leak resources when iterating over something that is backed by a resource.
- *
+ *
* Sequences also expose {#see com.metamx.common.guava.Yielder} Yielder objects which allow you to implement a
- * continuation over the Sequence. Yielder do not offer the same guarantees of automagic resource management
+ * continuation over the Sequence. Yielder do not offer the same guarantees of automatic resource management
* as the accumulate method, but they are Closeable and will do the proper cleanup when close() is called on them.
*/
public interface Sequence
{
- public OutType accumulate(OutType initValue, Accumulator accumulator);
- public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator);
+ /**
+ * Accumulate this sequence using the given accumulator.
+ *
+ * @param initValue the initial value to pass along to start the accumulation.
+ * @param accumulator the accumulator which is responsible for accumulating input values.
+ * @param <OutType> the type of accumulated value.
+ *
+ * @return accumulated value.
+ */
+ OutType accumulate(OutType initValue, Accumulator accumulator);
+
+ /**
+ * Return a Yielder for the accumulated sequence.
+ *
+ * @param initValue the initial value to pass along to start the accumulation.
+ * @param accumulator the accumulator which is responsible for accumulating input values.
+ * @param <OutType> the type of accumulated value.
+ *
+ * @return a Yielder for the accumulated sequence.
+ *
+ * @see Yielder
+ */
+ Yielder toYielder(OutType initValue, YieldingAccumulator accumulator);
}
diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java
index a2dcd4ed815b..4987d34b6b14 100644
--- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java
+++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java
@@ -19,6 +19,7 @@
package io.druid.query.groupby.epinephelinae;
+import com.google.common.base.Supplier;
import com.google.common.primitives.Ints;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
@@ -64,17 +65,19 @@ public class BufferGrouper implements Grouper
private static final float DEFAULT_MAX_LOAD_FACTOR = 0.7f;
private static final int HASH_SIZE = Ints.BYTES;
- private final ByteBuffer buffer;
+ private final Supplier bufferSupplier;
private final KeySerde keySerde;
private final int keySize;
private final BufferAggregator[] aggregators;
private final int[] aggregatorOffsets;
private final int initialBuckets;
private final int bucketSize;
- private final int tableArenaSize;
private final int bufferGrouperMaxSize; // Integer.MAX_VALUE in production, only used for unit tests
private final float maxLoadFactor;
+ private ByteBuffer buffer;
+ private int tableArenaSize = -1;
+
// Buffer pointing to the current table (it moves around as the table grows)
private ByteBuffer tableBuffer;
@@ -90,8 +93,10 @@ public class BufferGrouper implements Grouper
// Maximum number of elements in the table before it must be resized
private int maxSize;
+ private boolean initialized = false;
+
public BufferGrouper(
- final ByteBuffer buffer,
+ final Supplier bufferSupplier,
final KeySerde keySerde,
final ColumnSelectorFactory columnSelectorFactory,
final AggregatorFactory[] aggregatorFactories,
@@ -100,7 +105,7 @@ public BufferGrouper(
final int initialBuckets
)
{
- this.buffer = buffer;
+ this.bufferSupplier = bufferSupplier;
this.keySerde = keySerde;
this.keySize = keySerde.keySize();
this.aggregators = new BufferAggregator[aggregatorFactories.length];
@@ -121,9 +126,23 @@ public BufferGrouper(
}
this.bucketSize = offset;
- this.tableArenaSize = (buffer.capacity() / (bucketSize + Ints.BYTES)) * bucketSize;
+ }
+
+ @Override
+ public void init()
+ {
+ if (!initialized) {
+ this.buffer = bufferSupplier.get();
+ this.tableArenaSize = (buffer.capacity() / (bucketSize + Ints.BYTES)) * bucketSize;
+ reset();
+ initialized = true;
+ }
+ }
- reset();
+ @Override
+ public boolean isInitialized()
+ {
+ return initialized;
}
@Override
diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
index 5072b5d5be97..27b1016d0a63 100644
--- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
+++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
@@ -21,6 +21,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import io.druid.java.util.common.ISE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.ColumnSelectorFactory;
@@ -34,7 +36,7 @@
/**
* Grouper based around a set of underlying {@link SpillingGrouper} instances. Thread-safe.
- *
+ *
* The passed-in buffer is cut up into concurrencyHint slices, and each slice is passed to a different underlying
* grouper. Access to each slice is separately synchronized. As long as the result set fits in memory, keys are
* partitioned between buffers based on their hash, and multiple threads can write into the same buffer. When
@@ -50,8 +52,21 @@ public class ConcurrentGrouper implements Grouper
private volatile boolean closed = false;
private final Comparator keyObjComparator;
+ private final Supplier bufferSupplier;
+ private final ColumnSelectorFactory columnSelectorFactory;
+ private final AggregatorFactory[] aggregatorFactories;
+ private final int bufferGrouperMaxSize;
+ private final float bufferGrouperMaxLoadFactor;
+ private final int bufferGrouperInitialBuckets;
+ private final LimitedTemporaryStorage temporaryStorage;
+ private final ObjectMapper spillMapper;
+ private final int concurrencyHint;
+ private final KeySerdeFactory keySerdeFactory;
+
+ private volatile boolean initialized = false;
+
public ConcurrentGrouper(
- final ByteBuffer buffer,
+ final Supplier bufferSupplier,
final KeySerdeFactory keySerdeFactory,
final ColumnSelectorFactory columnSelectorFactory,
final AggregatorFactory[] aggregatorFactories,
@@ -75,34 +90,67 @@ protected SpillingGrouper initialValue()
}
};
- final int sliceSize = (buffer.capacity() / concurrencyHint);
-
- for (int i = 0; i < concurrencyHint; i++) {
- final ByteBuffer slice = buffer.duplicate();
- slice.position(sliceSize * i);
- slice.limit(slice.position() + sliceSize);
- groupers.add(
- new SpillingGrouper<>(
- slice.slice(),
- keySerdeFactory,
- columnSelectorFactory,
- aggregatorFactories,
- bufferGrouperMaxSize,
- bufferGrouperMaxLoadFactor,
- bufferGrouperInitialBuckets,
- temporaryStorage,
- spillMapper,
- false
- )
- );
+ this.bufferSupplier = bufferSupplier;
+ this.columnSelectorFactory = columnSelectorFactory;
+ this.aggregatorFactories = aggregatorFactories;
+ this.bufferGrouperMaxSize = bufferGrouperMaxSize;
+ this.bufferGrouperMaxLoadFactor = bufferGrouperMaxLoadFactor;
+ this.bufferGrouperInitialBuckets = bufferGrouperInitialBuckets;
+ this.temporaryStorage = temporaryStorage;
+ this.spillMapper = spillMapper;
+ this.concurrencyHint = concurrencyHint;
+ this.keySerdeFactory = keySerdeFactory;
+ this.keyObjComparator = keySerdeFactory.objectComparator();
+ }
+
+ @Override
+ public void init()
+ {
+ if (!initialized) {
+ synchronized (bufferSupplier) {
+ if (!initialized) {
+ final ByteBuffer buffer = bufferSupplier.get();
+ final int sliceSize = (buffer.capacity() / concurrencyHint);
+
+ for (int i = 0; i < concurrencyHint; i++) {
+ final ByteBuffer slice = buffer.duplicate();
+ slice.position(sliceSize * i);
+ slice.limit(slice.position() + sliceSize);
+ final SpillingGrouper grouper = new SpillingGrouper<>(
+ Suppliers.ofInstance(slice.slice()),
+ keySerdeFactory,
+ columnSelectorFactory,
+ aggregatorFactories,
+ bufferGrouperMaxSize,
+ bufferGrouperMaxLoadFactor,
+ bufferGrouperInitialBuckets,
+ temporaryStorage,
+ spillMapper,
+ false
+ );
+ grouper.init();
+ groupers.add(grouper);
+ }
+
+ initialized = true;
+ }
+ }
}
+ }
- this.keyObjComparator = keySerdeFactory.objectComparator();
+ @Override
+ public boolean isInitialized()
+ {
+ return initialized;
}
@Override
public boolean aggregate(KeyType key, int keyHash)
{
+ if (!initialized) {
+ throw new ISE("Grouper is not initialized");
+ }
+
if (closed) {
throw new ISE("Grouper is closed");
}
@@ -139,6 +187,10 @@ public boolean aggregate(KeyType key)
@Override
public void reset()
{
+ if (!initialized) {
+ throw new ISE("Grouper is not initialized");
+ }
+
if (closed) {
throw new ISE("Grouper is closed");
}
@@ -153,6 +205,10 @@ public void reset()
@Override
public Iterator> iterator(final boolean sorted)
{
+ if (!initialized) {
+ throw new ISE("Grouper is not initialized");
+ }
+
if (closed) {
throw new ISE("Grouper is closed");
}
diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
index a4b68cc7c680..ab06082c69c4 100644
--- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
+++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Predicates;
+import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@@ -69,7 +70,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-public class GroupByMergingQueryRunnerV2 implements QueryRunner
+public class GroupByMergingQueryRunnerV2 implements QueryRunner
{
private static final Logger log = new Logger(GroupByMergingQueryRunnerV2.class);
private static final String CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION = "mergeRunnersUsingChainedExecution";
@@ -181,7 +182,7 @@ public CloseableGrouperIterator make()
false,
null,
config,
- mergeBufferHolder.get(),
+ Suppliers.ofInstance(mergeBufferHolder.get()),
concurrencyHint,
temporaryStorage,
spillMapper,
@@ -189,6 +190,7 @@ public CloseableGrouperIterator make()
);
final Grouper grouper = pair.lhs;
final Accumulator, Row> accumulator = pair.rhs;
+ grouper.init();
final ReferenceCountingResourceHolder> grouperHolder =
ReferenceCountingResourceHolder.fromCloseable(grouper);
diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
index 5aa1d7bd790c..5106fa3e5a59 100644
--- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
+++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java
@@ -21,6 +21,7 @@
import com.google.common.base.Function;
import com.google.common.base.Strings;
+import com.google.common.base.Suppliers;
import com.google.common.collect.Maps;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
@@ -259,7 +260,7 @@ public Row next()
}
final Grouper grouper = new BufferGrouper<>(
- buffer,
+ Suppliers.ofInstance(buffer),
keySerde,
cursor,
query.getAggregatorSpecs()
@@ -268,6 +269,7 @@ public Row next()
querySpecificConfig.getBufferGrouperMaxLoadFactor(),
querySpecificConfig.getBufferGrouperInitialBuckets()
);
+ grouper.init();
outer:
while (!cursor.isDone()) {
diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java
index daf6576dc72a..1736837d3e6c 100644
--- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java
+++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java
@@ -21,6 +21,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import io.druid.collections.BlockingPool;
import io.druid.collections.ReferenceCountingResourceHolder;
@@ -114,6 +115,7 @@ public boolean apply(Row input)
for (Interval queryInterval : queryIntervals) {
if (queryInterval.contains(rowTime)) {
inInterval = true;
+ break;
}
}
if (!inInterval) {
@@ -141,24 +143,33 @@ public CloseableGrouperIterator make()
closeOnFailure.add(temporaryStorage);
- final ReferenceCountingResourceHolder mergeBufferHolder;
- try {
- // This will potentially block if there are no merge buffers left in the pool.
- if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) {
- throw new QueryInterruptedException(new TimeoutException());
- }
- closeOnFailure.add(mergeBufferHolder);
- }
- catch (InterruptedException e) {
- throw new QueryInterruptedException(e);
- }
+ final SettableSupplier> bufferHolderSupplier = new SettableSupplier<>();
Pair, Accumulator, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
query,
true,
rowSignature,
querySpecificConfig,
- mergeBufferHolder.get(),
+ new Supplier()
+ {
+ @Override
+ public ByteBuffer get()
+ {
+ final ReferenceCountingResourceHolder mergeBufferHolder;
+ try {
+ if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) {
+ throw new QueryInterruptedException(new TimeoutException());
+ }
+ bufferHolderSupplier.set(mergeBufferHolder);
+ closeOnFailure.add(mergeBufferHolder);
+
+ return mergeBufferHolder.get();
+ }
+ catch (InterruptedException e) {
+ throw new QueryInterruptedException(e);
+ }
+ }
+ },
-1,
temporaryStorage,
spillMapper,
@@ -168,7 +179,10 @@ public CloseableGrouperIterator make()
final Accumulator, Row> accumulator = pair.rhs;
closeOnFailure.add(grouper);
- final Grouper retVal = filteredSequence.accumulate(grouper, accumulator);
+ final Grouper retVal = filteredSequence.accumulate(
+ grouper,
+ accumulator
+ );
if (retVal != grouper) {
throw new ResourceLimitExceededException("Grouping resources exhausted");
}
@@ -182,7 +196,7 @@ public CloseableGrouperIterator make()
public void close() throws IOException
{
grouper.close();
- mergeBufferHolder.close();
+ CloseQuietly.close(bufferHolderSupplier.get());
CloseQuietly.close(temporaryStorage);
}
}
diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java
index 922fb61c3b18..2417e8961bc6 100644
--- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java
+++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java
@@ -32,7 +32,7 @@
* Groupers aggregate metrics from rows that they typically get from a ColumnSelectorFactory, under
* grouping keys that some outside driver is passing in. They can also iterate over the grouped
* rows after the aggregation is done.
- *
+ *
* They work sort of like a map of KeyType to aggregated values, except they don't support
* random lookups.
*
@@ -40,6 +40,19 @@
*/
public interface Grouper extends Closeable
{
+ /**
+ * Initialize the grouper.
+ * This method needs to be called before calling {@link #aggregate(Object)} and {@link #aggregate(Object, int)}.
+ */
+ void init();
+
+ /**
+ * Check whether this grouper is initialized or not.
+ *
+ * @return true if the grouper is already initialized, otherwise false.
+ */
+ boolean isInitialized();
+
/**
* Aggregate the current row with the provided key. Some implementations are thread-safe and
* some are not.
@@ -74,11 +87,11 @@ public interface Grouper extends Closeable
/**
* Iterate through entries. If a comparator is provided, do a sorted iteration.
- *
+ *
* Once this method is called, writes are no longer safe. After you are done with the iterator returned by this
* method, you should either call {@link #close()} (if you are done with the Grouper), {@link #reset()} (if you
* want to reuse it), or {@link #iterator(boolean)} again if you want another iterator.
- *
+ *
* If "sorted" is true then the iterator will return sorted results. It will use KeyType's natural ordering on
* deserialized objects, and will use the {@link KeySerde#comparator()} on serialized objects. Woe be unto you
* if these comparators are not equivalent.
@@ -188,7 +201,7 @@ interface KeySerde
/**
* Serialize a key. This will be called by the {@link #aggregate(Comparable)} method. The buffer will not
* be retained after the aggregate method returns, so reusing buffers is OK.
- *
+ *
* This method may return null, which indicates that some internal resource limit has been reached and
* no more keys can be generated. In this situation you can call {@link #reset()} and try again, although
* beware the caveats on that method.
diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
index 350163f56f40..7814c6f9d8b0 100644
--- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
+++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java
@@ -55,6 +55,7 @@
import io.druid.segment.data.IndexedInts;
import org.joda.time.DateTime;
+import javax.annotation.Nullable;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -76,7 +77,7 @@ public static Pair, Accumulator, Row>>
final boolean isInputRaw,
final Map rawInputRowSignature,
final GroupByQueryConfig config,
- final ByteBuffer buffer,
+ final Supplier bufferSupplier,
final int concurrencyHint,
final LimitedTemporaryStorage temporaryStorage,
final ObjectMapper spillMapper,
@@ -105,7 +106,7 @@ public static Pair, Accumulator, Row>>
final Grouper grouper;
if (concurrencyHint == -1) {
grouper = new SpillingGrouper<>(
- buffer,
+ bufferSupplier,
keySerdeFactory,
columnSelectorFactory,
aggregatorFactories,
@@ -118,7 +119,7 @@ public static Pair, Accumulator, Row>>
);
} else {
grouper = new ConcurrentGrouper<>(
- buffer,
+ bufferSupplier,
keySerdeFactory,
columnSelectorFactory,
aggregatorFactories,
@@ -131,16 +132,15 @@ public static Pair, Accumulator, Row>>
);
}
- final Supplier[] inputRawSuppliers;
- if (isInputRaw) {
- inputRawSuppliers = getValueSuppliersForDimensions(
- columnSelectorFactory,
- query.getDimensions(),
- rawInputRowSignature
- );
- } else {
- inputRawSuppliers = null;
- }
+ final int keySize = includeTimestamp ? query.getDimensions().size() + 1 : query.getDimensions().size();
+ final ValueExtractFunction valueExtractFn = makeValueExtractFunction(
+ query,
+ isInputRaw,
+ includeTimestamp,
+ columnSelectorFactory,
+ rawInputRowSignature,
+ valueTypes
+ );
final Accumulator, Row> accumulator = new Accumulator, Row>()
{
@@ -159,62 +159,14 @@ public Grouper accumulate(
return null;
}
- columnSelectorRow.set(row);
-
- final int dimStart;
- final Comparable[] key;
-
- if (includeTimestamp) {
- key = new Comparable[query.getDimensions().size() + 1];
-
- final long timestamp;
- if (isInputRaw) {
- if (query.getGranularity() instanceof AllGranularity) {
- timestamp = query.getIntervals().get(0).getStartMillis();
- } else {
- timestamp = query.getGranularity().truncate(row.getTimestampFromEpoch());
- }
- } else {
- timestamp = row.getTimestampFromEpoch();
- }
-
- key[0] = timestamp;
- dimStart = 1;
- } else {
- key = new Comparable[query.getDimensions().size()];
- dimStart = 0;
+ if (!theGrouper.isInitialized()) {
+ theGrouper.init();
}
- for (int i = dimStart; i < key.length; i++) {
- final ValueType type = valueTypes.get(i - dimStart);
- Object valObj;
- if (isInputRaw) {
- valObj = inputRawSuppliers[i - dimStart].get();
- } else {
- valObj = row.getRaw(query.getDimensions().get(i - dimStart).getOutputName());
- }
- // convert values to the output type specified by the DimensionSpec, for merging purposes
- switch (type) {
- case STRING:
- valObj = valObj == null ? "" : valObj.toString();
- break;
- case LONG:
- valObj = DimensionHandlerUtils.convertObjectToLong(valObj);
- if (valObj == null) {
- valObj = 0L;
- }
- break;
- case FLOAT:
- valObj = DimensionHandlerUtils.convertObjectToFloat(valObj);
- if (valObj == null) {
- valObj = 0.0f;
- }
- break;
- default:
- throw new IAE("invalid type: [%s]", type);
- }
- key[i] = (Comparable) valObj;
- }
+ columnSelectorRow.set(row);
+
+ final Comparable[] key = new Comparable[keySize];
+ valueExtractFn.apply(row, key);
final boolean didAggregate = theGrouper.aggregate(new RowBasedKey(key));
if (!didAggregate) {
@@ -230,6 +182,135 @@ public Grouper accumulate(
return new Pair<>(grouper, accumulator);
}
+ private interface TimestampExtractFunction
+ {
+ long apply(Row row);
+ }
+
+ private static TimestampExtractFunction makeTimestampExtractFunction(
+ final GroupByQuery query,
+ final boolean isInputRaw
+ )
+ {
+ if (isInputRaw) {
+ if (query.getGranularity() instanceof AllGranularity) {
+ return new TimestampExtractFunction()
+ {
+ @Override
+ public long apply(Row row)
+ {
+ return query.getIntervals().get(0).getStartMillis();
+ }
+ };
+ } else {
+ return new TimestampExtractFunction()
+ {
+ @Override
+ public long apply(Row row)
+ {
+ return query.getGranularity().truncate(row.getTimestampFromEpoch());
+ }
+ };
+ }
+ } else {
+ return new TimestampExtractFunction()
+ {
+ @Override
+ public long apply(Row row)
+ {
+ return row.getTimestampFromEpoch();
+ }
+ };
+ }
+ }
+
+ private interface ValueExtractFunction
+ {
+ Comparable[] apply(Row row, Comparable[] key);
+ }
+
+ private static ValueExtractFunction makeValueExtractFunction(
+ final GroupByQuery query,
+ final boolean isInputRaw,
+ final boolean includeTimestamp,
+ final ColumnSelectorFactory columnSelectorFactory,
+ final Map rawInputRowSignature,
+ final List valueTypes
+ )
+ {
+ final TimestampExtractFunction timestampExtractFn = includeTimestamp ?
+ makeTimestampExtractFunction(query, isInputRaw) :
+ null;
+
+ final Function[] valueConvertFns = makeValueConvertFunctions(valueTypes);
+
+ if (isInputRaw) {
+ final Supplier[] inputRawSuppliers = getValueSuppliersForDimensions(
+ columnSelectorFactory,
+ query.getDimensions(),
+ rawInputRowSignature
+ );
+
+ if (includeTimestamp) {
+ return new ValueExtractFunction()
+ {
+ @Override
+ public Comparable[] apply(Row row, Comparable[] key)
+ {
+ key[0] = timestampExtractFn.apply(row);
+ for (int i = 1; i < key.length; i++) {
+ final Comparable val = inputRawSuppliers[i - 1].get();
+ key[i] = valueConvertFns[i - 1].apply(val);
+ }
+ return key;
+ }
+ };
+ } else {
+ return new ValueExtractFunction()
+ {
+ @Override
+ public Comparable[] apply(Row row, Comparable[] key)
+ {
+ for (int i = 0; i < key.length; i++) {
+ final Comparable val = inputRawSuppliers[i].get();
+ key[i] = valueConvertFns[i].apply(val);
+ }
+ return key;
+ }
+ };
+ }
+ } else {
+ if (includeTimestamp) {
+ return new ValueExtractFunction()
+ {
+ @Override
+ public Comparable[] apply(Row row, Comparable[] key)
+ {
+ key[0] = timestampExtractFn.apply(row);
+ for (int i = 1; i < key.length; i++) {
+ final Comparable val = (Comparable) row.getRaw(query.getDimensions().get(i - 1).getOutputName());
+ key[i] = valueConvertFns[i - 1].apply(val);
+ }
+ return key;
+ }
+ };
+ } else {
+ return new ValueExtractFunction()
+ {
+ @Override
+ public Comparable[] apply(Row row, Comparable[] key)
+ {
+ for (int i = 0; i < key.length; i++) {
+ final Comparable val = (Comparable) row.getRaw(query.getDimensions().get(i).getOutputName());
+ key[i] = valueConvertFns[i].apply(val);
+ }
+ return key;
+ }
+ };
+ }
+ }
+ }
+
public static CloseableGrouperIterator makeGrouperIterator(
final Grouper grouper,
final GroupByQuery query,
@@ -340,7 +421,8 @@ public String toString()
}
}
- private static Supplier[] getValueSuppliersForDimensions(
+ @SuppressWarnings("unchecked")
+ private static Supplier[] getValueSuppliersForDimensions(
final ColumnSelectorFactory columnSelectorFactory,
final List dimensions,
final Map rawInputRowSignature
@@ -360,10 +442,10 @@ private static Supplier[] getValueSuppliersForDimensions(
}
switch (type) {
case STRING:
- inputRawSuppliers[i] = new Supplier()
+ inputRawSuppliers[i] = new Supplier()
{
@Override
- public Object get()
+ public Comparable get()
{
final String value;
IndexedInts index = ((DimensionSelector) selector).getRow();
@@ -375,20 +457,20 @@ public Object get()
};
break;
case LONG:
- inputRawSuppliers[i] = new Supplier()
+ inputRawSuppliers[i] = new Supplier()
{
@Override
- public Object get()
+ public Comparable get()
{
return ((LongColumnSelector) selector).get();
}
};
break;
case FLOAT:
- inputRawSuppliers[i] = new Supplier()
+ inputRawSuppliers[i] = new Supplier()
{
@Override
- public Object get()
+ public Comparable get()
{
return ((FloatColumnSelector) selector).get();
}
@@ -401,6 +483,74 @@ public Object get()
return inputRawSuppliers;
}
+ @SuppressWarnings("unchecked")
+ private static Function[] makeValueConvertFunctions(
+ final Map rawInputRowSignature,
+ final List dimensions
+ )
+ {
+ final List valueTypes = Lists.newArrayListWithCapacity(dimensions.size());
+ for (DimensionSpec dimensionSpec : dimensions) {
+ final ValueType valueType = rawInputRowSignature.get(dimensionSpec);
+ valueTypes.add(valueType == null ? ValueType.STRING : valueType);
+ }
+ return makeValueConvertFunctions(valueTypes);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Function[] makeValueConvertFunctions(
+ final List valueTypes
+ )
+ {
+ final Function[] functions = new Function[valueTypes.size()];
+ for (int i = 0; i < functions.length; i++) {
+ ValueType type = valueTypes.get(i);
+ // Subquery post-aggs aren't added to the rowSignature (see rowSignatureFor() in GroupByQueryHelper) because
+ // their types aren't known, so default to String handling.
+ type = type == null ? ValueType.STRING : type;
+ switch (type) {
+ case STRING:
+ functions[i] = new Function()
+ {
+ @Override
+ public Comparable apply(@Nullable Comparable input)
+ {
+ return input == null ? "" : input.toString();
+ }
+ };
+ break;
+
+ case LONG:
+ functions[i] = new Function()
+ {
+ @Override
+ public Comparable apply(@Nullable Comparable input)
+ {
+ final Long val = DimensionHandlerUtils.convertObjectToLong(input);
+ return val == null ? 0L : val;
+ }
+ };
+ break;
+
+ case FLOAT:
+ functions[i] = new Function()
+ {
+ @Override
+ public Comparable apply(@Nullable Comparable input)
+ {
+ final Float val = DimensionHandlerUtils.convertObjectToFloat(input);
+ return val == null ? 0.f : val;
+ }
+ };
+ break;
+
+ default:
+ throw new IAE("invalid type: [%s]", type);
+ }
+ }
+ return functions;
+ }
+
private static class RowBasedKeySerdeFactory implements Grouper.KeySerdeFactory<RowBasedKey>
{
private final boolean includeTimestamp;
diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java
index addda75d464c..c5a4317d58f9 100644
--- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java
+++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java
@@ -23,11 +23,11 @@
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
+import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import io.druid.java.util.common.guava.CloseQuietly;
-import io.druid.java.util.common.logger.Logger;
import io.druid.query.QueryInterruptedException;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.ColumnSelectorFactory;
@@ -51,8 +51,6 @@
*/
public class SpillingGrouper<KeyType> implements Grouper<KeyType>
{
- private static final Logger log = new Logger(SpillingGrouper.class);
-
private final BufferGrouper grouper;
private final KeySerde keySerde;
private final LimitedTemporaryStorage temporaryStorage;
@@ -66,7 +64,7 @@ public class SpillingGrouper implements Grouper
private boolean spillingAllowed = false;
public SpillingGrouper(
- final ByteBuffer buffer,
+ final Supplier<ByteBuffer> bufferSupplier,
final KeySerdeFactory keySerdeFactory,
final ColumnSelectorFactory columnSelectorFactory,
final AggregatorFactory[] aggregatorFactories,
@@ -81,7 +79,7 @@ public SpillingGrouper(
this.keySerde = keySerdeFactory.factorize();
this.keyObjComparator = keySerdeFactory.objectComparator();
this.grouper = new BufferGrouper<>(
- buffer,
+ bufferSupplier,
keySerde,
columnSelectorFactory,
aggregatorFactories,
@@ -95,6 +93,18 @@ public SpillingGrouper(
this.spillingAllowed = spillingAllowed;
}
+ @Override
+ public void init()
+ {
+ grouper.init();
+ }
+
+ @Override
+ public boolean isInitialized()
+ {
+ return grouper.isInitialized();
+ }
+
@Override
public boolean aggregate(KeyType key, int keyHash)
{
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java
new file mode 100644
index 000000000000..5d04ab840fac
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.groupby;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.druid.collections.BlockingPool;
+import io.druid.collections.ReferenceCountingResourceHolder;
+import io.druid.collections.StupidPool;
+import io.druid.data.input.Row;
+import io.druid.granularity.QueryGranularities;
+import io.druid.query.DruidProcessingConfig;
+import io.druid.query.QueryContextKeys;
+import io.druid.query.QueryDataSource;
+import io.druid.query.QueryRunner;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.query.dimension.DefaultDimensionSpec;
+import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.groupby.strategy.GroupByStrategySelector;
+import io.druid.query.groupby.strategy.GroupByStrategyV1;
+import io.druid.query.groupby.strategy.GroupByStrategyV2;
+import org.bouncycastle.util.Integers;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class GroupByQueryMergeBufferTest
+{
+ private static class TestBlockingPool extends BlockingPool<ByteBuffer>
+ {
+ private int minRemainBufferNum;
+
+ public TestBlockingPool(Supplier<ByteBuffer> generator, int limit)
+ {
+ super(generator, limit);
+ minRemainBufferNum = limit;
+ }
+
+ @Override
+ public ReferenceCountingResourceHolder<ByteBuffer> take(final long timeout) throws InterruptedException
+ {
+ final ReferenceCountingResourceHolder<ByteBuffer> holder = super.take(timeout);
+ final int queueSize = getQueueSize();
+ if (minRemainBufferNum > queueSize) {
+ minRemainBufferNum = queueSize;
+ }
+ return holder;
+ }
+
+ public void resetMinRemainBufferNum()
+ {
+ minRemainBufferNum = PROCESSING_CONFIG.getNumMergeBuffers();
+ }
+
+ public int getMinRemainBufferNum()
+ {
+ return minRemainBufferNum;
+ }
+ }
+
+ public static final DruidProcessingConfig PROCESSING_CONFIG = new DruidProcessingConfig()
+ {
+
+ @Override
+ public String getFormatString()
+ {
+ return null;
+ }
+
+ @Override
+ public int intermediateComputeSizeBytes()
+ {
+ return 10 * 1024 * 1024;
+ }
+
+ @Override
+ public int getNumMergeBuffers()
+ {
+ return 3;
+ }
+
+ @Override
+ public int getNumThreads()
+ {
+ return 1;
+ }
+ };
+
+ private static GroupByQueryRunnerFactory makeQueryRunnerFactory(
+ final ObjectMapper mapper,
+ final GroupByQueryConfig config
+ )
+ {
+ final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
+ final StupidPool<ByteBuffer> bufferPool = new StupidPool<>(
+ "GroupByQueryEngine-bufferPool",
+ new Supplier<ByteBuffer>()
+ {
+ @Override
+ public ByteBuffer get()
+ {
+ return ByteBuffer.allocateDirect(PROCESSING_CONFIG.intermediateComputeSizeBytes());
+ }
+ }
+ );
+ final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
+ configSupplier,
+ new GroupByStrategyV1(
+ configSupplier,
+ new GroupByQueryEngine(configSupplier, bufferPool),
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER,
+ bufferPool
+ ),
+ new GroupByStrategyV2(
+ PROCESSING_CONFIG,
+ configSupplier,
+ bufferPool,
+ mergeBufferPool,
+ mapper,
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER
+ )
+ );
+ final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(
+ configSupplier,
+ strategySelector,
+ bufferPool,
+ QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+ );
+ return new GroupByQueryRunnerFactory(
+ strategySelector,
+ toolChest
+ );
+ }
+
+ private static final TestBlockingPool mergeBufferPool = new TestBlockingPool(
+ new Supplier<ByteBuffer>()
+ {
+ @Override
+ public ByteBuffer get()
+ {
+ return ByteBuffer.allocateDirect(PROCESSING_CONFIG.intermediateComputeSizeBytes());
+ }
+ },
+ PROCESSING_CONFIG.getNumMergeBuffers()
+ );
+
+ private static final GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+ GroupByQueryRunnerTest.DEFAULT_MAPPER,
+ new GroupByQueryConfig()
+ {
+ public String getDefaultStrategy()
+ {
+ return "v2";
+ }
+ }
+ );
+
+ private QueryRunner<Row> runner;
+
+ @Parameters(name = "{0}")
+ public static Collection