From c8967bf10d274baae389d278eb8d031011dc22aa Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 7 Aug 2017 15:39:35 +0900 Subject: [PATCH 1/2] Multi-thread sort --- .../epinephelinae/ConcurrentGrouper.java | 68 ++++++++++++++++--- .../GroupByMergingQueryRunnerV2.java | 4 +- .../epinephelinae/GroupByRowProcessor.java | 4 +- .../epinephelinae/RowBasedGrouperHelper.java | 10 ++- .../epinephelinae/ConcurrentGrouperTest.java | 6 +- 5 files changed, 79 insertions(+), 13 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java index 8bd1b597c41a..98ab7fad9804 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java @@ -23,17 +23,24 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import io.druid.java.util.common.ISE; +import io.druid.query.AbstractPrioritizedCallable; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.groupby.orderby.DefaultLimitSpec; import io.druid.segment.ColumnSelectorFactory; +import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** * Grouper based around a set of underlying {@link SpillingGrouper} instances. Thread-safe. @@ -51,7 +58,6 @@ public class ConcurrentGrouper implements Grouper private final AtomicInteger threadNumber = new AtomicInteger(); private volatile boolean spilling = false; private volatile boolean closed = false; - private final Comparator> keyObjComparator; private final Supplier bufferSupplier; private final ColumnSelectorFactory columnSelectorFactory; @@ -65,6 +71,10 @@ public class ConcurrentGrouper implements Grouper private final KeySerdeFactory keySerdeFactory; private final DefaultLimitSpec limitSpec; private final boolean sortHasNonGroupingFields; + private final Comparator> keyObjComparator; + @Nullable + private final ListeningExecutorService grouperSorter; + private final int priority; private volatile boolean initialized = false; @@ -80,7 +90,9 @@ public ConcurrentGrouper( final ObjectMapper spillMapper, final int concurrencyHint, final DefaultLimitSpec limitSpec, - final boolean sortHasNonGroupingFields + final boolean sortHasNonGroupingFields, + @Nullable final ListeningExecutorService grouperSorter, + final int priority ) { Preconditions.checkArgument(concurrencyHint > 0, "concurrencyHint > 0"); @@ -108,6 +120,8 @@ protected SpillingGrouper initialValue() this.limitSpec = limitSpec; this.sortHasNonGroupingFields = sortHasNonGroupingFields; this.keyObjComparator = keySerdeFactory.objectComparator(sortHasNonGroupingFields); + this.grouperSorter = grouperSorter; + this.priority = priority; } @Override @@ -216,15 +230,53 @@ public Iterator> iterator(final boolean sorted) throw new ISE("Grouper is closed"); } - final List>> iterators = new ArrayList<>(groupers.size()); + return Groupers.mergeIterators( + sorted && isParallelSortAvailable() ? parallelSortAndGetGroupersIterator() : getGroupersIterator(sorted), + sorted ? keyObjComparator : null + ); + } - for (Grouper grouper : groupers) { - synchronized (grouper) { - iterators.add(grouper.iterator(sorted)); - } + private boolean isParallelSortAvailable() + { + return grouperSorter != null && concurrencyHint > 1; + } + + private List>> parallelSortAndGetGroupersIterator() + { + // The number of groupers is same with the number of processing threads in grouperSorter + final List>>> futures = groupers + .stream() + .map(grouper -> + grouperSorter.submit( + new AbstractPrioritizedCallable>>(priority) + { + @Override + public Iterator> call() throws Exception + { + synchronized (grouper) { + return grouper.iterator(true); + } + } + } + ) + ).collect(Collectors.toList()); + + try { + return Futures.allAsList(futures).get(); } + catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } - return Groupers.mergeIterators(iterators, sorted ? keyObjComparator : null); + private List>> getGroupersIterator(boolean sorted) + { + return groupers.stream() + .map(grouper -> { + synchronized (grouper) { + return grouper.iterator(sorted); + } + }).collect(Collectors.toList()); } @Override diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 594b22c6ea40..60e1bd6e018c 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -194,7 +194,9 @@ public CloseableGrouperIterator make() concurrencyHint, temporaryStorage, spillMapper, - combiningAggregatorFactories + combiningAggregatorFactories, + exec, + priority ); final Grouper grouper = pair.lhs; final Accumulator accumulator = pair.rhs; diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index 4f15ae440d7e..bb175a6b7da7 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -158,7 +158,9 @@ public ByteBuffer get() -1, temporaryStorage, spillMapper, - aggregatorFactories + aggregatorFactories, + null, + -1 // priority doesn't matter if grouperMerger is null ); final Grouper grouper = pair.lhs; final Accumulator accumulator = pair.rhs; diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 4fb958a3f60a..bf30e14ecf67 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -33,6 +33,7 @@ import com.google.common.primitives.Floats; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.ListeningExecutorService; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.java.util.common.IAE; @@ -65,6 +66,7 @@ import io.druid.segment.data.IndexedInts; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.io.Closeable; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -92,7 +94,9 @@ public static Pair, Accumulator> crea final int concurrencyHint, final LimitedTemporaryStorage temporaryStorage, final ObjectMapper spillMapper, - final AggregatorFactory[] aggregatorFactories + final AggregatorFactory[] aggregatorFactories, + @Nullable final ListeningExecutorService grouperSorter, + final int priority ) { // concurrencyHint >= 1 for concurrent groupers, -1 for single-threaded @@ -160,7 +164,9 @@ public static Pair, Accumulator> crea spillMapper, concurrencyHint, limitSpec, - sortHasNonGroupingFields + sortHasNonGroupingFields, + grouperSorter, + priority ); } diff --git a/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java b/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java index 4a1e57f19b6e..734cf26ab81c 100644 --- a/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java +++ b/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java @@ -21,6 +21,8 @@ import com.google.common.base.Supplier; import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.concurrent.Execs; import io.druid.java.util.common.IAE; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -202,7 +204,9 @@ public void testAggregate() throws InterruptedException, ExecutionException null, 8, null, - false + false, + MoreExecutors.listeningDecorator(Execs.multiThreaded(4, "concurrent-grouper-test-%d")), + 0 ); Future[] futures = new Future[8]; From 037a2dcd4bb6200a9785005940274a545a694733 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 9 Aug 2017 11:48:46 +0900 Subject: [PATCH 2/2] Address comments --- .../epinephelinae/ConcurrentGrouper.java | 73 +++++++++++-------- .../GroupByMergingQueryRunnerV2.java | 4 +- .../epinephelinae/GroupByRowProcessor.java | 5 +- .../query/groupby/epinephelinae/Grouper.java | 3 +- .../epinephelinae/RowBasedGrouperHelper.java | 43 ++++++++++- .../epinephelinae/ConcurrentGrouperTest.java | 4 +- 6 files changed, 93 insertions(+), 39 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java index 98ab7fad9804..3f86b120c0da 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java @@ -28,17 +28,20 @@ import com.google.common.util.concurrent.ListeningExecutorService; import io.druid.java.util.common.ISE; import io.druid.query.AbstractPrioritizedCallable; +import io.druid.query.QueryInterruptedException; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.groupby.orderby.DefaultLimitSpec; import io.druid.segment.ColumnSelectorFactory; -import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -72,9 +75,10 @@ public class ConcurrentGrouper implements Grouper private final DefaultLimitSpec limitSpec; private final boolean sortHasNonGroupingFields; private final Comparator> keyObjComparator; - @Nullable private final ListeningExecutorService grouperSorter; private final int priority; + private final boolean hasQueryTimeout; + private final long queryTimeoutAt; private volatile boolean initialized = false; @@ -91,8 +95,10 @@ public ConcurrentGrouper( final int concurrencyHint, final DefaultLimitSpec limitSpec, final boolean sortHasNonGroupingFields, - @Nullable final ListeningExecutorService grouperSorter, - final int priority + final ListeningExecutorService grouperSorter, + final int priority, + final boolean hasQueryTimeout, + final long queryTimeoutAt ) { Preconditions.checkArgument(concurrencyHint > 0, "concurrencyHint > 0"); @@ -120,8 +126,10 @@ protected SpillingGrouper initialValue() this.limitSpec = limitSpec; this.sortHasNonGroupingFields = sortHasNonGroupingFields; this.keyObjComparator = keySerdeFactory.objectComparator(sortHasNonGroupingFields); - this.grouperSorter = grouperSorter; + this.grouperSorter = Preconditions.checkNotNull(grouperSorter); this.priority = priority; + this.hasQueryTimeout = hasQueryTimeout; + this.queryTimeoutAt = queryTimeoutAt; } @Override @@ -238,45 +246,50 @@ sorted && isParallelSortAvailable() ? parallelSortAndGetGroupersIterator() : get private boolean isParallelSortAvailable() { - return grouperSorter != null && concurrencyHint > 1; + return concurrencyHint > 1; } private List>> parallelSortAndGetGroupersIterator() { // The number of groupers is same with the number of processing threads in grouperSorter - final List>>> futures = groupers - .stream() - .map(grouper -> - grouperSorter.submit( - new AbstractPrioritizedCallable>>(priority) - { - @Override - public Iterator> call() throws Exception - { - synchronized (grouper) { - return grouper.iterator(true); - } - } - } - ) - ).collect(Collectors.toList()); + final ListenableFuture>>> future = Futures.allAsList( + groupers.stream() + .map(grouper -> + grouperSorter.submit( + new AbstractPrioritizedCallable>>(priority) + { + @Override + public Iterator> call() throws Exception + { + return grouper.iterator(true); + } + } + ) + ) + .collect(Collectors.toList()) + ); try { - return Futures.allAsList(futures).get(); + final long timeout = queryTimeoutAt - System.currentTimeMillis(); + return hasQueryTimeout ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get(); + } + catch (InterruptedException | TimeoutException e) { + future.cancel(true); + throw new QueryInterruptedException(e); + } + catch (CancellationException e) { + throw new QueryInterruptedException(e); } - catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); + catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); } } private List>> getGroupersIterator(boolean sorted) { return groupers.stream() - .map(grouper -> { - synchronized (grouper) { - return grouper.iterator(sorted); - } - }).collect(Collectors.toList()); + .map(grouper -> grouper.iterator(sorted)) + .collect(Collectors.toList()); } @Override diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 60e1bd6e018c..1d960e6cf666 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -196,7 +196,9 @@ public CloseableGrouperIterator make() spillMapper, combiningAggregatorFactories, exec, - priority + priority, + hasTimeout, + timeoutAt ); final Grouper grouper = pair.lhs; final Accumulator accumulator = pair.rhs; diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index bb175a6b7da7..a3dccb1bc0b2 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -155,12 +155,9 @@ public ByteBuffer get() return mergeBufferHolder.get(); } }, - -1, temporaryStorage, spillMapper, - aggregatorFactories, - null, - -1 // priority doesn't matter if grouperMerger is null + aggregatorFactories ); final Grouper grouper = pair.lhs; final Accumulator accumulator = pair.rhs; diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java index 2ca544485eb2..7afd95fefe00 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/Grouper.java @@ -102,7 +102,8 @@ default ToIntFunction hashFunction() *

* Once this method is called, writes are no longer safe. After you are done with the iterator returned by this * method, you should either call {@link #close()} (if you are done with the Grouper), {@link #reset()} (if you - * want to reuse it), or {@link #iterator(boolean)} again if you want another iterator. + * want to reuse it), or {@link #iterator(boolean)} again if you want another iterator. This method is not thread-safe + * and must not be called by multiple threads concurrently. *

* If "sorted" is true then the iterator will return sorted results. It will use KeyType's natural ordering on * deserialized objects, and will use the {@link KeySerde#comparator()} on serialized objects. Woe be unto you diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index bf30e14ecf67..467ef8b6f0a1 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -80,6 +80,41 @@ // this class contains shared code between GroupByMergingQueryRunnerV2 and GroupByRowProcessor public class RowBasedGrouperHelper { + private static final int SINGLE_THREAD_CONCURRENCY_HINT = -1; + private static final int UNKNOWN_THREAD_PRIORITY = -1; + private static final long UNKNOWN_TIMEOUT = -1L; + + /** + * Create a single-threaded grouper and accumulator. + */ + public static Pair, Accumulator> createGrouperAccumulatorPair( + final GroupByQuery query, + final boolean isInputRaw, + final Map rawInputRowSignature, + final GroupByQueryConfig config, + final Supplier bufferSupplier, + final LimitedTemporaryStorage temporaryStorage, + final ObjectMapper spillMapper, + final AggregatorFactory[] aggregatorFactories + ) + { + return createGrouperAccumulatorPair( + query, + isInputRaw, + rawInputRowSignature, + config, + bufferSupplier, + SINGLE_THREAD_CONCURRENCY_HINT, + temporaryStorage, + spillMapper, + aggregatorFactories, + null, + UNKNOWN_THREAD_PRIORITY, + false, + UNKNOWN_TIMEOUT + ); + } + /** * If isInputRaw is true, transformations such as timestamp truncation and extraction functions have not * been applied to the input rows yet, for example, in a nested query, if an extraction function is being @@ -96,7 +131,9 @@ public static Pair, Accumulator> crea final ObjectMapper spillMapper, final AggregatorFactory[] aggregatorFactories, @Nullable final ListeningExecutorService grouperSorter, - final int priority + final int priority, + final boolean hasQueryTimeout, + final long queryTimeoutAt ) { // concurrencyHint >= 1 for concurrent groupers, -1 for single-threaded @@ -166,7 +203,9 @@ public static Pair, Accumulator> crea limitSpec, sortHasNonGroupingFields, grouperSorter, - priority + priority, + hasQueryTimeout, + queryTimeoutAt ); } diff --git a/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java b/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java index 734cf26ab81c..d2162c702a9c 100644 --- a/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java +++ b/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java @@ -189,7 +189,7 @@ public DoubleColumnSelector makeDoubleColumnSelector(String columnName) } }; - @Test + @Test(timeout = 5000L) public void testAggregate() throws InterruptedException, ExecutionException { final ConcurrentGrouper grouper = new ConcurrentGrouper<>( @@ -206,6 +206,8 @@ public void testAggregate() throws InterruptedException, ExecutionException null, false, MoreExecutors.listeningDecorator(Execs.multiThreaded(4, "concurrent-grouper-test-%d")), + 0, + false, 0 );