diff --git a/common/src/main/java/io/druid/collections/BlockingPool.java b/common/src/main/java/io/druid/collections/BlockingPool.java index fdcf7f2356d4..99662946e1a3 100644 --- a/common/src/main/java/io/druid/collections/BlockingPool.java +++ b/common/src/main/java/io/druid/collections/BlockingPool.java @@ -19,6 +19,7 @@ package io.druid.collections; +import javax.annotation.Nullable; import java.util.List; public interface BlockingPool @@ -33,6 +34,7 @@ public interface BlockingPool * * @return a resource, or null if the timeout was reached */ + @Nullable ReferenceCountingResourceHolder take(long timeoutMs); /** @@ -49,16 +51,16 @@ public interface BlockingPool * @param elementNum number of resources to take * @param timeoutMs maximum time to wait for resources, in milliseconds. * - * @return a resource, or null if the timeout was reached + * @return a list of resource holders. An empty list is returned if {@code elementNum} resources don't become available before the timeout is reached. */ - ReferenceCountingResourceHolder> takeBatch(int elementNum, long timeoutMs); + List> takeBatch(int elementNum, long timeoutMs); /** * Take resources from the pool, waiting if necessary until the elements of the given number become available. * * @param elementNum number of resources to take * - * @return a resource + * @return a list of {@code elementNum} resource holders, returned once that many resources have become available.
*/ - ReferenceCountingResourceHolder> takeBatch(int elementNum); + List> takeBatch(int elementNum); } diff --git a/common/src/main/java/io/druid/collections/DefaultBlockingPool.java b/common/src/main/java/io/druid/collections/DefaultBlockingPool.java index 6b4512bfbb5e..e5efc70eacfa 100644 --- a/common/src/main/java/io/druid/collections/DefaultBlockingPool.java +++ b/common/src/main/java/io/druid/collections/DefaultBlockingPool.java @@ -22,16 +22,17 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; -import com.google.common.base.Throwables; -import com.google.common.collect.Lists; import io.druid.java.util.common.ISE; -import java.io.Closeable; +import javax.annotation.Nullable; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; /** * Pool that pre-generates objects up to a limit, then permits possibly-blocking "take" operations. @@ -74,6 +75,7 @@ public int getPoolSize() } @Override + @Nullable public ReferenceCountingResourceHolder take(final long timeoutMs) { Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs); @@ -82,7 +84,7 @@ public ReferenceCountingResourceHolder take(final long timeoutMs) return wrapObject(timeoutMs > 0 ? pollObject(timeoutMs) : pollObject()); } catch (InterruptedException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @@ -94,25 +96,20 @@ public ReferenceCountingResourceHolder take() return wrapObject(takeObject()); } catch (InterruptedException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } + @Nullable private ReferenceCountingResourceHolder wrapObject(T theObject) { return theObject == null ? 
null : new ReferenceCountingResourceHolder<>( theObject, - new Closeable() - { - @Override - public void close() - { - offer(theObject); - } - } + () -> offer(theObject) ); } + @Nullable private T pollObject() { final ReentrantLock lock = this.lock; @@ -125,6 +122,7 @@ private T pollObject() } } + @Nullable private T pollObject(long timeoutMs) throws InterruptedException { long nanos = TIME_UNIT.toNanos(timeoutMs); @@ -160,53 +158,39 @@ private T takeObject() throws InterruptedException } @Override - public ReferenceCountingResourceHolder> takeBatch(final int elementNum, final long timeoutMs) + public List> takeBatch(final int elementNum, final long timeoutMs) { Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs); checkInitialized(); try { - return wrapObjects(timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum)); + final List objects = timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum); + return objects.stream().map(this::wrapObject).collect(Collectors.toList()); } catch (InterruptedException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @Override - public ReferenceCountingResourceHolder> takeBatch(final int elementNum) + public List> takeBatch(final int elementNum) { checkInitialized(); try { - return wrapObjects(takeObjects(elementNum)); + return takeObjects(elementNum).stream().map(this::wrapObject).collect(Collectors.toList()); } catch (InterruptedException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } - private ReferenceCountingResourceHolder> wrapObjects(List theObjects) - { - return theObjects == null ? 
null : new ReferenceCountingResourceHolder<>( - theObjects, - new Closeable() - { - @Override - public void close() - { - offerBatch(theObjects); - } - } - ); - } - private List pollObjects(int elementNum) throws InterruptedException { - final List list = Lists.newArrayListWithCapacity(elementNum); + final List list = new ArrayList<>(elementNum); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { if (objects.size() < elementNum) { - return null; + return Collections.emptyList(); } else { for (int i = 0; i < elementNum; i++) { list.add(objects.pop()); @@ -222,13 +206,13 @@ private List pollObjects(int elementNum) throws InterruptedException private List pollObjects(int elementNum, long timeoutMs) throws InterruptedException { long nanos = TIME_UNIT.toNanos(timeoutMs); - final List list = Lists.newArrayListWithCapacity(elementNum); + final List list = new ArrayList<>(elementNum); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (objects.size() < elementNum) { if (nanos <= 0) { - return null; + return Collections.emptyList(); } nanos = notEnough.awaitNanos(nanos); } @@ -244,7 +228,7 @@ private List pollObjects(int elementNum, long timeoutMs) throws InterruptedEx private List takeObjects(int elementNum) throws InterruptedException { - final List list = Lists.newArrayListWithCapacity(elementNum); + final List list = new ArrayList<>(elementNum); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { @@ -282,23 +266,4 @@ private void offer(T theObject) lock.unlock(); } } - - private void offerBatch(List offers) - { - final ReentrantLock lock = this.lock; - lock.lock(); - try { - if (objects.size() + offers.size() <= maxSize) { - for (T offer : offers) { - objects.push(offer); - } - notEnough.signal(); - } else { - throw new ISE("Cannot exceed pre-configured maximum size"); - } - } - finally { - lock.unlock(); - } - } } diff --git a/common/src/main/java/io/druid/collections/DummyBlockingPool.java 
b/common/src/main/java/io/druid/collections/DummyBlockingPool.java index 2752e68a8ad2..e128d5713f7b 100644 --- a/common/src/main/java/io/druid/collections/DummyBlockingPool.java +++ b/common/src/main/java/io/druid/collections/DummyBlockingPool.java @@ -57,13 +57,13 @@ public ReferenceCountingResourceHolder take() } @Override - public ReferenceCountingResourceHolder> takeBatch(int elementNum, long timeoutMs) + public List> takeBatch(int elementNum, long timeoutMs) { throw new UnsupportedOperationException(); } @Override - public ReferenceCountingResourceHolder> takeBatch(int elementNum) + public List> takeBatch(int elementNum) { throw new UnsupportedOperationException(); } diff --git a/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java b/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java index 691281a14e4c..46a487d44a2a 100644 --- a/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java +++ b/common/src/main/java/io/druid/collections/ReferenceCountingResourceHolder.java @@ -52,7 +52,7 @@ public static long leakedResources() @SuppressWarnings("unused") private final Cleaner cleaner; - ReferenceCountingResourceHolder(final T object, final Closeable closer) + public ReferenceCountingResourceHolder(final T object, final Closeable closer) { this.object = object; this.closer = closer; @@ -64,6 +64,10 @@ public static ReferenceCountingResourceHolder fromClose return new ReferenceCountingResourceHolder<>(object, object); } + /** + * Returns the resource with an initial reference count of 1. More references can be added by + * calling {@link #increment()}. + */ @Override public T get() { @@ -73,6 +77,13 @@ public T get() return object; } + /** + * Increments the reference count by 1 and returns a {@link Releaser}. The returned {@link Releaser} is used to + * decrement the reference count when the caller no longer needs the resource. + * + * {@link Releaser}s are not thread-safe. 
If multiple threads need references to the same holder, they should + * each acquire their own {@link Releaser}. + */ public Releaser increment() { while (true) { @@ -103,6 +114,9 @@ public void close() }; } + /** + * Decrements the reference count by 1. If it reaches to 0, then closes {@link #closer}. + */ @Override public void close() { diff --git a/common/src/test/java/io/druid/collections/BlockingPoolTest.java b/common/src/test/java/io/druid/collections/BlockingPoolTest.java index 9b90a844e498..a6459a862f50 100644 --- a/common/src/test/java/io/druid/collections/BlockingPoolTest.java +++ b/common/src/test/java/io/druid/collections/BlockingPoolTest.java @@ -83,58 +83,51 @@ public void testTake() @Test(timeout = 1000) public void testTakeTimeout() { - final ReferenceCountingResourceHolder> batchHolder = POOL.takeBatch(10, 100L); + final List> batchHolder = POOL.takeBatch(10, 100L); final ReferenceCountingResourceHolder holder = POOL.take(100); assertNull(holder); - batchHolder.close(); + batchHolder.forEach(ReferenceCountingResourceHolder::close); } @Test(timeout = 1000) public void testTakeBatch() { - final ReferenceCountingResourceHolder> holder = POOL.takeBatch(6, 100L); + final List> holder = POOL.takeBatch(6, 100L); assertNotNull(holder); - assertEquals(6, holder.get().size()); + assertEquals(6, holder.size()); assertEquals(4, POOL.getPoolSize()); - holder.close(); + holder.forEach(ReferenceCountingResourceHolder::close); assertEquals(10, POOL.getPoolSize()); } @Test(timeout = 1000) public void testWaitAndTakeBatch() throws InterruptedException, ExecutionException { - ReferenceCountingResourceHolder> batchHolder = POOL.takeBatch(10, 10); + List> batchHolder = POOL.takeBatch(10, 10); assertNotNull(batchHolder); - assertEquals(10, batchHolder.get().size()); + assertEquals(10, batchHolder.size()); assertEquals(0, POOL.getPoolSize()); - final Future>> future = SERVICE.submit( - new Callable>>() - { - @Override - public ReferenceCountingResourceHolder> call() - { 
- return POOL.takeBatch(8, 100); - } - } + final Future>> future = SERVICE.submit( + () -> POOL.takeBatch(8, 100) ); Thread.sleep(20); - batchHolder.close(); + batchHolder.forEach(ReferenceCountingResourceHolder::close); batchHolder = future.get(); assertNotNull(batchHolder); - assertEquals(8, batchHolder.get().size()); + assertEquals(8, batchHolder.size()); assertEquals(2, POOL.getPoolSize()); - batchHolder.close(); + batchHolder.forEach(ReferenceCountingResourceHolder::close); assertEquals(10, POOL.getPoolSize()); } @Test(timeout = 1000) public void testTakeBatchTooManyObjects() { - final ReferenceCountingResourceHolder> holder = POOL.takeBatch(100, 100L); - assertNull(holder); + final List> holder = POOL.takeBatch(100, 100L); + assertTrue(holder.isEmpty()); } @Test(timeout = 1000) @@ -227,43 +220,27 @@ public void run() public void testConcurrentTakeBatch() throws ExecutionException, InterruptedException { final int batch1 = POOL.maxSize() / 2; - final Callable>> c1 = - new Callable>>() - { - @Override - public ReferenceCountingResourceHolder> call() - { - return POOL.takeBatch(batch1, 10); - } - }; + final Callable>> c1 = () -> POOL.takeBatch(batch1, 10); final int batch2 = POOL.maxSize() - batch1 + 1; - final Callable>> c2 = - new Callable>>() - { - @Override - public ReferenceCountingResourceHolder> call() - { - return POOL.takeBatch(batch2, 10); - } - }; + final Callable>> c2 = () -> POOL.takeBatch(batch2, 10); - final Future>> f1 = SERVICE.submit(c1); - final Future>> f2 = SERVICE.submit(c2); + final Future>> f1 = SERVICE.submit(c1); + final Future>> f2 = SERVICE.submit(c2); - final ReferenceCountingResourceHolder> r1 = f1.get(); - final ReferenceCountingResourceHolder> r2 = f2.get(); + final List> r1 = f1.get(); + final List> r2 = f2.get(); if (r1 != null) { - assertNull(r2); + assertTrue(r2.isEmpty()); assertEquals(POOL.maxSize() - batch1, POOL.getPoolSize()); - assertEquals(batch1, r1.get().size()); - r1.close(); + assertEquals(batch1, r1.size()); + 
r1.forEach(ReferenceCountingResourceHolder::close); } else { assertNotNull(r2); assertEquals(POOL.maxSize() - batch2, POOL.getPoolSize()); - assertEquals(batch2, r2.get().size()); - r2.close(); + assertEquals(batch2, r2.size()); + r2.forEach(ReferenceCountingResourceHolder::close); } assertEquals(POOL.maxSize(), POOL.getPoolSize()); @@ -273,37 +250,21 @@ public ReferenceCountingResourceHolder> call() public void testConcurrentBatchClose() throws ExecutionException, InterruptedException { final int batch1 = POOL.maxSize() / 2; - final Callable>> c1 = - new Callable>>() - { - @Override - public ReferenceCountingResourceHolder> call() - { - return POOL.takeBatch(batch1, 10); - } - }; + final Callable>> c1 = () -> POOL.takeBatch(batch1, 10); final int batch2 = POOL.maxSize() - batch1; - final Callable>> c2 = - new Callable>>() - { - @Override - public ReferenceCountingResourceHolder> call() - { - return POOL.takeBatch(batch2, 10); - } - }; + final Callable>> c2 = () -> POOL.takeBatch(batch2, 10); - final Future>> f1 = SERVICE.submit(c1); - final Future>> f2 = SERVICE.submit(c2); + final Future>> f1 = SERVICE.submit(c1); + final Future>> f2 = SERVICE.submit(c2); - final ReferenceCountingResourceHolder> r1 = f1.get(); - final ReferenceCountingResourceHolder> r2 = f2.get(); + final List> r1 = f1.get(); + final List> r2 = f2.get(); assertNotNull(r1); assertNotNull(r2); - assertEquals(batch1, r1.get().size()); - assertEquals(batch2, r2.get().size()); + assertEquals(batch1, r1.size()); + assertEquals(batch2, r2.size()); assertEquals(0, POOL.getPoolSize()); final Future future1 = SERVICE.submit(new Runnable() @@ -311,7 +272,7 @@ public ReferenceCountingResourceHolder> call() @Override public void run() { - r1.close(); + r1.forEach(ReferenceCountingResourceHolder::close); } }); final Future future2 = SERVICE.submit(new Runnable() @@ -319,7 +280,7 @@ public void run() @Override public void run() { - r2.close(); + r2.forEach(ReferenceCountingResourceHolder::close); } }); @@ 
-332,19 +293,11 @@ public void run() @Test(timeout = 1000) public void testConcurrentTakeBatchClose() throws ExecutionException, InterruptedException { - final ReferenceCountingResourceHolder> r1 = POOL.takeBatch(1, 10); + final List> r1 = POOL.takeBatch(1, 10); - final Callable>> c2 = - new Callable>>() - { - @Override - public ReferenceCountingResourceHolder> call() - { - return POOL.takeBatch(10, 100); - } - }; + final Callable>> c2 = () -> POOL.takeBatch(10, 100); - final Future>> f2 = SERVICE.submit(c2); + final Future>> f2 = SERVICE.submit(c2); final Future f1 = SERVICE.submit(new Runnable() { @Override @@ -356,17 +309,17 @@ public void run() catch (InterruptedException e) { // ignore } - r1.close(); + r1.forEach(ReferenceCountingResourceHolder::close); } }); - final ReferenceCountingResourceHolder> r2 = f2.get(); + final List> r2 = f2.get(); f1.get(); assertNotNull(r2); - assertEquals(10, r2.get().size()); + assertEquals(10, r2.size()); assertEquals(0, POOL.getPoolSize()); - r2.close(); + r2.forEach(ReferenceCountingResourceHolder::close); assertEquals(POOL.maxSize(), POOL.getPoolSize()); } } diff --git a/docs/content/querying/groupbyquery.md b/docs/content/querying/groupbyquery.md index 52dfddfd24d9..4048551f84fb 100644 --- a/docs/content/querying/groupbyquery.md +++ b/docs/content/querying/groupbyquery.md @@ -204,13 +204,30 @@ The default number of initial buckets is 1024 and the default max load factor of ##### Parallel combine -Once a historical finishes aggregation using the hash table, it sorts aggregates and merge them before sending to the broker for N-way merge aggregation in the broker. By default, historicals use all their available processing threads (configured by `druid.processing.numThreads`) for aggregation, but use a single thread for sorting and merging aggregates which is an http thread to send data to brokers. - -This is to prevent some heavy groupBy queries from blocking other queries. 
In Druid, the processing threads are shared between all submitted queries and they are _not interruptible_. It means, if a heavy query takes all available processing threads, all other queries might be blocked until the heavy query is finished. GroupBy queries usually take longer time than timeseries or topN queries, they should release processing threads as soon as possible. - -However, you might care about the performance of some really heavy groupBy queries. Usually, the performance bottleneck of heavy groupBy queries is merging sorted aggregates. In such cases, you can use processing threads for it as well. This is called _parallel combine_. To enable parallel combine, see `numParallelCombineThreads` in [Advanced groupBy v2 configurations](#groupby-v2-configurations). Note that parallel combine can be enabled only when data is actually spilled (see [Memory tuning and resource limits](#memory-tuning-and-resource-limits)). - -Once parallel combine is enabled, the groupBy v2 engine can create a combining tree for merging sorted aggregates. Each intermediate node of the tree is a thread merging aggregates from the child nodes. The leaf node threads read and merge aggregates from hash tables including spilled ones. Usually, leaf nodes are slower than intermediate nodes because they need to read data from disk. As a result, less threads are used for intermediate nodes by default. You can change the degree of intermeidate nodes. See `intermediateCombineDegree` in [Advanced groupBy v2 configurations](#groupby-v2-configurations). +Once a historical finishes aggregation using the hash table, it sorts the aggregated results and merges them before sending them to the +broker for N-way merge aggregation in the broker. By default, historicals use all their available processing threads +(configured by `druid.processing.numThreads`) for aggregation, but use a single thread for sorting and merging +aggregates, which is the HTTP thread that sends data to brokers.
+ +This is to prevent some heavy groupBy queries from blocking other queries. In Druid, the processing threads are shared +between all submitted queries and they are _not interruptible_. This means that if a heavy query takes all available +processing threads, all other queries might be blocked until the heavy query is finished. Because groupBy queries usually take +longer than timeseries or topN queries, they should release processing threads as soon as possible. + +However, you might care about the performance of some really heavy groupBy queries. Usually, the performance bottleneck +of heavy groupBy queries is merging sorted aggregates. In such cases, you can use processing threads for it as well. +This is called _parallel combine_. To enable parallel combine, see `numParallelCombineThreads` in +[Advanced groupBy v2 configurations](#groupby-v2-configurations). Note that parallel combine can be enabled only when +data is actually spilled (see [Memory tuning and resource limits](#memory-tuning-and-resource-limits)). + +Once parallel combine is enabled, the groupBy v2 engine can create a combining tree for merging sorted aggregates. Each +intermediate node of the tree is a thread merging aggregates from the child nodes. The leaf node threads read and merge +aggregates from hash tables including spilled ones. Usually, leaf nodes are slower than intermediate nodes because they +need to read data from disk. As a result, fewer threads are used for intermediate nodes by default. You can change the +degree of intermediate nodes. See `intermediateCombineDegree` in [Advanced groupBy v2 configurations](#groupby-v2-configurations). + +Please note that each historical needs two merge buffers to process a groupBy v2 query with parallel combine: one for +computing intermediate aggregates from each segment and another for combining intermediate aggregates in parallel.
#### Alternatives diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java index 52492fc2b69e..305f228a6828 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java @@ -27,7 +27,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; -import io.druid.collections.ResourceHolder; +import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.java.util.common.CloseableIterators; import io.druid.java.util.common.ISE; import io.druid.java.util.common.parsers.CloseableIterator; @@ -96,7 +96,7 @@ public class ConcurrentGrouper implements Grouper public ConcurrentGrouper( final GroupByQueryConfig groupByQueryConfig, final Supplier bufferSupplier, - final Supplier> combineBufferSupplier, + @Nullable final ReferenceCountingResourceHolder combineBufferHolder, final KeySerdeFactory keySerdeFactory, final KeySerdeFactory combineKeySerdeFactory, final ColumnSelectorFactory columnSelectorFactory, @@ -114,7 +114,7 @@ public ConcurrentGrouper( { this( bufferSupplier, - combineBufferSupplier, + combineBufferHolder, keySerdeFactory, combineKeySerdeFactory, columnSelectorFactory, @@ -138,7 +138,7 @@ public ConcurrentGrouper( ConcurrentGrouper( final Supplier bufferSupplier, - final Supplier> combineBufferSupplier, + @Nullable final ReferenceCountingResourceHolder combineBufferHolder, final KeySerdeFactory keySerdeFactory, final KeySerdeFactory combineKeySerdeFactory, final ColumnSelectorFactory columnSelectorFactory, @@ -191,7 +191,7 @@ public ConcurrentGrouper( if (numParallelCombineThreads > 1) { this.parallelCombiner = new ParallelCombiner<>( - combineBufferSupplier, + 
Preconditions.checkNotNull(combineBufferHolder, "combineBufferHolder"), getCombiningFactories(aggregatorFactories), combineKeySerdeFactory, executor, diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 71115147af91..95ac2e116463 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Predicates; -import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; @@ -34,10 +33,8 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.druid.collections.BlockingPool; -import io.druid.collections.NonBlockingPool; import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.collections.Releaser; -import io.druid.collections.ResourceHolder; import io.druid.data.input.Row; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; @@ -82,7 +79,6 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner private final ListeningExecutorService exec; private final QueryWatcher queryWatcher; private final int concurrencyHint; - private final NonBlockingPool processingBufferPool; private final BlockingPool mergeBufferPool; private final ObjectMapper spillMapper; private final String processingTmpDir; @@ -94,7 +90,6 @@ public GroupByMergingQueryRunnerV2( QueryWatcher queryWatcher, Iterable> queryables, int concurrencyHint, - NonBlockingPool processingBufferPool, BlockingPool mergeBufferPool, int mergeBufferSize, 
ObjectMapper spillMapper, @@ -106,7 +101,6 @@ public GroupByMergingQueryRunnerV2( this.queryWatcher = queryWatcher; this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); this.concurrencyHint = concurrencyHint; - this.processingBufferPool = processingBufferPool; this.mergeBufferPool = mergeBufferPool; this.spillMapper = spillMapper; this.processingTmpDir = processingTmpDir; @@ -159,22 +153,6 @@ public Sequence run(final QueryPlus queryPlus, final Map> combineBufferSupplier = new Supplier>() - { - private boolean initialized; - private ResourceHolder buffer; - - @Override - public ResourceHolder get() - { - if (!initialized) { - buffer = processingBufferPool.take(); - initialized = true; - } - return buffer; - } - }; - return new BaseSequence<>( new BaseSequence.IteratorMaker>() { @@ -192,40 +170,39 @@ public CloseableGrouperIterator make() ReferenceCountingResourceHolder.fromCloseable(temporaryStorage); resources.add(temporaryStorageHolder); - final ReferenceCountingResourceHolder mergeBufferHolder; - try { - // This will potentially block if there are no merge buffers left in the pool. - if (hasTimeout) { - final long timeout = timeoutAt - System.currentTimeMillis(); - if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) { - throw new TimeoutException(); - } - } else { - mergeBufferHolder = mergeBufferPool.take(); - } - resources.add(mergeBufferHolder); - } - catch (Exception e) { - throw new QueryInterruptedException(e); - } + // If parallelCombine is enabled, we need two merge buffers for parallel aggregating and parallel combining + final int numMergeBuffers = querySpecificConfig.getNumParallelCombineThreads() > 1 ? 
2 : 1; - Pair, Accumulator> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair( - query, - false, - null, - config, - Suppliers.ofInstance(mergeBufferHolder.get()), - combineBufferSupplier, - concurrencyHint, - temporaryStorage, - spillMapper, - combiningAggregatorFactories, - exec, - priority, + final List> mergeBufferHolders = getMergeBuffersHolder( + numMergeBuffers, hasTimeout, - timeoutAt, - mergeBufferSize + timeoutAt ); + resources.addAll(mergeBufferHolders); + + final ReferenceCountingResourceHolder mergeBufferHolder = mergeBufferHolders.get(0); + final ReferenceCountingResourceHolder combineBufferHolder = numMergeBuffers == 2 ? + mergeBufferHolders.get(1) : + null; + + Pair, Accumulator> pair = + RowBasedGrouperHelper.createGrouperAccumulatorPair( + query, + false, + null, + config, + Suppliers.ofInstance(mergeBufferHolder.get()), + combineBufferHolder, + concurrencyHint, + temporaryStorage, + spillMapper, + combiningAggregatorFactories, + exec, + priority, + hasTimeout, + timeoutAt, + mergeBufferSize + ); final Grouper grouper = pair.lhs; final Accumulator accumulator = pair.rhs; grouper.init(); @@ -256,7 +233,10 @@ public ListenableFuture apply(final QueryRunner input) public AggregateResult call() { try ( + // These variables are used to close releasers automatically. + @SuppressWarnings("unused") Releaser bufferReleaser = mergeBufferHolder.increment(); + @SuppressWarnings("unused") Releaser grouperReleaser = grouperHolder.increment() ) { final AggregateResult retVal = input.run(queryPlusForRunners, responseContext) @@ -332,6 +312,40 @@ public void cleanup(CloseableGrouperIterator iterFromMake) ); } + private List> getMergeBuffersHolder( + int numBuffers, + boolean hasTimeout, + long timeoutAt + ) + { + try { + if (numBuffers > mergeBufferPool.maxSize()) { + throw new ResourceLimitExceededException( + "Query needs " + numBuffers + " merge buffers, but only " + + mergeBufferPool.maxSize() + " merge buffers were configured. 
" + + "Try raising druid.processing.numMergeBuffers." + ); + } + final List> mergeBufferHolder; + // This will potentially block if there are no merge buffers left in the pool. + if (hasTimeout) { + final long timeout = timeoutAt - System.currentTimeMillis(); + if (timeout <= 0) { + throw new TimeoutException(); + } + if ((mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers, timeout)).isEmpty()) { + throw new TimeoutException("Cannot acquire enough merge buffers"); + } + } else { + mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers); + } + return mergeBufferHolder; + } + catch (Exception e) { + throw new QueryInterruptedException(e); + } + } + private void waitForFutureCompletion( GroupByQuery query, ListenableFuture> future, diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java index e8db98d31da1..0f2097534626 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java @@ -26,7 +26,8 @@ import com.google.common.collect.Iterables; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; -import io.druid.collections.ResourceHolder; +import io.druid.collections.ReferenceCountingResourceHolder; +import io.druid.collections.Releaser; import io.druid.java.util.common.CloseableIterators; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; @@ -84,7 +85,7 @@ public class ParallelCombiner // details. 
private static final int MINIMUM_LEAF_COMBINE_DEGREE = 2; - private final Supplier> combineBufferSupplier; + private final ReferenceCountingResourceHolder combineBufferHolder; private final AggregatorFactory[] combiningFactories; private final KeySerdeFactory combineKeySerdeFactory; private final ListeningExecutorService executor; @@ -98,7 +99,7 @@ public class ParallelCombiner private final int intermediateCombineDegree; public ParallelCombiner( - Supplier> combineBufferSupplier, + ReferenceCountingResourceHolder combineBufferHolder, AggregatorFactory[] combiningFactories, KeySerdeFactory combineKeySerdeFactory, ListeningExecutorService executor, @@ -109,7 +110,7 @@ public ParallelCombiner( int intermediateCombineDegree ) { - this.combineBufferSupplier = combineBufferSupplier; + this.combineBufferHolder = combineBufferHolder; this.combiningFactories = combiningFactories; this.combineKeySerdeFactory = combineKeySerdeFactory; this.executor = executor; @@ -137,9 +138,6 @@ public CloseableIterator> combine( { // CombineBuffer is initialized when this method is called and closed after the result iterator is done final Closer closer = Closer.create(); - final ResourceHolder combineBufferHolder = combineBufferSupplier.get(); - closer.register(combineBufferHolder); - try { final ByteBuffer combineBuffer = combineBufferHolder.get(); final int minimumRequiredBufferCapacity = StreamingMergeSortedGrouper.requiredBufferCapacity( @@ -172,6 +170,7 @@ public CloseableIterator> combine( final CloseableIterator> combineIterator = Iterables.getOnlyElement(combineIteratorAndFutures.lhs); final List combineFutures = combineIteratorAndFutures.rhs; + closer.register(() -> checkCombineFutures(combineFutures)); return CloseableIterators.wrap(combineIterator, closer); @@ -278,7 +277,7 @@ private Pair findLeafCombineDegreeAndNumBuffers( * * @return minimum number of buffers required for combining tree * - * @see #buildCombineTree(List, Supplier, AggregatorFactory[], int, List) + * @see 
#buildCombineTree */ private int computeRequiredBufferNum(int numChildNodes, int combineDegree) { @@ -405,7 +404,10 @@ public Void call() CloseableIterator> mergedIterator = CloseableIterators.mergeSorted( iterators, keyObjComparator - ) + ); + // This variable is used to close releaser automatically. + @SuppressWarnings("unused") + final Releaser releaser = combineBufferHolder.increment() ) { while (mergedIterator.hasNext()) { final Entry next = mergedIterator.next(); diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index ad5f12b98fd6..722f7b4692dd 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -30,7 +30,7 @@ import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import com.google.common.util.concurrent.ListeningExecutorService; -import io.druid.collections.ResourceHolder; +import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.common.utils.IntArrayUtils; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; @@ -137,7 +137,7 @@ public static Pair, Accumulator> crea final Map rawInputRowSignature, final GroupByQueryConfig config, final Supplier bufferSupplier, - final Supplier> combineBufferSupplier, + @Nullable final ReferenceCountingResourceHolder combineBufferHolder, final int concurrencyHint, final LimitedTemporaryStorage temporaryStorage, final ObjectMapper spillMapper, @@ -216,7 +216,7 @@ public static Pair, Accumulator> crea grouper = new ConcurrentGrouper<>( querySpecificConfig, bufferSupplier, - combineBufferSupplier, + combineBufferHolder, keySerdeFactory, combineKeySerdeFactory, columnSelectorFactory, diff --git 
a/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryResource.java b/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryResource.java index 03fd7da0331d..5e41a15b62f5 100644 --- a/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryResource.java +++ b/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryResource.java @@ -19,6 +19,7 @@ package io.druid.query.groupby.resource; +import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.collections.ResourceHolder; import io.druid.java.util.common.logger.Logger; @@ -36,19 +37,20 @@ public class GroupByQueryResource implements Closeable { private static final Logger log = new Logger(GroupByQueryResource.class); - private final ResourceHolder> mergeBuffersHolder; + private final List> mergeBufferHolders; private final Deque mergeBuffers; public GroupByQueryResource() { - this.mergeBuffersHolder = null; + this.mergeBufferHolders = null; this.mergeBuffers = new ArrayDeque<>(); } - public GroupByQueryResource(ResourceHolder> mergeBuffersHolder) + public GroupByQueryResource(List> mergeBufferHolders) { - this.mergeBuffersHolder = mergeBuffersHolder; - this.mergeBuffers = new ArrayDeque<>(mergeBuffersHolder.get()); + this.mergeBufferHolders = mergeBufferHolders; + this.mergeBuffers = new ArrayDeque<>(mergeBufferHolders.size()); + mergeBufferHolders.forEach(holder -> mergeBuffers.add(holder.get())); } /** @@ -81,11 +83,11 @@ public void close() @Override public void close() { - if (mergeBuffersHolder != null) { - if (mergeBuffers.size() != mergeBuffersHolder.get().size()) { - log.warn("%d resources are not returned yet", mergeBuffersHolder.get().size() - mergeBuffers.size()); + if (mergeBufferHolders != null) { + if (mergeBuffers.size() != mergeBufferHolders.size()) { + log.warn("%d resources are not returned yet", mergeBufferHolders.size() - mergeBuffers.size()); } - mergeBuffersHolder.close(); + 
mergeBufferHolders.forEach(ReferenceCountingResourceHolder::close); } } } diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index 24bbfe7af01a..4cfae5606559 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -29,7 +29,7 @@ import com.google.inject.Inject; import io.druid.collections.BlockingPool; import io.druid.collections.NonBlockingPool; -import io.druid.collections.ResourceHolder; +import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.guice.annotations.Global; @@ -136,18 +136,18 @@ public GroupByQueryResource prepareResource(GroupByQuery query, boolean willMerg if (requiredMergeBufferNum > mergeBufferPool.maxSize()) { throw new ResourceLimitExceededException( "Query needs " + requiredMergeBufferNum + " merge buffers, but only " - + mergeBufferPool.maxSize() + " merge buffers are configured" + + mergeBufferPool.maxSize() + " merge buffers were configured" ); } else if (requiredMergeBufferNum == 0) { return new GroupByQueryResource(); } else { - final ResourceHolder> mergeBufferHolders; + final List> mergeBufferHolders; if (QueryContexts.hasTimeout(query)) { mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum, QueryContexts.getTimeout(query)); } else { mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum); } - if (mergeBufferHolders == null) { + if (mergeBufferHolders.isEmpty()) { throw new InsufficientResourcesException("Cannot acquire enough merge buffers"); } else { return new GroupByQueryResource(mergeBufferHolders); @@ -338,7 +338,6 @@ public QueryRunner mergeRunners( queryWatcher, queryRunners, processingConfig.getNumThreads(), - bufferPool, mergeBufferPool, 
processingConfig.intermediateComputeSizeBytes(), spillMapper, diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java index 23f15b7e3204..4e80ed6baf49 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java @@ -82,9 +82,9 @@ public ReferenceCountingResourceHolder take(final long timeout) } @Override - public ReferenceCountingResourceHolder> takeBatch(final int maxElements, final long timeout) + public List> takeBatch(final int maxElements, final long timeout) { - final ReferenceCountingResourceHolder> holder = super.takeBatch(maxElements, timeout); + final List> holder = super.takeBatch(maxElements, timeout); final int poolSize = getPoolSize(); if (minRemainBufferNum > poolSize) { minRemainBufferNum = poolSize; diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java index 74d5b2f5d679..55c05ddec982 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java @@ -184,7 +184,8 @@ public GroupByQueryRunnerFailureTest(QueryRunner runner) public void testNotEnoughMergeBuffersOnQueryable() { expectedException.expect(QueryInterruptedException.class); - expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class)); + expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class)); + expectedException.expectMessage("Cannot acquire enough merge buffers"); final GroupByQuery query = GroupByQuery .builder() @@ -268,8 +269,15 @@ public void testInsufficientResourcesOnBroker() .setContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 500)) .build(); - try 
(ReferenceCountingResourceHolder> holder = mergeBufferPool.takeBatch(1, 10)) { + List> holder = null; + try { + holder = mergeBufferPool.takeBatch(1, 10); GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); } + finally { + if (holder != null) { + holder.forEach(ReferenceCountingResourceHolder::close); + } + } } } diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 42c0c2f00855..e35f8b7f2bb3 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -171,8 +171,9 @@ public int intermediateComputeSizeBytes() @Override public int getNumMergeBuffers() { - // There are some tests that need to allocate two buffers (simulating two levels of merging) - return 2; + // Some tests need two buffers for testing nested groupBy (simulating two levels of merging). + // Some tests need more buffers for parallel combine (testMergedPostAggHavingSpec). 
+ return 4; } @Override diff --git a/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java b/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java index 0d0137b2e5f3..ac6f450e8998 100644 --- a/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java +++ b/processing/src/test/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java @@ -24,9 +24,8 @@ import com.google.common.collect.Lists; import com.google.common.primitives.Longs; import com.google.common.util.concurrent.MoreExecutors; -import io.druid.collections.ResourceHolder; +import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.jackson.DefaultObjectMapper; -import io.druid.java.util.common.IAE; import io.druid.java.util.common.parsers.CloseableIterator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -66,7 +65,6 @@ public class ConcurrentGrouperTest private static final ExecutorService SERVICE = Executors.newFixedThreadPool(8); private static final TestResourceHolder TEST_RESOURCE_HOLDER = new TestResourceHolder(256); private static final KeySerdeFactory KEY_SERDE_FACTORY = new TestKeySerdeFactory(); - private static final Supplier> COMBINE_BUFFER_SUPPLIER = new TestBufferSupplier(); private static final ColumnSelectorFactory NULL_FACTORY = new TestColumnSelectorFactory(); @Rule @@ -113,7 +111,7 @@ public void testAggregate() throws InterruptedException, ExecutionException, IOE { final ConcurrentGrouper grouper = new ConcurrentGrouper<>( bufferSupplier, - COMBINE_BUFFER_SUPPLIER, + TEST_RESOURCE_HOLDER, KEY_SERDE_FACTORY, KEY_SERDE_FACTORY, NULL_FACTORY, @@ -160,7 +158,7 @@ public void run() final List> actual = Lists.newArrayList(iterator); iterator.close(); - Assert.assertTrue(!TEST_RESOURCE_HOLDER.taken || TEST_RESOURCE_HOLDER.closed); + Assert.assertTrue(TEST_RESOURCE_HOLDER.taken); final List> expected = 
new ArrayList<>(); for (long i = 0; i < numRows; i++) { @@ -172,28 +170,20 @@ public void run() grouper.close(); } - static class TestResourceHolder implements ResourceHolder + static class TestResourceHolder extends ReferenceCountingResourceHolder { private boolean taken; - private boolean closed; - private ByteBuffer buffer; TestResourceHolder(int bufferSize) { - buffer = ByteBuffer.allocate(bufferSize); + super(ByteBuffer.allocate(bufferSize), () -> {}); } @Override public ByteBuffer get() { taken = true; - return buffer; - } - - @Override - public void close() - { - closed = true; + return super.get(); } } @@ -292,21 +282,6 @@ public int compare(Grouper.Entry o1, Grouper.Entry o2) } } - private static class TestBufferSupplier implements Supplier> - { - private final AtomicBoolean called = new AtomicBoolean(false); - - @Override - public ResourceHolder get() - { - if (called.compareAndSet(false, true)) { - return TEST_RESOURCE_HOLDER; - } else { - throw new IAE("should be called once"); - } - } - } - private static class TestColumnSelectorFactory implements ColumnSelectorFactory { @Override diff --git a/processing/src/test/java/io/druid/query/groupby/epinephelinae/ParallelCombinerTest.java b/processing/src/test/java/io/druid/query/groupby/epinephelinae/ParallelCombinerTest.java index 84e6df7b98c2..795133d4119b 100644 --- a/processing/src/test/java/io/druid/query/groupby/epinephelinae/ParallelCombinerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/epinephelinae/ParallelCombinerTest.java @@ -19,10 +19,7 @@ package io.druid.query.groupby.epinephelinae; -import com.google.common.base.Supplier; import com.google.common.util.concurrent.MoreExecutors; -import io.druid.collections.ResourceHolder; -import io.druid.java.util.common.IAE; import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.parsers.CloseableIterator; import io.druid.query.aggregation.AggregatorFactory; @@ -36,12 +33,10 @@ import org.junit.Test; import 
java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; public class ParallelCombinerTest { @@ -50,22 +45,6 @@ public class ParallelCombinerTest private static final TestResourceHolder TEST_RESOURCE_HOLDER = new TestResourceHolder(512); private static final KeySerdeFactory KEY_SERDE_FACTORY = new TestKeySerdeFactory(); - private static final Supplier> COMBINE_BUFFER_SUPPLIER = - new Supplier>() - { - private final AtomicBoolean called = new AtomicBoolean(false); - - @Override - public ResourceHolder get() - { - if (called.compareAndSet(false, true)) { - return TEST_RESOURCE_HOLDER; - } else { - throw new IAE("should be called once"); - } - } - }; - private static final class TestIterator implements CloseableIterator> { private final Iterator> innerIterator; @@ -112,7 +91,7 @@ public static void teardown() public void testCombine() throws IOException { final ParallelCombiner combiner = new ParallelCombiner<>( - COMBINE_BUFFER_SUPPLIER, + TEST_RESOURCE_HOLDER, new AggregatorFactory[]{new CountAggregatorFactory("cnt").getCombiningFactory()}, KEY_SERDE_FACTORY, MoreExecutors.listeningDecorator(SERVICE),