From a028bf0d13cf3228cc6804c1d7dd3b67ad8dfabc Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 15 Feb 2017 19:13:51 +0900 Subject: [PATCH 1/9] Atomic merge buffer acquisition for groupBys --- .../GroupByTypeInterfaceBenchmark.java | 4 +- .../benchmark/query/GroupByBenchmark.java | 4 +- .../io/druid/collections/BlockingPool.java | 58 +++++++++-- .../druid/common/guava/CombiningSequence.java | 2 +- .../druid/collections/BlockingPoolTest.java | 97 +++++++++++++++++++ .../common/guava/CombiningSequenceTest.java | 2 +- .../groupby/GroupByQueryQueryToolChest.java | 41 +++++--- .../epinephelinae/GroupByRowProcessor.java | 44 +++------ .../resource/GroupByQueryBrokerResource.java | 76 +++++++++++++++ ...GroupByQueryBrokerResourceInitializer.java | 91 +++++++++++++++++ .../groupby/strategy/GroupByStrategy.java | 2 + .../strategy/GroupByStrategySelector.java | 12 ++- .../groupby/strategy/GroupByStrategyV1.java | 3 +- .../groupby/strategy/GroupByStrategyV2.java | 8 +- .../groupby/GroupByQueryMergeBufferTest.java | 31 ++++-- .../GroupByQueryRunnerFailureTest.java | 41 +++++++- .../query/groupby/GroupByQueryRunnerTest.java | 14 +-- .../druid/sql/calcite/util/CalciteTests.java | 5 +- 18 files changed, 459 insertions(+), 76 deletions(-) create mode 100644 common/src/test/java/io/druid/collections/BlockingPoolTest.java create mode 100644 processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResource.java create mode 100644 processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResourceInitializer.java diff --git a/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java index 6780e2404b7d..49de3725c349 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java @@ -61,6 +61,7 @@ import 
io.druid.query.groupby.GroupByQueryEngine; import io.druid.query.groupby.GroupByQueryQueryToolChest; import io.druid.query.groupby.GroupByQueryRunnerFactory; +import io.druid.query.groupby.resource.GroupByQueryBrokerResourceInitializer; import io.druid.query.groupby.strategy.GroupByStrategySelector; import io.druid.query.groupby.strategy.GroupByStrategyV1; import io.druid.query.groupby.strategy.GroupByStrategyV2; @@ -425,9 +426,8 @@ public String getFormatString() factory = new GroupByQueryRunnerFactory( strategySelector, new GroupByQueryQueryToolChest( - configSupplier, strategySelector, - bufferPool, + new GroupByQueryBrokerResourceInitializer(strategySelector, mergePool), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator() ) ); diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java index 03d5b62b4560..1bcdde4e125b 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java @@ -61,6 +61,7 @@ import io.druid.query.groupby.GroupByQueryEngine; import io.druid.query.groupby.GroupByQueryQueryToolChest; import io.druid.query.groupby.GroupByQueryRunnerFactory; +import io.druid.query.groupby.resource.GroupByQueryBrokerResourceInitializer; import io.druid.query.groupby.strategy.GroupByStrategySelector; import io.druid.query.groupby.strategy.GroupByStrategyV1; import io.druid.query.groupby.strategy.GroupByStrategyV2; @@ -468,9 +469,8 @@ public String getFormatString() factory = new GroupByQueryRunnerFactory( strategySelector, new GroupByQueryQueryToolChest( - configSupplier, strategySelector, - bufferPool, + new GroupByQueryBrokerResourceInitializer(strategySelector, mergePool), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator() ) ); diff --git a/common/src/main/java/io/druid/collections/BlockingPool.java 
b/common/src/main/java/io/druid/collections/BlockingPool.java index 81cedc4e1da8..a3192cdd7468 100644 --- a/common/src/main/java/io/druid/collections/BlockingPool.java +++ b/common/src/main/java/io/druid/collections/BlockingPool.java @@ -22,11 +22,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; - +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import io.druid.java.util.common.logger.Logger; import java.io.Closeable; import java.io.IOException; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; @@ -52,6 +54,12 @@ public BlockingPool( } } + @VisibleForTesting + public int getPoolSize() + { + return objects.size(); + } + /** * Take a resource from the pool. * @@ -63,7 +71,7 @@ public BlockingPool( */ public ReferenceCountingResourceHolder take(final long timeout) throws InterruptedException { - Preconditions.checkState(objects != null, "Pool was initialized with limit = 0, there are no objects to take."); + checkInitialized(); final T theObject = timeout >= 0 ? objects.poll(timeout, TimeUnit.MILLISECONDS) : objects.take(); return theObject == null ? null : new ReferenceCountingResourceHolder<>( theObject, @@ -72,17 +80,53 @@ public ReferenceCountingResourceHolder take(final long timeout) throws Interr @Override public void close() throws IOException { - if (!objects.offer(theObject)) { - log.error("WTF?! Queue offer failed, uh oh..."); + offer(theObject); + } + } + ); + } + + /** + * Drains at most the given number of available resources from the pool. 
+ * + * @param maxElements the maximum number of elements to drain + * + * @return a resource holder which contains the drained resources + */ + public ReferenceCountingResourceHolder> drain(final int maxElements) + { + checkInitialized(); + final List batch = Lists.newArrayListWithCapacity(maxElements); + final int n = objects.drainTo(batch, maxElements); + if (log.isDebugEnabled()) { + log.debug("Requested " + maxElements + " elements, but drained " + n + " elements"); + } + + final List resources = ImmutableList.copyOf(batch); + return new ReferenceCountingResourceHolder<>( + resources, + new Closeable() + { + @Override + public void close() throws IOException + { + for (T obj : resources) { + offer(obj); } } } ); } - @VisibleForTesting - protected int getQueueSize() + private void checkInitialized() { - return objects.size(); + Preconditions.checkState(objects != null, "Pool was initialized with limit = 0, there are no objects to take."); + } + + private void offer(T theObject) + { + if (!objects.offer(theObject)) { + log.error("WTF?! 
Queue offer failed, uh oh..."); + } } } diff --git a/common/src/main/java/io/druid/common/guava/CombiningSequence.java b/common/src/main/java/io/druid/common/guava/CombiningSequence.java index e3ccc40453a4..6abe7ce02035 100644 --- a/common/src/main/java/io/druid/common/guava/CombiningSequence.java +++ b/common/src/main/java/io/druid/common/guava/CombiningSequence.java @@ -46,7 +46,7 @@ public static CombiningSequence create( private final Ordering ordering; private final BinaryFn mergeFn; - public CombiningSequence( + private CombiningSequence( Sequence baseSequence, Ordering ordering, BinaryFn mergeFn diff --git a/common/src/test/java/io/druid/collections/BlockingPoolTest.java b/common/src/test/java/io/druid/collections/BlockingPoolTest.java new file mode 100644 index 000000000000..93b45c247a5e --- /dev/null +++ b/common/src/test/java/io/druid/collections/BlockingPoolTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.collections; + +import com.google.common.base.Suppliers; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +public class BlockingPoolTest +{ + private static final BlockingPool POOL = new BlockingPool<>(Suppliers.ofInstance(1), 10); + private static final BlockingPool EMPTY_POOL = new BlockingPool<>(Suppliers.ofInstance(1), 0); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testTakeFromEmptyPool() throws InterruptedException + { + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("Pool was initialized with limit = 0, there are no objects to take."); + EMPTY_POOL.take(0); + } + + @Test + public void testDrainFromEmptyPool() + { + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("Pool was initialized with limit = 0, there are no objects to take."); + EMPTY_POOL.drain(1); + } + + @Test(timeout = 1000) + public void testTake() throws InterruptedException + { + final ReferenceCountingResourceHolder holder = POOL.take(100); + assertNotNull(holder); + assertEquals(9, POOL.getPoolSize()); + holder.close(); + assertEquals(10, POOL.getPoolSize()); + } + + @Test(timeout = 1000) + public void testTakeTimeout() throws InterruptedException + { + final ReferenceCountingResourceHolder> batchHolder = POOL.drain(10); + final ReferenceCountingResourceHolder holder = POOL.take(100); + assertNull(holder); + batchHolder.close(); + } + + @Test + public void testDrain() + { + final ReferenceCountingResourceHolder> holder = POOL.drain(6); + assertNotNull(holder); + assertEquals(6, holder.get().size()); + assertEquals(4, POOL.getPoolSize()); + holder.close(); + assertEquals(10, POOL.getPoolSize()); + } + + @Test + public void 
testDrainTooManyObjects() + { + final ReferenceCountingResourceHolder> holder = POOL.drain(100); + assertNotNull(holder); + assertEquals(10, holder.get().size()); + assertEquals(0, POOL.getPoolSize()); + holder.close(); + assertEquals(10, POOL.getPoolSize()); + } +} diff --git a/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java b/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java index 040bac1739cb..b1f933e0df35 100644 --- a/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java +++ b/common/src/test/java/io/druid/common/guava/CombiningSequenceTest.java @@ -228,7 +228,7 @@ public void close() throws IOException }; Sequence> seq = Sequences.limit( - new CombiningSequence<>( + CombiningSequence.create( Sequences.withBaggage(Sequences.simple(pairs), closeable), Ordering.natural().onResultOf(Pair.lhsFn()), new BinaryFn, Pair, Pair>() diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 57979634becb..2af057ef0ea1 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -23,7 +23,6 @@ import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.common.base.Predicate; -import com.google.common.base.Supplier; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -32,14 +31,13 @@ import com.google.common.collect.Maps; import com.google.inject.Inject; import com.metamx.emitter.service.ServiceMetricEvent; -import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.granularity.QueryGranularity; -import io.druid.guice.annotations.Global; import io.druid.java.util.common.ISE; import 
io.druid.java.util.common.guava.MappedSequence; import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Sequences; import io.druid.query.BaseQuery; import io.druid.query.CacheStrategy; import io.druid.query.DataSource; @@ -57,11 +55,12 @@ import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.query.extraction.ExtractionFn; +import io.druid.query.groupby.resource.GroupByQueryBrokerResource; +import io.druid.query.groupby.resource.GroupByQueryBrokerResourceInitializer; import io.druid.query.groupby.strategy.GroupByStrategySelector; import org.joda.time.DateTime; import javax.annotation.Nullable; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -84,22 +83,19 @@ public class GroupByQueryQueryToolChest extends QueryToolChest configSupplier; private final GroupByStrategySelector strategySelector; - private final StupidPool bufferPool; + private final GroupByQueryBrokerResourceInitializer brokerResourceInitializer; private final IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator; @Inject public GroupByQueryQueryToolChest( - Supplier configSupplier, GroupByStrategySelector strategySelector, - @Global StupidPool bufferPool, + GroupByQueryBrokerResourceInitializer brokerResourceInitializer, IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator ) { - this.configSupplier = configSupplier; this.strategySelector = strategySelector; - this.bufferPool = bufferPool; + this.brokerResourceInitializer = brokerResourceInitializer; this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator; } @@ -116,7 +112,7 @@ public Sequence run(Query query, Map responseContext) } if (query.getContextBoolean(GROUP_BY_MERGE_KEY, true)) { - return mergeGroupByResults( + return initAndMergeGroupByResults( (GroupByQuery) query, runner, responseContext @@ -127,8 +123,28 @@ public 
Sequence run(Query query, Map responseContext) }; } + private Sequence initAndMergeGroupByResults( + final GroupByQuery query, + QueryRunner runner, + Map context + ) + { + final GroupByQueryBrokerResource resource = brokerResourceInitializer.prepare(query); + + return Sequences.withBaggage( + mergeGroupByResults( + query, + resource, + runner, + context + ), + resource + ); + } + private Sequence mergeGroupByResults( final GroupByQuery query, + GroupByQueryBrokerResource brokerResource, QueryRunner runner, Map context ) @@ -169,6 +185,7 @@ private Sequence mergeGroupByResults( false ) ), + brokerResource, runner, context ); @@ -186,7 +203,7 @@ private Sequence mergeGroupByResults( finalizingResults = subqueryResult; } - return strategySelector.strategize(query).processSubqueryResult(subquery, query, finalizingResults); + return strategySelector.strategize(query).processSubqueryResult(subquery, query, brokerResource, finalizingResults); } else { return strategySelector.strategize(query).mergeResults(runner, query, context); } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index 1736837d3e6c..9ac1d4fdfa34 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -23,10 +23,8 @@ import com.google.common.base.Predicate; import com.google.common.base.Supplier; import com.google.common.collect.Lists; -import io.druid.collections.BlockingPool; -import io.druid.collections.ReferenceCountingResourceHolder; +import io.druid.collections.ResourceHolder; import io.druid.common.guava.SettableSupplier; -import io.druid.common.utils.JodaUtils; import io.druid.data.input.Row; import io.druid.java.util.common.Pair; import io.druid.java.util.common.guava.Accumulator; @@ -35,8 +33,6 @@ import 
io.druid.java.util.common.guava.FilteredSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.query.Query; -import io.druid.query.QueryContextKeys; -import io.druid.query.QueryInterruptedException; import io.druid.query.ResourceLimitExceededException; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.filter.Filter; @@ -45,6 +41,7 @@ import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.RowBasedColumnSelectorFactory; import io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey; +import io.druid.query.groupby.resource.GroupByQueryBrokerResource; import io.druid.segment.column.ValueType; import io.druid.segment.filter.BooleanValueMatcher; import io.druid.segment.filter.Filters; @@ -58,7 +55,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.TimeoutException; public class GroupByRowProcessor { @@ -67,7 +63,7 @@ public static Sequence process( final Sequence rows, final Map rowSignature, final GroupByQueryConfig config, - final BlockingPool mergeBufferPool, + final GroupByQueryBrokerResource brokerResource, final ObjectMapper spillMapper, final String processingTmpDir ) @@ -86,8 +82,6 @@ public static Sequence process( String.format("druid-groupBy-%s_%s", UUID.randomUUID(), query.getId()) ); - final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null); - final long timeout = queryTimeout == null ? 
JodaUtils.MAX_INSTANT : queryTimeout.longValue(); final List queryIntervals = query.getIntervals(); final Filter filter = Filters.convertToCNFFromQueryContext( query, @@ -133,7 +127,7 @@ public boolean apply(Row input) @Override public CloseableGrouperIterator make() { - final List closeOnFailure = Lists.newArrayList(); + final List closeOnExit = Lists.newArrayList(); try { final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage( @@ -141,9 +135,7 @@ public CloseableGrouperIterator make() querySpecificConfig.getMaxOnDiskStorage() ); - closeOnFailure.add(temporaryStorage); - - final SettableSupplier> bufferHolderSupplier = new SettableSupplier<>(); + closeOnExit.add(temporaryStorage); Pair, Accumulator, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair( query, @@ -155,19 +147,9 @@ public CloseableGrouperIterator make() @Override public ByteBuffer get() { - final ReferenceCountingResourceHolder mergeBufferHolder; - try { - if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) { - throw new QueryInterruptedException(new TimeoutException()); - } - bufferHolderSupplier.set(mergeBufferHolder); - closeOnFailure.add(mergeBufferHolder); - - return mergeBufferHolder.get(); - } - catch (InterruptedException e) { - throw new QueryInterruptedException(e); - } + final ResourceHolder mergeBufferHolder = brokerResource.getMergeBuffer(); + closeOnExit.add(mergeBufferHolder); + return mergeBufferHolder.get(); } }, -1, @@ -177,7 +159,7 @@ public ByteBuffer get() ); final Grouper grouper = pair.lhs; final Accumulator, Row> accumulator = pair.rhs; - closeOnFailure.add(grouper); + closeOnExit.add(grouper); final Grouper retVal = filteredSequence.accumulate( grouper, @@ -195,16 +177,16 @@ public ByteBuffer get() @Override public void close() throws IOException { - grouper.close(); - CloseQuietly.close(bufferHolderSupplier.get()); - CloseQuietly.close(temporaryStorage); + for (Closeable closeable : Lists.reverse(closeOnExit)) { 
+ CloseQuietly.close(closeable); + } } } ); } catch (Throwable e) { // Exception caught while setting up the iterator; release resources. - for (Closeable closeable : Lists.reverse(closeOnFailure)) { + for (Closeable closeable : Lists.reverse(closeOnExit)) { CloseQuietly.close(closeable); } throw e; diff --git a/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResource.java b/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResource.java new file mode 100644 index 000000000000..972cb479b460 --- /dev/null +++ b/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResource.java @@ -0,0 +1,76 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.groupby.resource; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.metamx.emitter.EmittingLogger; +import io.druid.collections.ResourceHolder; + +import java.io.Closeable; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * This class contains all resources required by the Broker while executing a group-by query. 
+ * Currently, it contains only merge buffers, but any additional resources can be added in the future. + */ +public class GroupByQueryBrokerResource implements Closeable +{ + private static final EmittingLogger log = new EmittingLogger(GroupByQueryBrokerResource.class); + + private final ResourceHolder> mergeBuffersHolder; + private final List mergeBuffers; + + public GroupByQueryBrokerResource(ResourceHolder> mergeBuffersHolder) + { + this.mergeBuffersHolder = mergeBuffersHolder; + this.mergeBuffers = Lists.newArrayList(mergeBuffersHolder.get()); + } + + public ResourceHolder getMergeBuffer() + { + Preconditions.checkState(mergeBuffers.size() > 0); + final ByteBuffer buffer = mergeBuffers.remove(mergeBuffers.size() - 1); + return new ResourceHolder() + { + @Override + public ByteBuffer get() + { + return buffer; + } + + @Override + public void close() + { + mergeBuffers.add(buffer); + } + }; + } + + @Override + public void close() + { + if (mergeBuffers.size() != mergeBuffersHolder.get().size()) { + log.warn((mergeBuffersHolder.get().size() - mergeBuffers.size()) + " resources are not returned yet"); + } + mergeBuffersHolder.close(); + } +} diff --git a/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResourceInitializer.java b/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResourceInitializer.java new file mode 100644 index 000000000000..914b229e93a8 --- /dev/null +++ b/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResourceInitializer.java @@ -0,0 +1,91 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.groupby.resource; + +import com.google.inject.Inject; +import io.druid.collections.BlockingPool; +import io.druid.collections.ResourceHolder; +import io.druid.guice.annotations.Merging; +import io.druid.query.DataSource; +import io.druid.query.Query; +import io.druid.query.QueryDataSource; +import io.druid.query.ResourceLimitExceededException; +import io.druid.query.groupby.GroupByQuery; +import io.druid.query.groupby.strategy.GroupByStrategySelector; + +import java.nio.ByteBuffer; +import java.util.List; + +/** + * This class is responsible for initializing a {@link GroupByQueryBrokerResource} before executing a group-by query. + */ +public class GroupByQueryBrokerResourceInitializer +{ + private final GroupByStrategySelector strategySelector; + private final BlockingPool mergeBufferPool; + + @Inject + public GroupByQueryBrokerResourceInitializer( + GroupByStrategySelector strategySelector, + @Merging BlockingPool mergeBufferPool + ) + { + this.strategySelector = strategySelector; + this.mergeBufferPool = mergeBufferPool; + } + + /** + * Prepares broker resources for executing the given query. 
+ * + * @param query a query to be executed + * + * @return broker resource needed to execute the query + * + * @throws ResourceLimitExceededException if there aren't enough resources for query execution + */ + public GroupByQueryBrokerResource prepare(GroupByQuery query) + { + final int requiredMergeBufferNum; + if (strategySelector.useStrategyV2(query)) { + final int groupByLayerNum = countGroupByLayers(query, 1); + requiredMergeBufferNum = Math.min(2, groupByLayerNum - 1); + } else { + requiredMergeBufferNum = 0; + } + + final ResourceHolder> mergeBufferHolders = mergeBufferPool.drain(requiredMergeBufferNum); + if (mergeBufferHolders.get().size() < requiredMergeBufferNum) { + mergeBufferHolders.close(); + throw new ResourceLimitExceededException("Cannot acquire enough merge buffers"); + } else { + return new GroupByQueryBrokerResource(mergeBufferHolders); + } + } + + private static int countGroupByLayers(Query query, int foundNum) + { + final DataSource dataSource = query.getDataSource(); + if (dataSource instanceof QueryDataSource) { + return countGroupByLayers(((QueryDataSource) dataSource).getQuery(), foundNum + 1); + } else { + return foundNum; + } + } +} diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java index 4fb63a92e531..b8dbc1397ec9 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java @@ -24,6 +24,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.query.QueryRunner; import io.druid.query.groupby.GroupByQuery; +import io.druid.query.groupby.resource.GroupByQueryBrokerResource; import io.druid.segment.StorageAdapter; import java.util.Map; @@ -39,6 +40,7 @@ Sequence mergeResults( Sequence processSubqueryResult( GroupByQuery subquery, GroupByQuery query, + GroupByQueryBrokerResource resource, Sequence 
subqueryResult ); diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java index e222f14d0796..369459c29bf5 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java @@ -48,7 +48,7 @@ public GroupByStrategySelector( public GroupByStrategy strategize(GroupByQuery query) { - final String strategyString = config.withOverrides(query).getDefaultStrategy(); + final String strategyString = getStrategy(query); switch (strategyString) { case STRATEGY_V2: @@ -61,4 +61,14 @@ public GroupByStrategy strategize(GroupByQuery query) throw new ISE("No such strategy[%s]", strategyString); } } + + public String getStrategy(GroupByQuery query) + { + return config.withOverrides(query).getDefaultStrategy(); + } + + public boolean useStrategyV2(GroupByQuery query) + { + return getStrategy(query).equals(STRATEGY_V2); + } } diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java index 794fe68d7e6b..a320cd1169f3 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -46,6 +46,7 @@ import io.druid.query.groupby.GroupByQueryEngine; import io.druid.query.groupby.GroupByQueryHelper; import io.druid.query.groupby.GroupByQueryQueryToolChest; +import io.druid.query.groupby.resource.GroupByQueryBrokerResource; import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.segment.StorageAdapter; import io.druid.segment.incremental.IncrementalIndex; @@ -124,7 +125,7 @@ public Sequence mergeResults( @Override public Sequence processSubqueryResult( - GroupByQuery subquery, GroupByQuery query, 
Sequence subqueryResult + GroupByQuery subquery, GroupByQuery query, GroupByQueryBrokerResource resource, Sequence subqueryResult ) { final Set aggs = Sets.newHashSet(); diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index 95df89b550ab..1860ab08a3fd 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -49,6 +49,7 @@ import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryHelper; +import io.druid.query.groupby.resource.GroupByQueryBrokerResource; import io.druid.query.groupby.epinephelinae.GroupByBinaryFnV2; import io.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2; import io.druid.query.groupby.epinephelinae.GroupByQueryEngineV2; @@ -202,7 +203,10 @@ public Row apply(final Row row) @Override public Sequence processSubqueryResult( - GroupByQuery subquery, GroupByQuery query, Sequence subqueryResult + GroupByQuery subquery, + GroupByQuery query, + GroupByQueryBrokerResource resource, + Sequence subqueryResult ) { final Sequence results = GroupByRowProcessor.process( @@ -210,7 +214,7 @@ public Sequence processSubqueryResult( subqueryResult, GroupByQueryHelper.rowSignatureFor(subquery), configSupplier.get(), - mergeBufferPool, + resource, spillMapper, processingConfig.getTmpDir() ); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java index 5d04ab840fac..b98025733d49 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java @@ -40,6 +40,7 @@ import 
io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; +import io.druid.query.groupby.resource.GroupByQueryBrokerResourceInitializer; import io.druid.query.groupby.strategy.GroupByStrategySelector; import io.druid.query.groupby.strategy.GroupByStrategyV1; import io.druid.query.groupby.strategy.GroupByStrategyV2; @@ -74,9 +75,20 @@ public TestBlockingPool(Supplier generator, int limit) public ReferenceCountingResourceHolder take(final long timeout) throws InterruptedException { final ReferenceCountingResourceHolder holder = super.take(timeout); - final int queueSize = getQueueSize(); - if (minRemainBufferNum > queueSize) { - minRemainBufferNum = queueSize; + final int poolSize = getPoolSize(); + if (minRemainBufferNum > poolSize) { + minRemainBufferNum = poolSize; + } + return holder; + } + + @Override + public ReferenceCountingResourceHolder> drain(final int maxElements) + { + final ReferenceCountingResourceHolder> holder = super.drain(maxElements); + final int poolSize = getPoolSize(); + if (minRemainBufferNum > poolSize) { + minRemainBufferNum = poolSize; } return holder; } @@ -155,9 +167,8 @@ public ByteBuffer get() ) ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( - configSupplier, strategySelector, - bufferPool, + new GroupByQueryBrokerResourceInitializer(strategySelector, mergeBufferPool), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ); return new GroupByQueryRunnerFactory( @@ -227,6 +238,7 @@ public void testSimpleGroupBy() GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); assertEquals(2, mergeBufferPool.getMinRemainBufferNum()); + assertEquals(3, mergeBufferPool.getPoolSize()); } @Test @@ -254,6 +266,7 @@ public void testNestedGroupBy() GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); assertEquals(1, mergeBufferPool.getMinRemainBufferNum()); + assertEquals(3, 
mergeBufferPool.getPoolSize()); } @Test @@ -291,7 +304,9 @@ public void testDoubleNestedGroupBy() GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); - assertEquals(1, mergeBufferPool.getMinRemainBufferNum()); + // This should be 0 because the broker needs 2 buffers and the queryable node needs one. + assertEquals(0, mergeBufferPool.getMinRemainBufferNum()); + assertEquals(3, mergeBufferPool.getPoolSize()); } @Test @@ -341,6 +356,8 @@ public void testTripleNestedGroupBy() GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); - assertEquals(1, mergeBufferPool.getMinRemainBufferNum()); + // This should be 0 because the broker needs 2 buffers and the queryable node needs one. + assertEquals(0, mergeBufferPool.getMinRemainBufferNum()); + assertEquals(3, mergeBufferPool.getPoolSize()); } } diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java index 2489318d3a84..e7764d1ee622 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java @@ -31,6 +31,7 @@ import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.ResourceLimitExceededException; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.dimension.DefaultDimensionSpec; @@ -113,7 +114,7 @@ public GroupByQueryRunnerFailureTest(QueryRunner runner) } @Test(timeout = 10000) - public void testLackOfMergeBuffers() throws IOException + public void testNotEnoughMergeBuffersOnQueryable() throws IOException { expectedException.expect(QueryInterruptedException.class); expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class)); @@ -139,4 +140,42 @@ public void 
testLackOfMergeBuffers() throws IOException GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); } + + @Test(timeout = 10000) + public void testNotEnoughMergeBuffersOnBroker() + { + expectedException.expect(ResourceLimitExceededException.class); + + final GroupByQuery query = GroupByQuery + .builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource( + GroupByQuery.builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setInterval(QueryRunnerTestHelper.firstToThird) + .setGranularity(QueryGranularities.ALL) + .setDimensions(Lists.newArrayList( + new DefaultDimensionSpec("quality", "alias"), + new DefaultDimensionSpec("market", null) + )) + .setAggregatorSpecs(Lists.newArrayList(QueryRunnerTestHelper.rowsCount)) + .build() + ) + .setInterval(QueryRunnerTestHelper.firstToThird) + .setGranularity(QueryGranularities.ALL) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs(Lists.newArrayList(QueryRunnerTestHelper.rowsCount)) + .build() + ) + ) + .setGranularity(QueryGranularities.ALL) + .setInterval(QueryRunnerTestHelper.firstToThird) + .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) + .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .build(); + + GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + } } diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 22b231693efe..285d95766f4e 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -107,6 +107,7 @@ import io.druid.query.groupby.orderby.DefaultLimitSpec; import io.druid.query.groupby.orderby.LimitSpec; import io.druid.query.groupby.orderby.OrderByColumnSpec; +import 
io.druid.query.groupby.resource.GroupByQueryBrokerResourceInitializer; import io.druid.query.groupby.strategy.GroupByStrategySelector; import io.druid.query.groupby.strategy.GroupByStrategyV1; import io.druid.query.groupby.strategy.GroupByStrategyV2; @@ -142,6 +143,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static org.junit.Assert.assertEquals; + @RunWith(Parameterized.class) public class GroupByQueryRunnerTest { @@ -347,9 +350,8 @@ public ByteBuffer get() ) ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( - configSupplier, strategySelector, - bufferPool, + new GroupByQueryBrokerResourceInitializer(strategySelector, mergeBufferPool), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ); return new GroupByQueryRunnerFactory( @@ -1793,7 +1795,7 @@ public void testGroupByWithNoResult() List expectedResults = ImmutableList.of(); Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); - Assert.assertEquals(expectedResults, results); + assertEquals(expectedResults, results); } @Test @@ -3140,7 +3142,7 @@ public void testLimitPerGrouping() final Object next1 = resultsIter.next(); Object expectedNext1 = expectedResultsIter.next(); - Assert.assertEquals("order-limit", expectedNext1, next1); + assertEquals("order-limit", expectedNext1, next1); final Object next2 = resultsIter.next(); Object expectedNext2 = expectedResultsIter.next(); @@ -6908,7 +6910,7 @@ public void testGroupByLongColumn() .setGranularity(QueryRunnerTestHelper.dayGran) .build(); - Assert.assertEquals( + assertEquals( Functions.>identity(), query.getLimitSpec().build( query.getDimensions(), @@ -7172,7 +7174,7 @@ public void testGroupByFloatColumn() .setGranularity(QueryRunnerTestHelper.dayGran) .build(); - Assert.assertEquals( + assertEquals( Functions.>identity(), query.getLimitSpec().build( query.getDimensions(), diff --git 
a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java index 4d416e762b0f..a89cdef5d21d 100644 --- a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java @@ -171,8 +171,9 @@ public int intermediateComputeSizeBytes() @Override public int getNumMergeBuffers() { - // Need 2 buffers for CalciteQueryTest.testDoubleNestedGroupby. - return 2; + // Need 3 buffers for CalciteQueryTest.testDoubleNestedGroupby. + // Two buffers for the broker and one for the queryable + return 3; } } ) From 8243531c6ed13c24947802c9ed21ba638fba6409 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 16 Feb 2017 07:42:18 +0900 Subject: [PATCH 2/9] documentation --- docs/content/querying/groupbyquery.md | 10 +++++----- docs/content/querying/sql.md | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/content/querying/groupbyquery.md b/docs/content/querying/groupbyquery.md index d455c27e3902..c70849609436 100644 --- a/docs/content/querying/groupbyquery.md +++ b/docs/content/querying/groupbyquery.md @@ -156,11 +156,11 @@ indexing mechanism, and runs the outer query on these materialized results. "v2" inner query's results stream with off-heap fact map and on-heap string dictionary that can spill to disk. Both strategy perform the outer query on the broker in a single-threaded fashion. -Note that groupBys require a separate merge buffer on the broker for each layer beyond the first layer of the groupBy. -This merge buffer is immediately released once they are not used anymore during the query processing. However, deeply -nested groupBys (there are two or more groupBy layers beyond the first one) can potentially lead to deadlocks since the -merge buffers are limited in number and are acquired one-by-one instead of a complete set. 
At this time, we recommend -that you avoid too many concurrent execution of deeply nested groupBys with the v2 strategy. +Currently, concurrent execution of multiple deeply nested groupBys (there are two or more groupBy layers beyond the +first one) can potentially lead to invalid results. Note that groupBys require a separate merge buffer on the broker for +each layer beyond the first layer of the groupBy. The merge buffer storing the inner groupBy result should be held until +the outer groupBy is completed, but this is currently not guaranteed. At this time, we recommend that you avoid too many +concurrent execution of deeply nested groupBys with the v2 strategy. #### Server configuration diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index 36d0f7da739c..08ad33748d41 100644 --- a/docs/content/querying/sql.md +++ b/docs/content/querying/sql.md @@ -116,11 +116,11 @@ exact distinct count using a nested groupBy. SELECT COUNT(*) FROM (SELECT DISTINCT col FROM data_source) ``` -Note that groupBys require a separate merge buffer on the broker for each layer beyond the first layer of the groupBy. -This merge buffer is immediately released once they are not used anymore during the query processing. However, deeply -nested groupBys (there are two or more groupBy layers beyond the first one) can potentially lead to deadlocks since the -merge buffers are limited in number and are acquired one-by-one instead of a complete set. At this time, we recommend -that you avoid too many concurrent execution of deeply nested groupBys with the v2 strategy. +Currently, concurrent execution of multiple deeply nested groupBys (there are two or more groupBy layers beyond the +first one) can potentially lead to invalid results. Note that groupBys require a separate merge buffer on the broker for +each layer beyond the first layer of the groupBy. 
The merge buffer storing the inner groupBy result should be held until +the outer groupBy is completed, but this is currently not guaranteed. At this time, we recommend that you avoid too many +concurrent execution of deeply nested groupBys with the v2 strategy. #### Semi-joins From e4e9b56d0836a9c1911bab1d23ee656fd4a3d970 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 16 Feb 2017 15:39:03 +0900 Subject: [PATCH 3/9] documentation --- docs/content/querying/groupbyquery.md | 6 ------ docs/content/querying/sql.md | 6 ------ 2 files changed, 12 deletions(-) diff --git a/docs/content/querying/groupbyquery.md b/docs/content/querying/groupbyquery.md index c70849609436..2208fedc4e28 100644 --- a/docs/content/querying/groupbyquery.md +++ b/docs/content/querying/groupbyquery.md @@ -156,12 +156,6 @@ indexing mechanism, and runs the outer query on these materialized results. "v2" inner query's results stream with off-heap fact map and on-heap string dictionary that can spill to disk. Both strategy perform the outer query on the broker in a single-threaded fashion. -Currently, concurrent execution of multiple deeply nested groupBys (there are two or more groupBy layers beyond the -first one) can potentially lead to invalid results. Note that groupBys require a separate merge buffer on the broker for -each layer beyond the first layer of the groupBy. The merge buffer storing the inner groupBy result should be held until -the outer groupBy is completed, but this is currently not guaranteed. At this time, we recommend that you avoid too many -concurrent execution of deeply nested groupBys with the v2 strategy. - #### Server configuration When using the "v1" strategy, the following runtime properties apply: diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index 08ad33748d41..77009f2f3d72 100644 --- a/docs/content/querying/sql.md +++ b/docs/content/querying/sql.md @@ -116,12 +116,6 @@ exact distinct count using a nested groupBy. 
SELECT COUNT(*) FROM (SELECT DISTINCT col FROM data_source) ``` -Currently, concurrent execution of multiple deeply nested groupBys (there are two or more groupBy layers beyond the -first one) can potentially lead to invalid results. Note that groupBys require a separate merge buffer on the broker for -each layer beyond the first layer of the groupBy. The merge buffer storing the inner groupBy result should be held until -the outer groupBy is completed, but this is currently not guaranteed. At this time, we recommend that you avoid too many -concurrent execution of deeply nested groupBys with the v2 strategy. - #### Semi-joins Semi-join subqueries involving `WHERE ... IN (SELECT ...)`, like the following, are executed with a special process. From 4f36f619bdfe5f7084913e77c583cf2f423d304a Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 17 Feb 2017 18:31:00 +0900 Subject: [PATCH 4/9] address comments --- .../GroupByTypeInterfaceBenchmark.java | 2 - .../benchmark/query/GroupByBenchmark.java | 2 - .../io/druid/collections/BlockingPool.java | 61 +++++++++---- .../druid/collections/BlockingPoolTest.java | 18 ++-- .../groupby/GroupByQueryQueryToolChest.java | 15 +-- .../GroupByMergingQueryRunnerV2.java | 4 +- .../resource/GroupByQueryBrokerResource.java | 23 ++++- ...GroupByQueryBrokerResourceInitializer.java | 91 ------------------- .../groupby/strategy/GroupByStrategy.java | 8 ++ .../groupby/strategy/GroupByStrategyV1.java | 6 ++ .../groupby/strategy/GroupByStrategyV2.java | 60 +++++++++++- .../groupby/GroupByQueryMergeBufferTest.java | 8 +- .../query/groupby/GroupByQueryRunnerTest.java | 2 - 13 files changed, 159 insertions(+), 141 deletions(-) delete mode 100644 processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResourceInitializer.java diff --git a/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java index 49de3725c349..314b486d4488 
100644 --- a/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java @@ -61,7 +61,6 @@ import io.druid.query.groupby.GroupByQueryEngine; import io.druid.query.groupby.GroupByQueryQueryToolChest; import io.druid.query.groupby.GroupByQueryRunnerFactory; -import io.druid.query.groupby.resource.GroupByQueryBrokerResourceInitializer; import io.druid.query.groupby.strategy.GroupByStrategySelector; import io.druid.query.groupby.strategy.GroupByStrategyV1; import io.druid.query.groupby.strategy.GroupByStrategyV2; @@ -427,7 +426,6 @@ public String getFormatString() strategySelector, new GroupByQueryQueryToolChest( strategySelector, - new GroupByQueryBrokerResourceInitializer(strategySelector, mergePool), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator() ) ); diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java index 1bcdde4e125b..6725977c10fd 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java @@ -61,7 +61,6 @@ import io.druid.query.groupby.GroupByQueryEngine; import io.druid.query.groupby.GroupByQueryQueryToolChest; import io.druid.query.groupby.GroupByQueryRunnerFactory; -import io.druid.query.groupby.resource.GroupByQueryBrokerResourceInitializer; import io.druid.query.groupby.strategy.GroupByStrategySelector; import io.druid.query.groupby.strategy.GroupByStrategyV1; import io.druid.query.groupby.strategy.GroupByStrategyV2; @@ -470,7 +469,6 @@ public String getFormatString() strategySelector, new GroupByQueryQueryToolChest( strategySelector, - new GroupByQueryBrokerResourceInitializer(strategySelector, mergePool), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator() ) ); diff --git 
a/common/src/main/java/io/druid/collections/BlockingPool.java b/common/src/main/java/io/druid/collections/BlockingPool.java index a3192cdd7468..0ff1571a97b9 100644 --- a/common/src/main/java/io/druid/collections/BlockingPool.java +++ b/common/src/main/java/io/druid/collections/BlockingPool.java @@ -22,8 +22,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Queues; import io.druid.java.util.common.logger.Logger; import java.io.Closeable; @@ -41,6 +43,7 @@ public class BlockingPool private static final Logger log = new Logger(BlockingPool.class); private final BlockingQueue objects; + private final int maxSize; public BlockingPool( Supplier generator, @@ -48,12 +51,18 @@ public BlockingPool( ) { this.objects = limit > 0 ? new ArrayBlockingQueue(limit) : null; + this.maxSize = limit; for (int i = 0; i < limit; i++) { objects.add(generator.get()); } } + public int maxSize() + { + return maxSize; + } + @VisibleForTesting public int getPoolSize() { @@ -66,40 +75,58 @@ public int getPoolSize() * @param timeout maximum time to wait for a resource, in milliseconds. Negative means do not use a timeout. * * @return a resource, or null if the timeout was reached - * - * @throws InterruptedException if interrupted while waiting for a resource to become available */ - public ReferenceCountingResourceHolder take(final long timeout) throws InterruptedException + public ReferenceCountingResourceHolder take(final long timeout) { checkInitialized(); - final T theObject = timeout >= 0 ? objects.poll(timeout, TimeUnit.MILLISECONDS) : objects.take(); - return theObject == null ? 
null : new ReferenceCountingResourceHolder<>( - theObject, - new Closeable() - { - @Override - public void close() throws IOException + final T theObject; + try { + theObject = timeout >= 0 ? objects.poll(timeout, TimeUnit.MILLISECONDS) : objects.take(); + return theObject == null ? null : new ReferenceCountingResourceHolder<>( + theObject, + new Closeable() { - offer(theObject); + @Override + public void close() throws IOException + { + offer(theObject); + } } - } - ); + ); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } } /** * Drains at most the given number of available resources from the pool. * * @param maxElements the maximum number of elements to drain + * @param timeout maximum time to wait for a resource, in milliseconds. Negative means do not use a timeout. * * @return a resource holder which contains the drained resources */ - public ReferenceCountingResourceHolder> drain(final int maxElements) + public ReferenceCountingResourceHolder> drain(final int maxElements, final long timeout) { checkInitialized(); final List batch = Lists.newArrayListWithCapacity(maxElements); - final int n = objects.drainTo(batch, maxElements); - if (log.isDebugEnabled()) { - log.debug("Requested " + maxElements + " elements, but drained " + n + " elements"); + + try { + final int n = timeout >= 0 ? 
+ Queues.drain(objects, batch, maxElements, timeout, TimeUnit.MILLISECONDS) : + objects.drainTo(batch, maxElements); + if (n < maxElements) { + if (log.isDebugEnabled()) { + log.debug("Requested " + maxElements + " elements, but drained " + n + " elements"); + } + } + } + catch (InterruptedException e) { + for (T obj : batch) { + offer(obj); + } + throw Throwables.propagate(e); } final List resources = ImmutableList.copyOf(batch); diff --git a/common/src/test/java/io/druid/collections/BlockingPoolTest.java b/common/src/test/java/io/druid/collections/BlockingPoolTest.java index 93b45c247a5e..039b087200e6 100644 --- a/common/src/test/java/io/druid/collections/BlockingPoolTest.java +++ b/common/src/test/java/io/druid/collections/BlockingPoolTest.java @@ -39,7 +39,7 @@ public class BlockingPoolTest public ExpectedException expectedException = ExpectedException.none(); @Test - public void testTakeFromEmptyPool() throws InterruptedException + public void testTakeFromEmptyPool() { expectedException.expect(IllegalStateException.class); expectedException.expectMessage("Pool was initialized with limit = 0, there are no objects to take."); @@ -51,11 +51,11 @@ public void testDrainFromEmptyPool() { expectedException.expect(IllegalStateException.class); expectedException.expectMessage("Pool was initialized with limit = 0, there are no objects to take."); - EMPTY_POOL.drain(1); + EMPTY_POOL.drain(1, 0); } @Test(timeout = 1000) - public void testTake() throws InterruptedException + public void testTake() { final ReferenceCountingResourceHolder holder = POOL.take(100); assertNotNull(holder); @@ -65,18 +65,18 @@ public void testTake() throws InterruptedException } @Test(timeout = 1000) - public void testTakeTimeout() throws InterruptedException + public void testTakeTimeout() { - final ReferenceCountingResourceHolder> batchHolder = POOL.drain(10); + final ReferenceCountingResourceHolder> batchHolder = POOL.drain(10, 100L); final ReferenceCountingResourceHolder holder = 
POOL.take(100); assertNull(holder); batchHolder.close(); } - @Test + @Test(timeout = 1000) public void testDrain() { - final ReferenceCountingResourceHolder> holder = POOL.drain(6); + final ReferenceCountingResourceHolder> holder = POOL.drain(6, 100L); assertNotNull(holder); assertEquals(6, holder.get().size()); assertEquals(4, POOL.getPoolSize()); @@ -84,10 +84,10 @@ public void testDrain() assertEquals(10, POOL.getPoolSize()); } - @Test + @Test(timeout = 1000) public void testDrainTooManyObjects() { - final ReferenceCountingResourceHolder> holder = POOL.drain(100); + final ReferenceCountingResourceHolder> holder = POOL.drain(100, 100L); assertNotNull(holder); assertEquals(10, holder.get().size()); assertEquals(0, POOL.getPoolSize()); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 2af057ef0ea1..6dfa034407a1 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -56,7 +56,7 @@ import io.druid.query.dimension.DimensionSpec; import io.druid.query.extraction.ExtractionFn; import io.druid.query.groupby.resource.GroupByQueryBrokerResource; -import io.druid.query.groupby.resource.GroupByQueryBrokerResourceInitializer; +import io.druid.query.groupby.strategy.GroupByStrategy; import io.druid.query.groupby.strategy.GroupByStrategySelector; import org.joda.time.DateTime; @@ -84,18 +84,15 @@ public class GroupByQueryQueryToolChest extends QueryToolChest initAndMergeGroupByResults( Map context ) { - final GroupByQueryBrokerResource resource = brokerResourceInitializer.prepare(query); + final GroupByStrategy groupByStrategy = strategySelector.strategize(query); + final GroupByQueryBrokerResource resource = groupByStrategy.prepareBrokerResource(query); return Sequences.withBaggage( mergeGroupByResults( + groupByStrategy, 
query, resource, runner, @@ -143,6 +142,7 @@ private Sequence initAndMergeGroupByResults( } private Sequence mergeGroupByResults( + GroupByStrategy groupByStrategy, final GroupByQuery query, GroupByQueryBrokerResource brokerResource, QueryRunner runner, @@ -177,6 +177,7 @@ private Sequence mergeGroupByResults( } final Sequence subqueryResult = mergeGroupByResults( + groupByStrategy, subquery.withOverriddenContext( ImmutableMap.of( //setting sort to false avoids unnecessary sorting while merging results. we only need to sort @@ -203,9 +204,9 @@ private Sequence mergeGroupByResults( finalizingResults = subqueryResult; } - return strategySelector.strategize(query).processSubqueryResult(subquery, query, brokerResource, finalizingResults); + return groupByStrategy.processSubqueryResult(subquery, query, brokerResource, finalizingResults); } else { - return strategySelector.strategize(query).mergeResults(runner, query, context); + return groupByStrategy.mergeResults(runner, query, context); } } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index ab06082c69c4..b9311bb6b882 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -169,11 +169,11 @@ public CloseableGrouperIterator make() // This will potentially block if there are no merge buffers left in the pool. 
final long timeout = timeoutAt - System.currentTimeMillis(); if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) { - throw new QueryInterruptedException(new TimeoutException()); + throw new TimeoutException(); } resources.add(mergeBufferHolder); } - catch (InterruptedException e) { + catch (Exception e) { throw new QueryInterruptedException(e); } diff --git a/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResource.java b/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResource.java index 972cb479b460..8b4a95603ecd 100644 --- a/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResource.java +++ b/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResource.java @@ -39,14 +39,29 @@ public class GroupByQueryBrokerResource implements Closeable private final ResourceHolder> mergeBuffersHolder; private final List mergeBuffers; + public GroupByQueryBrokerResource() + { + this.mergeBuffersHolder = null; + this.mergeBuffers = null; + } + public GroupByQueryBrokerResource(ResourceHolder> mergeBuffersHolder) { this.mergeBuffersHolder = mergeBuffersHolder; this.mergeBuffers = Lists.newArrayList(mergeBuffersHolder.get()); } + /** + * Get a merge buffer from the pre-acquired broker resources. 
+ * + * @return a resource holder containing a merge buffer + * + * @throws IllegalStateException if this resource is not initialized with available merge buffers, or + * there aren't any available merge buffers + */ public ResourceHolder getMergeBuffer() { + Preconditions.checkState(mergeBuffers != null); Preconditions.checkState(mergeBuffers.size() > 0); final ByteBuffer buffer = mergeBuffers.remove(mergeBuffers.size() - 1); return new ResourceHolder() @@ -68,9 +83,11 @@ public void close() @Override public void close() { - if (mergeBuffers.size() != mergeBuffersHolder.get().size()) { - log.warn((mergeBuffersHolder.get().size() - mergeBuffers.size()) + " resources are not returned yet"); + if (mergeBuffersHolder != null) { + if (mergeBuffers.size() != mergeBuffersHolder.get().size()) { + log.warn((mergeBuffersHolder.get().size() - mergeBuffers.size()) + " resources are not returned yet"); + } + mergeBuffersHolder.close(); } - mergeBuffersHolder.close(); } } diff --git a/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResourceInitializer.java b/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResourceInitializer.java deleted file mode 100644 index 914b229e93a8..000000000000 --- a/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResourceInitializer.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.query.groupby.resource; - -import com.google.inject.Inject; -import io.druid.collections.BlockingPool; -import io.druid.collections.ResourceHolder; -import io.druid.guice.annotations.Merging; -import io.druid.query.DataSource; -import io.druid.query.Query; -import io.druid.query.QueryDataSource; -import io.druid.query.ResourceLimitExceededException; -import io.druid.query.groupby.GroupByQuery; -import io.druid.query.groupby.strategy.GroupByStrategySelector; - -import java.nio.ByteBuffer; -import java.util.List; - -/** - * This class is responsible for initializing a {@link GroupByQueryBrokerResource} before executin a group-by query. - */ -public class GroupByQueryBrokerResourceInitializer -{ - private final GroupByStrategySelector strategySelector; - private final BlockingPool mergeBufferPool; - - @Inject - public GroupByQueryBrokerResourceInitializer( - GroupByStrategySelector strategySelector, - @Merging BlockingPool mergeBufferPool - ) - { - this.strategySelector = strategySelector; - this.mergeBufferPool = mergeBufferPool; - } - - /** - * Prepares broker resources for executing the given query. 
- * - * @param query a query to be executed - * - * @return broker resource needed to execute the query - * - * @throws ResourceLimitExceededException if there isn't enough resources for query execution - */ - public GroupByQueryBrokerResource prepare(GroupByQuery query) - { - final int requiredMergeBufferNum; - if (strategySelector.useStrategyV2(query)) { - final int groupByLayerNum = countGroupByLayers(query, 1); - requiredMergeBufferNum = Math.min(2, groupByLayerNum - 1); - } else { - requiredMergeBufferNum = 0; - } - - final ResourceHolder> mergeBufferHolders = mergeBufferPool.drain(requiredMergeBufferNum); - if (mergeBufferHolders.get().size() < requiredMergeBufferNum) { - mergeBufferHolders.close(); - throw new ResourceLimitExceededException("Cannot acquire enough merge buffers"); - } else { - return new GroupByQueryBrokerResource(mergeBufferHolders); - } - } - - private static int countGroupByLayers(Query query, int foundNum) - { - final DataSource dataSource = query.getDataSource(); - if (dataSource instanceof QueryDataSource) { - return countGroupByLayers(((QueryDataSource) dataSource).getQuery(), foundNum + 1); - } else { - return foundNum; - } - } -} diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java index b8dbc1397ec9..4ea78ba4bc31 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java @@ -31,6 +31,14 @@ public interface GroupByStrategy { + /** + * Initializes resources required for a broker to process the given query. 
+ * + * @param query a groupBy query to be processed + * @return broker resource + */ + GroupByQueryBrokerResource prepareBrokerResource(GroupByQuery query); + Sequence mergeResults( QueryRunner baseRunner, GroupByQuery query, diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java index a320cd1169f3..bfefcf31434f 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -78,6 +78,12 @@ public GroupByStrategyV1( this.bufferPool = bufferPool; } + @Override + public GroupByQueryBrokerResource prepareBrokerResource(GroupByQuery query) + { + return new GroupByQueryBrokerResource(); + } + @Override public Sequence mergeResults( final QueryRunner baseRunner, diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index 1860ab08a3fd..c218be8d07cb 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -29,7 +29,9 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.inject.Inject; import io.druid.collections.BlockingPool; +import io.druid.collections.ResourceHolder; import io.druid.collections.StupidPool; +import io.druid.common.utils.JodaUtils; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.granularity.QueryGranularities; @@ -40,25 +42,32 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.guava.nary.BinaryFn; +import io.druid.query.DataSource; import io.druid.query.DruidProcessingConfig; import io.druid.query.Query; +import 
io.druid.query.QueryContextKeys; +import io.druid.query.QueryDataSource; +import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; +import io.druid.query.ResourceLimitExceededException; import io.druid.query.ResultMergeQueryRunner; import io.druid.query.aggregation.PostAggregator; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryHelper; -import io.druid.query.groupby.resource.GroupByQueryBrokerResource; import io.druid.query.groupby.epinephelinae.GroupByBinaryFnV2; import io.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2; import io.druid.query.groupby.epinephelinae.GroupByQueryEngineV2; import io.druid.query.groupby.epinephelinae.GroupByRowProcessor; +import io.druid.query.groupby.resource.GroupByQueryBrokerResource; import io.druid.segment.StorageAdapter; import org.joda.time.DateTime; import java.nio.ByteBuffer; +import java.util.List; import java.util.Map; +import java.util.concurrent.TimeoutException; public class GroupByStrategyV2 implements GroupByStrategy { @@ -113,6 +122,55 @@ public static DateTime getUniversalTimestamp(final GroupByQuery query) } } + @Override + public GroupByQueryBrokerResource prepareBrokerResource(GroupByQuery query) + { + final int groupByLayerNum = countGroupByLayers(query, 1); + + // Note: A broker requires merge buffers for processing the groupBy layers beyond the inner-most one. + // For example, the number of required merge buffers for a nested groupBy (groupBy -> groupBy -> table) is 1. + // If the broker processes an outer groupBy which reads input from an inner groupBy, + // it requires two merge buffers for inner and outer groupBys to keep the intermediate result of inner groupBy + // until the outer groupBy processing completes. + // This is same for subsequent groupBy layers, and thus the maximum number of required merge buffers becomes 2. 
+ final int requiredMergeBufferNum = Math.min(2, groupByLayerNum - 1); + + if (requiredMergeBufferNum > mergeBufferPool.maxSize()) { + throw new ResourceLimitExceededException( + "Query needs " + requiredMergeBufferNum + " merge buffers, but only " + + mergeBufferPool.maxSize() + " is configured" + ); + } else if (requiredMergeBufferNum == 0) { + return new GroupByQueryBrokerResource(); + } else { + final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, JodaUtils.MAX_INSTANT); + final ResourceHolder> mergeBufferHolders; + + try { + mergeBufferHolders = mergeBufferPool.drain(requiredMergeBufferNum, timeout.longValue()); + if (mergeBufferHolders.get().size() < requiredMergeBufferNum) { + mergeBufferHolders.close(); + throw new TimeoutException("Cannot acquire enough merge buffers"); + } else { + return new GroupByQueryBrokerResource(mergeBufferHolders); + } + } + catch (Exception e) { + throw new QueryInterruptedException(e); + } + } + } + + private static int countGroupByLayers(Query query, int foundNum) + { + final DataSource dataSource = query.getDataSource(); + if (dataSource instanceof QueryDataSource) { + return countGroupByLayers(((QueryDataSource) dataSource).getQuery(), foundNum + 1); + } else { + return foundNum; + } + } + @Override public Sequence mergeResults( final QueryRunner baseRunner, diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java index b98025733d49..96bd05b135fc 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java @@ -40,7 +40,6 @@ import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; -import io.druid.query.groupby.resource.GroupByQueryBrokerResourceInitializer; import 
io.druid.query.groupby.strategy.GroupByStrategySelector; import io.druid.query.groupby.strategy.GroupByStrategyV1; import io.druid.query.groupby.strategy.GroupByStrategyV2; @@ -72,7 +71,7 @@ public TestBlockingPool(Supplier generator, int limit) } @Override - public ReferenceCountingResourceHolder take(final long timeout) throws InterruptedException + public ReferenceCountingResourceHolder take(final long timeout) { final ReferenceCountingResourceHolder holder = super.take(timeout); final int poolSize = getPoolSize(); @@ -83,9 +82,9 @@ public ReferenceCountingResourceHolder take(final long timeout) thro } @Override - public ReferenceCountingResourceHolder> drain(final int maxElements) + public ReferenceCountingResourceHolder> drain(final int maxElements, final long timeout) { - final ReferenceCountingResourceHolder> holder = super.drain(maxElements); + final ReferenceCountingResourceHolder> holder = super.drain(maxElements, timeout); final int poolSize = getPoolSize(); if (minRemainBufferNum > poolSize) { minRemainBufferNum = poolSize; @@ -168,7 +167,6 @@ public ByteBuffer get() ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( strategySelector, - new GroupByQueryBrokerResourceInitializer(strategySelector, mergeBufferPool), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ); return new GroupByQueryRunnerFactory( diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 285d95766f4e..ca7c1f42c3ae 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -107,7 +107,6 @@ import io.druid.query.groupby.orderby.DefaultLimitSpec; import io.druid.query.groupby.orderby.LimitSpec; import io.druid.query.groupby.orderby.OrderByColumnSpec; -import 
io.druid.query.groupby.resource.GroupByQueryBrokerResourceInitializer; import io.druid.query.groupby.strategy.GroupByStrategySelector; import io.druid.query.groupby.strategy.GroupByStrategyV1; import io.druid.query.groupby.strategy.GroupByStrategyV2; @@ -351,7 +350,6 @@ public ByteBuffer get() ); final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( strategySelector, - new GroupByQueryBrokerResourceInitializer(strategySelector, mergeBufferPool), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() ); return new GroupByQueryRunnerFactory( From 875a8acc47163befc83cbf94e74d07cac63f212c Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 18 Feb 2017 09:49:37 +0900 Subject: [PATCH 5/9] address comments --- .../resource/GroupByQueryBrokerResource.java | 4 ++-- .../groupby/strategy/GroupByStrategySelector.java | 12 +----------- .../query/groupby/strategy/GroupByStrategyV2.java | 14 ++++++-------- 3 files changed, 9 insertions(+), 21 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResource.java b/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResource.java index 8b4a95603ecd..53425e55b543 100644 --- a/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResource.java +++ b/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResource.java @@ -21,7 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.metamx.emitter.EmittingLogger; +import com.metamx.common.logger.Logger; import io.druid.collections.ResourceHolder; import java.io.Closeable; @@ -34,7 +34,7 @@ */ public class GroupByQueryBrokerResource implements Closeable { - private static final EmittingLogger log = new EmittingLogger(GroupByQueryBrokerResource.class); + private static final Logger log = new Logger(GroupByQueryBrokerResource.class); private final ResourceHolder> mergeBuffersHolder; private final 
List mergeBuffers; diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java index 369459c29bf5..e222f14d0796 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategySelector.java @@ -48,7 +48,7 @@ public GroupByStrategySelector( public GroupByStrategy strategize(GroupByQuery query) { - final String strategyString = getStrategy(query); + final String strategyString = config.withOverrides(query).getDefaultStrategy(); switch (strategyString) { case STRATEGY_V2: @@ -61,14 +61,4 @@ public GroupByStrategy strategize(GroupByQuery query) throw new ISE("No such strategy[%s]", strategyString); } } - - public String getStrategy(GroupByQuery query) - { - return config.withOverrides(query).getDefaultStrategy(); - } - - public boolean useStrategyV2(GroupByQuery query) - { - return getStrategy(query).equals(STRATEGY_V2); - } } diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index c218be8d07cb..9919ffbffc27 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -125,20 +125,18 @@ public static DateTime getUniversalTimestamp(final GroupByQuery query) @Override public GroupByQueryBrokerResource prepareBrokerResource(GroupByQuery query) { - final int groupByLayerNum = countGroupByLayers(query, 1); - // Note: A broker requires merge buffers for processing the groupBy layers beyond the inner-most one. // For example, the number of required merge buffers for a nested groupBy (groupBy -> groupBy -> table) is 1. 
// If the broker processes an outer groupBy which reads input from an inner groupBy, // it requires two merge buffers for inner and outer groupBys to keep the intermediate result of inner groupBy // until the outer groupBy processing completes. // This is same for subsequent groupBy layers, and thus the maximum number of required merge buffers becomes 2. - final int requiredMergeBufferNum = Math.min(2, groupByLayerNum - 1); + final int requiredMergeBufferNum = countRequiredMergeBufferNum(query, 2, 1); if (requiredMergeBufferNum > mergeBufferPool.maxSize()) { throw new ResourceLimitExceededException( "Query needs " + requiredMergeBufferNum + " merge buffers, but only " - + mergeBufferPool.maxSize() + " is configured" + + mergeBufferPool.maxSize() + " merge buffers are configured" ); } else if (requiredMergeBufferNum == 0) { return new GroupByQueryBrokerResource(); @@ -161,13 +159,13 @@ public GroupByQueryBrokerResource prepareBrokerResource(GroupByQuery query) } } - private static int countGroupByLayers(Query query, int foundNum) + private static int countRequiredMergeBufferNum(Query query, int maxBufferNum, int foundNum) { final DataSource dataSource = query.getDataSource(); - if (dataSource instanceof QueryDataSource) { - return countGroupByLayers(((QueryDataSource) dataSource).getQuery(), foundNum + 1); + if (foundNum == maxBufferNum + 1 || !(dataSource instanceof QueryDataSource)) { + return foundNum - 1; } else { - return foundNum; + return countRequiredMergeBufferNum(((QueryDataSource) dataSource).getQuery(), maxBufferNum, foundNum + 1); } } From f368a5e7a7ec08416348d2a07ff5d0e55c53b89a Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 22 Feb 2017 09:20:46 +0900 Subject: [PATCH 6/9] fix test failure --- .../query/groupby/resource/GroupByQueryBrokerResource.java | 4 ++-- .../io/druid/query/groupby/strategy/GroupByStrategyV2.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git 
a/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResource.java b/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResource.java index 53425e55b543..131c8e901af2 100644 --- a/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResource.java +++ b/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResource.java @@ -61,8 +61,8 @@ public GroupByQueryBrokerResource(ResourceHolder> mergeBuffersH */ public ResourceHolder getMergeBuffer() { - Preconditions.checkState(mergeBuffers != null); - Preconditions.checkState(mergeBuffers.size() > 0); + Preconditions.checkState(mergeBuffers != null, "Resource is initialized with empty merge buffers"); + Preconditions.checkState(mergeBuffers.size() > 0, "No available merge buffers"); final ByteBuffer buffer = mergeBuffers.remove(mergeBuffers.size() - 1); return new ResourceHolder() { diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index fec6839db196..440729da890f 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -125,7 +125,7 @@ public static DateTime getUniversalTimestamp(final GroupByQuery query) @Override public GroupByQueryBrokerResource prepareResource(GroupByQuery query, boolean willMergeRunners) { - if (willMergeRunners) { + if (!willMergeRunners) { // Note: A broker requires merge buffers for processing the groupBy layers beyond the inner-most one. // For example, the number of required merge buffers for a nested groupBy (groupBy -> groupBy -> table) is 1. 
// If the broker processes an outer groupBy which reads input from an inner groupBy, From 64b69ee20dedd8dd7074e64408b794d9745e9289 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 22 Feb 2017 12:23:35 +0900 Subject: [PATCH 7/9] Addressed comments - Add InsufficientResourcesException - Renamed GroupByQueryBrokerResource to GroupByQueryResource --- .../query/InsufficientResourcesException.java | 31 +++++++++++++++ .../groupby/GroupByQueryQueryToolChest.java | 10 ++--- .../epinephelinae/GroupByRowProcessor.java | 6 +-- ...esource.java => GroupByQueryResource.java} | 30 +++++++------- .../groupby/strategy/GroupByStrategy.java | 6 +-- .../groupby/strategy/GroupByStrategyV1.java | 8 ++-- .../groupby/strategy/GroupByStrategyV2.java | 39 ++++++++++--------- 7 files changed, 81 insertions(+), 49 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/InsufficientResourcesException.java rename processing/src/main/java/io/druid/query/groupby/resource/{GroupByQueryBrokerResource.java => GroupByQueryResource.java} (63%) diff --git a/processing/src/main/java/io/druid/query/InsufficientResourcesException.java b/processing/src/main/java/io/druid/query/InsufficientResourcesException.java new file mode 100644 index 000000000000..993379c2f882 --- /dev/null +++ b/processing/src/main/java/io/druid/query/InsufficientResourcesException.java @@ -0,0 +1,31 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +/** + * This exception is thrown when the requested operation cannot be completed due to a lack of available resources. + */ +public class InsufficientResourcesException extends Exception +{ + public InsufficientResourcesException(String message) + { + super(message); + } +} diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index fa9525402725..1255b1756e02 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -55,7 +55,7 @@ import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.query.extraction.ExtractionFn; -import io.druid.query.groupby.resource.GroupByQueryBrokerResource; +import io.druid.query.groupby.resource.GroupByQueryResource; import io.druid.query.groupby.strategy.GroupByStrategy; import io.druid.query.groupby.strategy.GroupByStrategySelector; import org.joda.time.DateTime; @@ -127,7 +127,7 @@ private Sequence initAndMergeGroupByResults( ) { final GroupByStrategy groupByStrategy = strategySelector.strategize(query); - final GroupByQueryBrokerResource resource = groupByStrategy.prepareResource(query, false); + final GroupByQueryResource resource = groupByStrategy.prepareResource(query, false); return Sequences.withBaggage( mergeGroupByResults( @@ -144,7 +144,7 @@ private Sequence 
initAndMergeGroupByResults( private Sequence mergeGroupByResults( GroupByStrategy groupByStrategy, final GroupByQuery query, - GroupByQueryBrokerResource brokerResource, + GroupByQueryResource resource, QueryRunner runner, Map context ) @@ -186,7 +186,7 @@ private Sequence mergeGroupByResults( false ) ), - brokerResource, + resource, runner, context ); @@ -204,7 +204,7 @@ private Sequence mergeGroupByResults( finalizingResults = subqueryResult; } - return groupByStrategy.processSubqueryResult(subquery, query, brokerResource, finalizingResults); + return groupByStrategy.processSubqueryResult(subquery, query, resource, finalizingResults); } else { return groupByStrategy.mergeResults(runner, query, context); } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index 9ac1d4fdfa34..37e0b810615c 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -41,7 +41,7 @@ import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.RowBasedColumnSelectorFactory; import io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey; -import io.druid.query.groupby.resource.GroupByQueryBrokerResource; +import io.druid.query.groupby.resource.GroupByQueryResource; import io.druid.segment.column.ValueType; import io.druid.segment.filter.BooleanValueMatcher; import io.druid.segment.filter.Filters; @@ -63,7 +63,7 @@ public static Sequence process( final Sequence rows, final Map rowSignature, final GroupByQueryConfig config, - final GroupByQueryBrokerResource brokerResource, + final GroupByQueryResource resource, final ObjectMapper spillMapper, final String processingTmpDir ) @@ -147,7 +147,7 @@ public CloseableGrouperIterator make() @Override public ByteBuffer get() { - final 
ResourceHolder mergeBufferHolder = brokerResource.getMergeBuffer(); + final ResourceHolder mergeBufferHolder = resource.getMergeBuffer(); closeOnExit.add(mergeBufferHolder); return mergeBufferHolder.get(); } diff --git a/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResource.java b/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryResource.java similarity index 63% rename from processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResource.java rename to processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryResource.java index 131c8e901af2..fa993af9c303 100644 --- a/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryBrokerResource.java +++ b/processing/src/main/java/io/druid/query/groupby/resource/GroupByQueryResource.java @@ -19,51 +19,49 @@ package io.druid.query.groupby.resource; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import com.metamx.common.logger.Logger; import io.druid.collections.ResourceHolder; import java.io.Closeable; import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.List; /** - * This class contains all resources required by the Broker during executing a group-by query. + * This class contains resources required for a groupBy query execution. * Currently, it contains only merge buffers, but any additional resources can be added in the future. 
*/ -public class GroupByQueryBrokerResource implements Closeable +public class GroupByQueryResource implements Closeable { - private static final Logger log = new Logger(GroupByQueryBrokerResource.class); + private static final Logger log = new Logger(GroupByQueryResource.class); private final ResourceHolder> mergeBuffersHolder; - private final List mergeBuffers; + private final Deque mergeBuffers; - public GroupByQueryBrokerResource() + public GroupByQueryResource() { this.mergeBuffersHolder = null; - this.mergeBuffers = null; + this.mergeBuffers = new ArrayDeque<>(); } - public GroupByQueryBrokerResource(ResourceHolder> mergeBuffersHolder) + public GroupByQueryResource(ResourceHolder> mergeBuffersHolder) { this.mergeBuffersHolder = mergeBuffersHolder; - this.mergeBuffers = Lists.newArrayList(mergeBuffersHolder.get()); + this.mergeBuffers = new ArrayDeque<>(mergeBuffersHolder.get()); } /** - * Get a merge buffer from the pre-acquired broker resources. + * Get a merge buffer from the pre-acquired resources. 
* * @return a resource holder containing a merge buffer * - * @throws IllegalStateException if this resource is not initialized with available merge buffers, or + * @throws IllegalStateException if this resource is initialized with empty merge buffers, or * there isn't any available merge buffers */ public ResourceHolder getMergeBuffer() { - Preconditions.checkState(mergeBuffers != null, "Resource is initialized with empty merge buffers"); - Preconditions.checkState(mergeBuffers.size() > 0, "No available merge buffers"); - final ByteBuffer buffer = mergeBuffers.remove(mergeBuffers.size() - 1); + final ByteBuffer buffer = mergeBuffers.pop(); return new ResourceHolder() { @Override @@ -85,7 +83,7 @@ public void close() { if (mergeBuffersHolder != null) { if (mergeBuffers.size() != mergeBuffersHolder.get().size()) { - log.warn((mergeBuffersHolder.get().size() - mergeBuffers.size()) + " resources are not returned yet"); + log.warn("%d resources are not returned yet", mergeBuffersHolder.get().size() - mergeBuffers.size()); } mergeBuffersHolder.close(); } diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java index e5ad57c98228..7513fd0286bc 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategy.java @@ -25,7 +25,7 @@ import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.groupby.GroupByQuery; -import io.druid.query.groupby.resource.GroupByQueryBrokerResource; +import io.druid.query.groupby.resource.GroupByQueryResource; import io.druid.segment.StorageAdapter; import java.util.Map; @@ -39,7 +39,7 @@ public interface GroupByStrategy * @param query a groupBy query to be processed * @return broker resource */ - GroupByQueryBrokerResource prepareResource(GroupByQuery query, boolean willMergeRunners); + 
GroupByQueryResource prepareResource(GroupByQuery query, boolean willMergeRunners); /** * Indicates this strategy is cacheable or not. @@ -60,7 +60,7 @@ Sequence mergeResults( Sequence processSubqueryResult( GroupByQuery subquery, GroupByQuery query, - GroupByQueryBrokerResource resource, + GroupByQueryResource resource, Sequence subqueryResult ); diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java index fb5062def372..abd6fcc32e69 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -46,7 +46,7 @@ import io.druid.query.groupby.GroupByQueryEngine; import io.druid.query.groupby.GroupByQueryHelper; import io.druid.query.groupby.GroupByQueryQueryToolChest; -import io.druid.query.groupby.resource.GroupByQueryBrokerResource; +import io.druid.query.groupby.resource.GroupByQueryResource; import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.segment.StorageAdapter; import io.druid.segment.incremental.IncrementalIndex; @@ -79,9 +79,9 @@ public GroupByStrategyV1( } @Override - public GroupByQueryBrokerResource prepareResource(GroupByQuery query, boolean willMergeRunners) + public GroupByQueryResource prepareResource(GroupByQuery query, boolean willMergeRunners) { - return new GroupByQueryBrokerResource(); + return new GroupByQueryResource(); } @Override @@ -137,7 +137,7 @@ public Sequence mergeResults( @Override public Sequence processSubqueryResult( - GroupByQuery subquery, GroupByQuery query, GroupByQueryBrokerResource resource, Sequence subqueryResult + GroupByQuery subquery, GroupByQuery query, GroupByQueryResource resource, Sequence subqueryResult ) { final Set aggs = Sets.newHashSet(); diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java 
b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index 440729da890f..d69047898da4 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -51,6 +51,7 @@ import io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; import io.druid.query.ResourceLimitExceededException; +import io.druid.query.InsufficientResourcesException; import io.druid.query.ResultMergeQueryRunner; import io.druid.query.aggregation.PostAggregator; import io.druid.query.groupby.GroupByQuery; @@ -60,20 +61,21 @@ import io.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2; import io.druid.query.groupby.epinephelinae.GroupByQueryEngineV2; import io.druid.query.groupby.epinephelinae.GroupByRowProcessor; -import io.druid.query.groupby.resource.GroupByQueryBrokerResource; +import io.druid.query.groupby.resource.GroupByQueryResource; import io.druid.segment.StorageAdapter; import org.joda.time.DateTime; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeoutException; public class GroupByStrategyV2 implements GroupByStrategy { public static final String CTX_KEY_FUDGE_TIMESTAMP = "fudgeTimestamp"; public static final String CTX_KEY_OUTERMOST = "groupByOutermost"; + private static final int MAX_MERGE_BUFFER_NUM = 2; + private final DruidProcessingConfig processingConfig; private final Supplier configSupplier; private final StupidPool bufferPool; @@ -123,16 +125,10 @@ public static DateTime getUniversalTimestamp(final GroupByQuery query) } @Override - public GroupByQueryBrokerResource prepareResource(GroupByQuery query, boolean willMergeRunners) + public GroupByQueryResource prepareResource(GroupByQuery query, boolean willMergeRunners) { if (!willMergeRunners) { - // Note: A broker requires merge buffers for processing the groupBy layers beyond the inner-most one. 
- // For example, the number of required merge buffers for a nested groupBy (groupBy -> groupBy -> table) is 1. - // If the broker processes an outer groupBy which reads input from an inner groupBy, - // it requires two merge buffers for inner and outer groupBys to keep the intermediate result of inner groupBy - // until the outer groupBy processing completes. - // This is same for subsequent groupBy layers, and thus the maximum number of required merge buffers becomes 2. - final int requiredMergeBufferNum = countRequiredMergeBufferNum(query, 2, 1); + final int requiredMergeBufferNum = countRequiredMergeBufferNum(query, 1); if (requiredMergeBufferNum > mergeBufferPool.maxSize()) { throw new ResourceLimitExceededException( @@ -140,7 +136,7 @@ public GroupByQueryBrokerResource prepareResource(GroupByQuery query, boolean wi + mergeBufferPool.maxSize() + " merge buffers are configured" ); } else if (requiredMergeBufferNum == 0) { - return new GroupByQueryBrokerResource(); + return new GroupByQueryResource(); } else { final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, JodaUtils.MAX_INSTANT); final ResourceHolder> mergeBufferHolders; @@ -149,9 +145,9 @@ public GroupByQueryBrokerResource prepareResource(GroupByQuery query, boolean wi mergeBufferHolders = mergeBufferPool.drain(requiredMergeBufferNum, timeout.longValue()); if (mergeBufferHolders.get().size() < requiredMergeBufferNum) { mergeBufferHolders.close(); - throw new TimeoutException("Cannot acquire enough merge buffers"); + throw new InsufficientResourcesException("Cannot acquire enough merge buffers"); } else { - return new GroupByQueryBrokerResource(mergeBufferHolders); + return new GroupByQueryResource(mergeBufferHolders); } } catch (Exception e) { @@ -159,17 +155,24 @@ public GroupByQueryBrokerResource prepareResource(GroupByQuery query, boolean wi } } } else { - return new GroupByQueryBrokerResource(); + return new GroupByQueryResource(); } } - private static int 
countRequiredMergeBufferNum(Query query, int maxBufferNum, int foundNum) + private static int countRequiredMergeBufferNum(Query query, int foundNum) { + // Note: A broker requires merge buffers for processing the groupBy layers beyond the inner-most one. + // For example, the number of required merge buffers for a nested groupBy (groupBy -> groupBy -> table) is 1. + // If the broker processes an outer groupBy which reads input from an inner groupBy, + // it requires two merge buffers for inner and outer groupBys to keep the intermediate result of inner groupBy + // until the outer groupBy processing completes. + // This is same for subsequent groupBy layers, and thus the maximum number of required merge buffers becomes 2. + final DataSource dataSource = query.getDataSource(); - if (foundNum == maxBufferNum + 1 || !(dataSource instanceof QueryDataSource)) { + if (foundNum == MAX_MERGE_BUFFER_NUM + 1 || !(dataSource instanceof QueryDataSource)) { return foundNum - 1; } else { - return countRequiredMergeBufferNum(((QueryDataSource) dataSource).getQuery(), maxBufferNum, foundNum + 1); + return countRequiredMergeBufferNum(((QueryDataSource) dataSource).getQuery(), foundNum + 1); } } @@ -271,7 +274,7 @@ public Row apply(final Row row) public Sequence processSubqueryResult( GroupByQuery subquery, GroupByQuery query, - GroupByQueryBrokerResource resource, + GroupByQueryResource resource, Sequence subqueryResult ) { From 933e94faefa484c0662995eaafbdedc93bd76688 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 22 Feb 2017 15:11:01 +0900 Subject: [PATCH 8/9] addressed comments --- .../io/druid/collections/BlockingPool.java | 12 +++++---- .../query/InsufficientResourcesException.java | 2 +- .../epinephelinae/GroupByRowProcessor.java | 2 ++ .../groupby/strategy/GroupByStrategyV2.java | 25 ++++++++----------- 4 files changed, 20 insertions(+), 21 deletions(-) diff --git a/common/src/main/java/io/druid/collections/BlockingPool.java 
b/common/src/main/java/io/druid/collections/BlockingPool.java index 0ff1571a97b9..27caa7d12881 100644 --- a/common/src/main/java/io/druid/collections/BlockingPool.java +++ b/common/src/main/java/io/druid/collections/BlockingPool.java @@ -81,7 +81,11 @@ public ReferenceCountingResourceHolder take(final long timeout) checkInitialized(); final T theObject; try { - theObject = timeout >= 0 ? objects.poll(timeout, TimeUnit.MILLISECONDS) : objects.take(); + if (timeout > -1) { + theObject = timeout > 0 ? objects.poll(timeout, TimeUnit.MILLISECONDS) : objects.poll(); + } else { + theObject = objects.take(); + } return theObject == null ? null : new ReferenceCountingResourceHolder<>( theObject, new Closeable() @@ -113,13 +117,11 @@ public ReferenceCountingResourceHolder> drain(final int maxElements, fin final List batch = Lists.newArrayListWithCapacity(maxElements); try { - final int n = timeout >= 0 ? + final int n = timeout > 0 ? Queues.drain(objects, batch, maxElements, timeout, TimeUnit.MILLISECONDS) : objects.drainTo(batch, maxElements); if (n < maxElements) { - if (log.isDebugEnabled()) { - log.debug("Requested " + maxElements + " elements, but drained " + n + " elements"); - } + log.debug("Requested %d elements, but drained %d elements", maxElements, n); } } catch (InterruptedException e) { diff --git a/processing/src/main/java/io/druid/query/InsufficientResourcesException.java b/processing/src/main/java/io/druid/query/InsufficientResourcesException.java index 993379c2f882..f431428d1d65 100644 --- a/processing/src/main/java/io/druid/query/InsufficientResourcesException.java +++ b/processing/src/main/java/io/druid/query/InsufficientResourcesException.java @@ -22,7 +22,7 @@ /** * This exception is thrown when the requested operation cannot be completed due to a lack of available resources. 
*/ -public class InsufficientResourcesException extends Exception +public class InsufficientResourcesException extends RuntimeException { public InsufficientResourcesException(String message) { diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index 37e0b810615c..4e7148c87b7e 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -127,6 +127,8 @@ public boolean apply(Row input) @Override public CloseableGrouperIterator make() { + // This contains all closeable objects which are closed when the returned iterator iterates all the elements, + // or an exception is thrown. The objects are closed in their reverse order. final List closeOnExit = Lists.newArrayList(); try { diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index d69047898da4..ba0f353f51ad 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -44,14 +44,13 @@ import io.druid.java.util.common.guava.nary.BinaryFn; import io.druid.query.DataSource; import io.druid.query.DruidProcessingConfig; +import io.druid.query.InsufficientResourcesException; import io.druid.query.Query; import io.druid.query.QueryContextKeys; import io.druid.query.QueryDataSource; -import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; import io.druid.query.QueryWatcher; import io.druid.query.ResourceLimitExceededException; -import io.druid.query.InsufficientResourcesException; import io.druid.query.ResultMergeQueryRunner; import io.druid.query.aggregation.PostAggregator; import
io.druid.query.groupby.GroupByQuery; @@ -74,6 +73,7 @@ public class GroupByStrategyV2 implements GroupByStrategy public static final String CTX_KEY_FUDGE_TIMESTAMP = "fudgeTimestamp"; public static final String CTX_KEY_OUTERMOST = "groupByOutermost"; + // see countRequiredMergeBufferNum() for explanation private static final int MAX_MERGE_BUFFER_NUM = 2; private final DruidProcessingConfig processingConfig; @@ -139,19 +139,14 @@ public GroupByQueryResource prepareResource(GroupByQuery query, boolean willMerg return new GroupByQueryResource(); } else { final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, JodaUtils.MAX_INSTANT); - final ResourceHolder> mergeBufferHolders; - - try { - mergeBufferHolders = mergeBufferPool.drain(requiredMergeBufferNum, timeout.longValue()); - if (mergeBufferHolders.get().size() < requiredMergeBufferNum) { - mergeBufferHolders.close(); - throw new InsufficientResourcesException("Cannot acquire enough merge buffers"); - } else { - return new GroupByQueryResource(mergeBufferHolders); - } - } - catch (Exception e) { - throw new QueryInterruptedException(e); + final ResourceHolder> mergeBufferHolders = mergeBufferPool.drain( + requiredMergeBufferNum, timeout.longValue() + ); + if (mergeBufferHolders.get().size() < requiredMergeBufferNum) { + mergeBufferHolders.close(); + throw new InsufficientResourcesException("Cannot acquire enough merge buffers"); + } else { + return new GroupByQueryResource(mergeBufferHolders); } } } else { From 62e2f5848507c8ac1608a89c622799ac16a1c6e1 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 22 Feb 2017 19:37:15 +0900 Subject: [PATCH 9/9] Add takeBatch() to BlockingPool --- .../io/druid/collections/BlockingPool.java | 206 +++++++++--- .../druid/collections/BlockingPoolTest.java | 293 +++++++++++++++++- .../groupby/strategy/GroupByStrategyV2.java | 5 +- .../groupby/GroupByQueryMergeBufferTest.java | 4 +- .../GroupByQueryRunnerFailureTest.java | 108 ++++++- 5 files changed, 555 insertions(+), 
61 deletions(-) diff --git a/common/src/main/java/io/druid/collections/BlockingPool.java b/common/src/main/java/io/druid/collections/BlockingPool.java index 27caa7d12881..9a883da35cfb 100644 --- a/common/src/main/java/io/druid/collections/BlockingPool.java +++ b/common/src/main/java/io/druid/collections/BlockingPool.java @@ -23,26 +23,27 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.google.common.collect.Queues; -import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.common.ISE; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayDeque; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; /** * Pool that pre-generates objects up to a limit, then permits possibly-blocking "take" operations. */ public class BlockingPool { - private static final Logger log = new Logger(BlockingPool.class); + private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS; - private final BlockingQueue objects; + private final ArrayDeque objects; + private final ReentrantLock lock; + private final Condition notEnough; private final int maxSize; public BlockingPool( @@ -50,12 +51,15 @@ public BlockingPool( int limit ) { - this.objects = limit > 0 ? new ArrayBlockingQueue(limit) : null; + this.objects = new ArrayDeque<>(limit); this.maxSize = limit; for (int i = 0; i < limit; i++) { objects.add(generator.get()); } + + this.lock = new ReentrantLock(); + this.notEnough = lock.newCondition(); } public int maxSize() @@ -82,9 +86,9 @@ public ReferenceCountingResourceHolder take(final long timeout) final T theObject; try { if (timeout > -1) { - theObject = timeout > 0 ? 
objects.poll(timeout, TimeUnit.MILLISECONDS) : objects.poll(); + theObject = timeout > 0 ? poll(timeout) : poll(); } else { - theObject = objects.take(); + theObject = take(); } return theObject == null ? null : new ReferenceCountingResourceHolder<>( theObject, @@ -103,59 +107,179 @@ public void close() throws IOException } } + private T poll() + { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return objects.isEmpty() ? null : objects.pop(); + } finally { + lock.unlock(); + } + } + + private T poll(long timeout) throws InterruptedException + { + long nanos = TIME_UNIT.toNanos(timeout); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (objects.isEmpty()) { + if (nanos <= 0) { + return null; + } + nanos = notEnough.awaitNanos(nanos); + } + return objects.pop(); + } finally { + lock.unlock(); + } + } + + private T take() throws InterruptedException + { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (objects.isEmpty()) { + notEnough.await(); + } + return objects.pop(); + } finally { + lock.unlock(); + } + } + /** - * Drains at most the given number of available resources from the pool. + * Take a resource from the pool. * - * @param maxElements the maximum number of elements to drain - * @param timeout maximum time to wait for a resource, in milliseconds. Negative means do not use a timeout. + * @param elementNum number of resources to take + * @param timeout maximum time to wait for resources, in milliseconds. Negative means do not use a timeout. 
* - * @return a resource holder which contains the drained resources + * @return a resource, or null if the timeout was reached */ - public ReferenceCountingResourceHolder> drain(final int maxElements, final long timeout) + public ReferenceCountingResourceHolder> takeBatch(final int elementNum, final long timeout) { checkInitialized(); - final List batch = Lists.newArrayListWithCapacity(maxElements); - + final List objects; try { - final int n = timeout > 0 ? - Queues.drain(objects, batch, maxElements, timeout, TimeUnit.MILLISECONDS) : - objects.drainTo(batch, maxElements); - if (n < maxElements) { - log.debug("Requested %d elements, but drained %d elements", maxElements, n); + if (timeout > -1) { + objects = timeout > 0 ? pollBatch(elementNum, timeout) : pollBatch(elementNum); + } else { + objects = takeBatch(elementNum); } + return objects == null ? null : new ReferenceCountingResourceHolder<>( + objects, + new Closeable() + { + @Override + public void close() throws IOException + { + offerBatch(objects); + } + } + ); } catch (InterruptedException e) { - for (T obj : batch) { - offer(obj); - } throw Throwables.propagate(e); } + } + + private List pollBatch(int elementNum) throws InterruptedException + { + final List list = Lists.newArrayListWithCapacity(elementNum); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + if (objects.size() < elementNum) { + return null; + } else { + for (int i = 0; i < elementNum; i++) { + list.add(objects.pop()); + } + return list; + } + } finally { + lock.unlock(); + } + } - final List resources = ImmutableList.copyOf(batch); - return new ReferenceCountingResourceHolder<>( - resources, - new Closeable() - { - @Override - public void close() throws IOException - { - for (T obj : resources) { - offer(obj); - } - } + private List pollBatch(int elementNum, long timeout) throws InterruptedException + { + long nanos = TIME_UNIT.toNanos(timeout); + final List list = Lists.newArrayListWithCapacity(elementNum); + 
final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (objects.size() < elementNum) { + if (nanos <= 0) { + return null; } - ); + nanos = notEnough.awaitNanos(nanos); + } + for (int i = 0; i < elementNum; i++) { + list.add(objects.pop()); + } + return list; + } finally { + lock.unlock(); + } + } + + private List takeBatch(int elementNum) throws InterruptedException + { + final List list = Lists.newArrayListWithCapacity(elementNum); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (objects.size() < elementNum) { + notEnough.await(); + } + for (int i = 0; i < elementNum; i++) { + list.add(objects.pop()); + } + return list; + } finally { + lock.unlock(); + } } private void checkInitialized() { - Preconditions.checkState(objects != null, "Pool was initialized with limit = 0, there are no objects to take."); + Preconditions.checkState(maxSize > 0, "Pool was initialized with limit = 0, there are no objects to take."); } private void offer(T theObject) { - if (!objects.offer(theObject)) { - log.error("WTF?! 
Queue offer failed, uh oh..."); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + if (objects.size() < maxSize) { + objects.push(theObject); + notEnough.signal(); + } else { + throw new ISE("Cannot exceed pre-configured maximum size"); + } + } finally { + lock.unlock(); + } + } + + private void offerBatch(List offers) + { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + if (objects.size() + offers.size() <= maxSize) { + for (T offer : offers) { + objects.push(offer); + } + notEnough.signal(); + } else { + throw new ISE("Cannot exceed pre-configured maximum size"); + } + } finally { + lock.unlock(); } } } diff --git a/common/src/test/java/io/druid/collections/BlockingPoolTest.java b/common/src/test/java/io/druid/collections/BlockingPoolTest.java index 039b087200e6..73a3d86c3075 100644 --- a/common/src/test/java/io/druid/collections/BlockingPoolTest.java +++ b/common/src/test/java/io/druid/collections/BlockingPoolTest.java @@ -20,21 +20,37 @@ package io.druid.collections; import com.google.common.base.Suppliers; +import com.google.common.collect.Lists; +import org.junit.AfterClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class BlockingPoolTest { + private static final ExecutorService SERVICE = Executors.newFixedThreadPool(2); + private static final BlockingPool POOL = new BlockingPool<>(Suppliers.ofInstance(1), 10); private static final BlockingPool EMPTY_POOL = new BlockingPool<>(Suppliers.ofInstance(1), 0); + @AfterClass + public static void teardown() + { + 
SERVICE.shutdown(); + } + @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -51,7 +67,7 @@ public void testDrainFromEmptyPool() { expectedException.expect(IllegalStateException.class); expectedException.expectMessage("Pool was initialized with limit = 0, there are no objects to take."); - EMPTY_POOL.drain(1, 0); + EMPTY_POOL.takeBatch(1, 0); } @Test(timeout = 1000) @@ -67,16 +83,16 @@ public void testTake() @Test(timeout = 1000) public void testTakeTimeout() { - final ReferenceCountingResourceHolder> batchHolder = POOL.drain(10, 100L); + final ReferenceCountingResourceHolder> batchHolder = POOL.takeBatch(10, 100L); final ReferenceCountingResourceHolder holder = POOL.take(100); assertNull(holder); batchHolder.close(); } @Test(timeout = 1000) - public void testDrain() + public void testTakeBatch() { - final ReferenceCountingResourceHolder> holder = POOL.drain(6, 100L); + final ReferenceCountingResourceHolder> holder = POOL.takeBatch(6, 100L); assertNotNull(holder); assertEquals(6, holder.get().size()); assertEquals(4, POOL.getPoolSize()); @@ -85,13 +101,272 @@ public void testDrain() } @Test(timeout = 1000) - public void testDrainTooManyObjects() + public void testWaitAndTakeBatch() throws InterruptedException, ExecutionException { - final ReferenceCountingResourceHolder> holder = POOL.drain(100, 100L); - assertNotNull(holder); - assertEquals(10, holder.get().size()); + ReferenceCountingResourceHolder> batchHolder = POOL.takeBatch(10, 10); + assertNotNull(batchHolder); + assertEquals(10, batchHolder.get().size()); assertEquals(0, POOL.getPoolSize()); - holder.close(); + + final Future>> future = SERVICE.submit( + new Callable>>() + { + @Override + public ReferenceCountingResourceHolder> call() throws Exception + { + return POOL.takeBatch(8, 100); + } + } + ); + Thread.sleep(20); + batchHolder.close(); + + batchHolder = future.get(); + assertNotNull(batchHolder); + assertEquals(8, batchHolder.get().size()); + assertEquals(2, 
POOL.getPoolSize()); + + batchHolder.close(); assertEquals(10, POOL.getPoolSize()); } + + @Test(timeout = 1000) + public void testTakeBatchTooManyObjects() + { + final ReferenceCountingResourceHolder> holder = POOL.takeBatch(100, 100L); + assertNull(holder); + } + + @Test(timeout = 1000) + public void testConcurrentTake() throws ExecutionException, InterruptedException + { + final int limit1 = POOL.maxSize() / 2; + final int limit2 = POOL.maxSize() - limit1 + 1; + + final Future>> f1 = SERVICE.submit( + new Callable>>() + { + @Override + public List> call() throws Exception + { + List> result = Lists.newArrayList(); + for (int i = 0; i < limit1; i++) { + result.add(POOL.take(10)); + } + return result; + } + } + ); + final Future>> f2 = SERVICE.submit( + new Callable>>() + { + @Override + public List> call() throws Exception + { + List> result = Lists.newArrayList(); + for (int i = 0; i < limit2; i++) { + result.add(POOL.take(10)); + } + return result; + } + } + ); + + final List> r1 = f1.get(); + final List> r2 = f2.get(); + + assertEquals(0, POOL.getPoolSize()); + assertTrue(r1.contains(null) || r2.contains(null)); + + int nonNullCount = 0; + for (ReferenceCountingResourceHolder holder : r1) { + if (holder != null) { + nonNullCount++; + } + } + + for (ReferenceCountingResourceHolder holder : r2) { + if (holder != null) { + nonNullCount++; + } + } + assertEquals(POOL.maxSize(), nonNullCount); + + final Future future1 = SERVICE.submit(new Runnable() + { + @Override + public void run() + { + for (ReferenceCountingResourceHolder holder : r1) { + if (holder != null) { + holder.close(); + } + } + } + }); + final Future future2 = SERVICE.submit(new Runnable() + { + @Override + public void run() + { + for (ReferenceCountingResourceHolder holder : r2) { + if (holder != null) { + holder.close(); + } + } + } + }); + + future1.get(); + future2.get(); + + assertEquals(POOL.maxSize(), POOL.getPoolSize()); + } + + @Test(timeout = 1000) + public void testConcurrentTakeBatch() 
throws ExecutionException, InterruptedException + { + final int batch1 = POOL.maxSize() / 2; + final Callable>> c1 = + new Callable>>() + { + @Override + public ReferenceCountingResourceHolder> call() throws Exception + { + return POOL.takeBatch(batch1, 10); + } + }; + + final int batch2 = POOL.maxSize() - batch1 + 1; + final Callable>> c2 = + new Callable>>() + { + @Override + public ReferenceCountingResourceHolder> call() throws Exception + { + return POOL.takeBatch(batch2, 10); + } + }; + + final Future>> f1 = SERVICE.submit(c1); + final Future>> f2 = SERVICE.submit(c2); + + final ReferenceCountingResourceHolder> r1 = f1.get(); + final ReferenceCountingResourceHolder> r2 = f2.get(); + + if (r1 != null) { + assertNull(r2); + assertEquals(POOL.maxSize() - batch1, POOL.getPoolSize()); + assertEquals(batch1, r1.get().size()); + r1.close(); + } else { + assertNotNull(r2); + assertEquals(POOL.maxSize() - batch2, POOL.getPoolSize()); + assertEquals(batch2, r2.get().size()); + r2.close(); + } + + assertEquals(POOL.maxSize(), POOL.getPoolSize()); + } + + @Test(timeout = 1000) + public void testConcurrentBatchClose() throws ExecutionException, InterruptedException + { + final int batch1 = POOL.maxSize() / 2; + final Callable>> c1 = + new Callable>>() + { + @Override + public ReferenceCountingResourceHolder> call() throws Exception + { + return POOL.takeBatch(batch1, 10); + } + }; + + final int batch2 = POOL.maxSize() - batch1; + final Callable>> c2 = + new Callable>>() + { + @Override + public ReferenceCountingResourceHolder> call() throws Exception + { + return POOL.takeBatch(batch2, 10); + } + }; + + final Future>> f1 = SERVICE.submit(c1); + final Future>> f2 = SERVICE.submit(c2); + + final ReferenceCountingResourceHolder> r1 = f1.get(); + final ReferenceCountingResourceHolder> r2 = f2.get(); + + assertNotNull(r1); + assertNotNull(r2); + assertEquals(batch1, r1.get().size()); + assertEquals(batch2, r2.get().size()); + assertEquals(0, POOL.getPoolSize()); + + final 
Future future1 = SERVICE.submit(new Runnable() + { + @Override + public void run() + { + r1.close(); + } + }); + final Future future2 = SERVICE.submit(new Runnable() + { + @Override + public void run() + { + r2.close(); + } + }); + + future1.get(); + future2.get(); + + assertEquals(POOL.maxSize(), POOL.getPoolSize()); + } + + @Test(timeout = 1000) + public void testConcurrentTakeBatchClose() throws ExecutionException, InterruptedException + { + final ReferenceCountingResourceHolder> r1 = POOL.takeBatch(1, 10); + + final Callable>> c2 = + new Callable>>() + { + @Override + public ReferenceCountingResourceHolder> call() throws Exception + { + return POOL.takeBatch(10, 100); + } + }; + + final Future>> f2 = SERVICE.submit(c2); + final Future f1 = SERVICE.submit(new Runnable() + { + @Override + public void run() + { + try { + Thread.sleep(50); + } + catch (InterruptedException e) { + // ignore + } + r1.close(); + } + }); + + final ReferenceCountingResourceHolder> r2 = f2.get(); + f1.get(); + assertNotNull(r2); + assertEquals(10, r2.get().size()); + assertEquals(0, POOL.getPoolSize()); + + r2.close(); + assertEquals(POOL.maxSize(), POOL.getPoolSize()); + } } diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index ba0f353f51ad..c36bb6e57cab 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -139,11 +139,10 @@ public GroupByQueryResource prepareResource(GroupByQuery query, boolean willMerg return new GroupByQueryResource(); } else { final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, JodaUtils.MAX_INSTANT); - final ResourceHolder> mergeBufferHolders = mergeBufferPool.drain( + final ResourceHolder> mergeBufferHolders = mergeBufferPool.takeBatch( requiredMergeBufferNum, timeout.longValue() ); - if 
(mergeBufferHolders.get().size() < requiredMergeBufferNum) { - mergeBufferHolders.close(); + if (mergeBufferHolders == null) { throw new InsufficientResourcesException("Cannot acquire enough merge buffers"); } else { return new GroupByQueryResource(mergeBufferHolders); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java index 96bd05b135fc..2cfe10349306 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java @@ -82,9 +82,9 @@ public ReferenceCountingResourceHolder take(final long timeout) } @Override - public ReferenceCountingResourceHolder> drain(final int maxElements, final long timeout) + public ReferenceCountingResourceHolder> takeBatch(final int maxElements, final long timeout) { - final ReferenceCountingResourceHolder> holder = super.drain(maxElements, timeout); + final ReferenceCountingResourceHolder> holder = super.takeBatch(maxElements, timeout); final int poolSize = getPoolSize(); if (minRemainBufferNum > poolSize) { minRemainBufferNum = poolSize; diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java index e7764d1ee622..fee40bf28c63 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java @@ -19,13 +19,20 @@ package io.druid.query.groupby; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; 
+import io.druid.collections.BlockingPool; +import io.druid.collections.ReferenceCountingResourceHolder; +import io.druid.collections.StupidPool; import io.druid.data.input.Row; import io.druid.granularity.QueryGranularities; import io.druid.query.DruidProcessingConfig; +import io.druid.query.InsufficientResourcesException; import io.druid.query.QueryContextKeys; import io.druid.query.QueryDataSource; import io.druid.query.QueryInterruptedException; @@ -36,6 +43,9 @@ import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; +import io.druid.query.groupby.strategy.GroupByStrategySelector; +import io.druid.query.groupby.strategy.GroupByStrategyV1; +import io.druid.query.groupby.strategy.GroupByStrategyV2; import org.bouncycastle.util.Integers; import org.hamcrest.CoreMatchers; import org.junit.Rule; @@ -46,6 +56,7 @@ import org.junit.runners.Parameterized.Parameters; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; import java.util.concurrent.TimeoutException; @@ -53,7 +64,7 @@ @RunWith(Parameterized.class) public class GroupByQueryRunnerFailureTest { - public static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig() + private static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig() { @Override @@ -84,7 +95,63 @@ public int getNumThreads() @Rule public ExpectedException expectedException = ExpectedException.none(); - private static final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory( + private static GroupByQueryRunnerFactory makeQueryRunnerFactory( + final ObjectMapper mapper, + final GroupByQueryConfig config + ) + { + final Supplier configSupplier = Suppliers.ofInstance(config); + final StupidPool bufferPool = new StupidPool<>( + "GroupByQueryEngine-bufferPool", + new Supplier() + { + @Override + public 
ByteBuffer get() + { + return ByteBuffer.allocateDirect(DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes()); + } + } + ); + final GroupByStrategySelector strategySelector = new GroupByStrategySelector( + configSupplier, + new GroupByStrategyV1( + configSupplier, + new GroupByQueryEngine(configSupplier, bufferPool), + QueryRunnerTestHelper.NOOP_QUERYWATCHER, + bufferPool + ), + new GroupByStrategyV2( + DEFAULT_PROCESSING_CONFIG, + configSupplier, + bufferPool, + mergeBufferPool, + mapper, + QueryRunnerTestHelper.NOOP_QUERYWATCHER + ) + ); + final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( + strategySelector, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ); + return new GroupByQueryRunnerFactory( + strategySelector, + toolChest + ); + } + + private final static BlockingPool mergeBufferPool = new BlockingPool<>( + new Supplier() + { + @Override + public ByteBuffer get () + { + return ByteBuffer.allocateDirect(DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes()); + } + }, + DEFAULT_PROCESSING_CONFIG.getNumMergeBuffers() + ); + + private static final GroupByQueryRunnerFactory factory = makeQueryRunnerFactory( GroupByQueryRunnerTest.DEFAULT_MAPPER, new GroupByQueryConfig() { @@ -92,8 +159,7 @@ public String getDefaultStrategy() { return "v2"; } - }, - DEFAULT_PROCESSING_CONFIG + } ); private QueryRunner runner; @@ -110,7 +176,7 @@ public static Collection constructorFeeder() throws IOException public GroupByQueryRunnerFailureTest(QueryRunner runner) { - this.runner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.>of(runner)); + this.runner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.of(runner)); } @Test(timeout = 10000) @@ -142,7 +208,7 @@ public void testNotEnoughMergeBuffersOnQueryable() throws IOException } @Test(timeout = 10000) - public void testNotEnoughMergeBuffersOnBroker() + public void testResourceLimitExceededOnBroker() { 
expectedException.expect(ResourceLimitExceededException.class); @@ -178,4 +244,34 @@ public void testNotEnoughMergeBuffersOnBroker() GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); } + + @Test(timeout = 10000, expected = InsufficientResourcesException.class) + public void testInsufficientResourcesOnBroker() throws IOException + { + final ReferenceCountingResourceHolder> holder = mergeBufferPool.takeBatch(1, 10); + final GroupByQuery query = GroupByQuery + .builder() + .setDataSource( + new QueryDataSource( + GroupByQuery.builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setInterval(QueryRunnerTestHelper.firstToThird) + .setGranularity(QueryGranularities.ALL) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs(Lists.newArrayList(QueryRunnerTestHelper.rowsCount)) + .build() + ) + ) + .setGranularity(QueryGranularities.ALL) + .setInterval(QueryRunnerTestHelper.firstToThird) + .setAggregatorSpecs(Lists.newArrayList(new LongSumAggregatorFactory("rows", "rows"))) + .setContext(ImmutableMap.of(QueryContextKeys.TIMEOUT, Integers.valueOf(500))) + .build(); + + try { + GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + } finally { + holder.close(); + } + } }