diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java index 8bd1b597c41a..3f86b120c0da 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java @@ -23,7 +23,12 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import io.druid.java.util.common.ISE; +import io.druid.query.AbstractPrioritizedCallable; +import io.druid.query.QueryInterruptedException; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.groupby.orderby.DefaultLimitSpec; import io.druid.segment.ColumnSelectorFactory; @@ -33,7 +38,12 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** * Grouper based around a set of underlying {@link SpillingGrouper} instances. Thread-safe. @@ -51,7 +61,6 @@ public class ConcurrentGrouper implements Grouper private final AtomicInteger threadNumber = new AtomicInteger(); private volatile boolean spilling = false; private volatile boolean closed = false; - private final Comparator> keyObjComparator; private final Supplier bufferSupplier; private final ColumnSelectorFactory columnSelectorFactory; @@ -65,6 +74,11 @@ public class ConcurrentGrouper implements Grouper private final KeySerdeFactory keySerdeFactory; private final DefaultLimitSpec limitSpec; private final boolean sortHasNonGroupingFields; + private final Comparator> keyObjComparator; + private final ListeningExecutorService grouperSorter; + private final int priority; + private final boolean hasQueryTimeout; + private final long queryTimeoutAt; private volatile boolean initialized = false; @@ -80,7 +94,11 @@ public ConcurrentGrouper( final ObjectMapper spillMapper, final int concurrencyHint, final DefaultLimitSpec limitSpec, - final boolean sortHasNonGroupingFields + final boolean sortHasNonGroupingFields, + final ListeningExecutorService grouperSorter, + final int priority, + final boolean hasQueryTimeout, + final long queryTimeoutAt ) { Preconditions.checkArgument(concurrencyHint > 0, "concurrencyHint > 0"); @@ -108,6 +126,10 @@ protected SpillingGrouper initialValue() this.limitSpec = limitSpec; this.sortHasNonGroupingFields = sortHasNonGroupingFields; this.keyObjComparator = keySerdeFactory.objectComparator(sortHasNonGroupingFields); + this.grouperSorter = Preconditions.checkNotNull(grouperSorter); + this.priority = priority; + this.hasQueryTimeout = hasQueryTimeout; + this.queryTimeoutAt = queryTimeoutAt; } @Override @@ -216,15 +238,58 @@ public Iterator> iterator(final boolean sorted) throw new ISE("Grouper is closed"); } - final List>> iterators = new ArrayList<>(groupers.size()); + return Groupers.mergeIterators( + sorted && isParallelSortAvailable() ? parallelSortAndGetGroupersIterator() : getGroupersIterator(sorted), + sorted ? keyObjComparator : null + ); + } - for (Grouper grouper : groupers) { - synchronized (grouper) { - iterators.add(grouper.iterator(sorted)); - } + private boolean isParallelSortAvailable() + { + return concurrencyHint > 1; + } + + private List>> parallelSortAndGetGroupersIterator() + { + // The number of groupers is same with the number of processing threads in grouperSorter + final ListenableFuture>>> future = Futures.allAsList( + groupers.stream() + .map(grouper -> + grouperSorter.submit( + new AbstractPrioritizedCallable>>(priority) + { + @Override + public Iterator> call() throws Exception + { + return grouper.iterator(true); + } + } + ) + ) + .collect(Collectors.toList()) + ); + + try { + final long timeout = queryTimeoutAt - System.currentTimeMillis(); + return hasQueryTimeout ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get(); + } + catch (InterruptedException | TimeoutException e) { + future.cancel(true); + throw new QueryInterruptedException(e); } + catch (CancellationException e) { + throw new QueryInterruptedException(e); + } + catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } + } - return Groupers.mergeIterators(iterators, sorted ? keyObjComparator : null); + private List>> getGroupersIterator(boolean sorted) + { + return groupers.stream() + .map(grouper -> grouper.iterator(sorted)) + .collect(Collectors.toList()); } @Override diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 594b22c6ea40..1d960e6cf666 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -194,7 +194,11 @@ public CloseableGrouperIterator make() concurrencyHint, temporaryStorage, spillMapper, - combiningAggregatorFactories + combiningAggregatorFactories, + exec, + priority, + hasTimeout, + timeoutAt ); final Grouper grouper = pair.lhs; final Accumulator accumulator = pair.rhs; diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index 4f15ae440d7e..a3dccb1bc0b2 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -155,7 +155,6 @@ public ByteBuffer get() return mergeBufferHolder.get(); } }, - -1, temporaryStorage, spillMapper, aggregatorFactories diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java index 2ca544485eb2..7afd95fefe00 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java @@ -102,7 +102,8 @@ default ToIntFunction hashFunction() *

* Once this method is called, writes are no longer safe. After you are done with the iterator returned by this * method, you should either call {@link #close()} (if you are done with the Grouper), {@link #reset()} (if you - * want to reuse it), or {@link #iterator(boolean)} again if you want another iterator. + * want to reuse it), or {@link #iterator(boolean)} again if you want another iterator. This method is not thread-safe + * and must not be called by multiple threads concurrently. *

* If "sorted" is true then the iterator will return sorted results. It will use KeyType's natural ordering on * deserialized objects, and will use the {@link KeySerde#comparator()} on serialized objects. Woe be unto you diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 4fb958a3f60a..467ef8b6f0a1 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -33,6 +33,7 @@ import com.google.common.primitives.Floats; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.ListeningExecutorService; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.java.util.common.IAE; @@ -65,6 +66,7 @@ import io.druid.segment.data.IndexedInts; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.io.Closeable; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -78,6 +80,41 @@ // this class contains shared code between GroupByMergingQueryRunnerV2 and GroupByRowProcessor public class RowBasedGrouperHelper { + private static final int SINGLE_THREAD_CONCURRENCY_HINT = -1; + private static final int UNKNOWN_THREAD_PRIORITY = -1; + private static final long UNKNOWN_TIMEOUT = -1L; + + /** + * Create a single-threaded grouper and accumulator. + */ + public static Pair, Accumulator> createGrouperAccumulatorPair( + final GroupByQuery query, + final boolean isInputRaw, + final Map rawInputRowSignature, + final GroupByQueryConfig config, + final Supplier bufferSupplier, + final LimitedTemporaryStorage temporaryStorage, + final ObjectMapper spillMapper, + final AggregatorFactory[] aggregatorFactories + ) + { + return createGrouperAccumulatorPair( + query, + isInputRaw, + rawInputRowSignature, + config, + bufferSupplier, + SINGLE_THREAD_CONCURRENCY_HINT, + temporaryStorage, + spillMapper, + aggregatorFactories, + null, + UNKNOWN_THREAD_PRIORITY, + false, + UNKNOWN_TIMEOUT + ); + } + /** * If isInputRaw is true, transformations such as timestamp truncation and extraction functions have not * been applied to the input rows yet, for example, in a nested query, if an extraction function is being @@ -92,7 +129,11 @@ public static Pair, Accumulator> crea final int concurrencyHint, final LimitedTemporaryStorage temporaryStorage, final ObjectMapper spillMapper, - final AggregatorFactory[] aggregatorFactories + final AggregatorFactory[] aggregatorFactories, + @Nullable final ListeningExecutorService grouperSorter, + final int priority, + final boolean hasQueryTimeout, + final long queryTimeoutAt ) { // concurrencyHint >= 1 for concurrent groupers, -1 for single-threaded @@ -160,7 +201,11 @@ public static Pair, Accumulator> crea spillMapper, concurrencyHint, limitSpec, - sortHasNonGroupingFields + sortHasNonGroupingFields, + grouperSorter, + priority, + hasQueryTimeout, + queryTimeoutAt ); } diff --git a/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java b/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java index 4a1e57f19b6e..d2162c702a9c 100644 --- a/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java +++ b/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java @@ -21,6 +21,8 @@ import com.google.common.base.Supplier; import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.concurrent.Execs; import io.druid.java.util.common.IAE; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -187,7 +189,7 @@ public DoubleColumnSelector makeDoubleColumnSelector(String columnName) } }; - @Test + @Test(timeout = 5000L) public void testAggregate() throws InterruptedException, ExecutionException { final ConcurrentGrouper grouper = new ConcurrentGrouper<>( @@ -202,7 +204,11 @@ public void testAggregate() throws InterruptedException, ExecutionException null, 8, null, - false + false, + MoreExecutors.listeningDecorator(Execs.multiThreaded(4, "concurrent-grouper-test-%d")), + 0, + false, + 0 ); Future[] futures = new Future[8];