From 6b3f9d9eed8f78a577a9e916610180094d8ccb2a Mon Sep 17 00:00:00 2001 From: Roman Leventov Date: Wed, 28 Jun 2017 00:58:01 -0500 Subject: [PATCH] More fine-grained DI for management node types. Don't allocate processing resources on Router (#4429) * Remove DruidProcessingModule, QueryableModule and QueryRunnerFactoryModule from DI for coordinator, overlord, middle-manager. Add RouterDruidProcessing not to allocate processing resources on router * Fix examples * Fixes * Revert Peon configs and add comments * Remove qualifier --- .../GroupByTypeInterfaceBenchmark.java | 6 +- .../StupidPoolConcurrencyBenchmark.java | 3 +- .../benchmark/query/GroupByBenchmark.java | 6 +- .../io/druid/collections/BlockingPool.java | 269 +--------------- .../collections/DefaultBlockingPool.java | 297 ++++++++++++++++++ .../druid/collections/DummyBlockingPool.java | 70 +++++ .../collections/DummyNonBlockingPool.java | 43 +++ .../io/druid/collections/NonBlockingPool.java | 25 ++ .../java/io/druid/collections/StupidPool.java | 5 +- .../concurrent/DummyExecutorService.java | 123 ++++++++ .../main/java/io/druid/concurrent/Execs.java | 7 + .../druid/collections/BlockingPoolTest.java | 4 +- docs/content/development/router.md | 1 - .../druid/middleManager/runtime.properties | 6 +- .../druid/middleManager/runtime.properties | 6 +- integration-tests/docker/router.conf | 2 - .../concurrent/ExecutorServiceConfig.java | 22 +- .../io/druid/query/DruidProcessingConfig.java | 28 +- .../druid/query/GroupByMergedQueryRunner.java | 6 +- .../query/groupby/GroupByQueryEngine.java | 6 +- .../query/groupby/GroupByQueryHelper.java | 6 +- .../epinephelinae/GroupByQueryEngineV2.java | 4 +- .../groupby/strategy/GroupByStrategyV1.java | 6 +- .../groupby/strategy/GroupByStrategyV2.java | 6 +- .../AggregateTopNMetricFirstAlgorithm.java | 6 +- .../druid/query/topn/PooledTopNAlgorithm.java | 6 +- .../io/druid/query/topn/TopNQueryEngine.java | 6 +- .../query/topn/TopNQueryRunnerFactory.java | 6 +- .../io/druid/segment/CompressedPools.java | 9 +- .../segment/incremental/IncrementalIndex.java | 4 +- .../incremental/OffheapIncrementalIndex.java | 6 +- .../java/io/druid/query/TestQueryRunners.java | 5 +- .../groupby/GroupByQueryMergeBufferTest.java | 7 +- .../GroupByQueryRunnerFailureTest.java | 6 +- .../query/groupby/GroupByQueryRunnerTest.java | 6 +- .../guice/DruidProcessingConfigModule.java | 35 +++ .../io/druid/guice/DruidProcessingModule.java | 9 +- .../java/io/druid/guice/QueryableModule.java | 9 +- .../druid/guice/RouterProcessingModule.java | 107 +++++++ .../io/druid/guice/StorageNodeModule.java | 6 - .../druid/initialization/Initialization.java | 8 +- .../log/LoggingRequestLoggerProviderTest.java | 5 +- .../src/main/java/io/druid/cli/CliBroker.java | 6 + .../main/java/io/druid/cli/CliHistorical.java | 6 + .../src/main/java/io/druid/cli/CliPeon.java | 11 +- .../main/java/io/druid/cli/CliRealtime.java | 7 +- .../java/io/druid/cli/CliRealtimeExample.java | 6 + .../src/main/java/io/druid/cli/CliRouter.java | 6 + .../main/java/io/druid/cli/CreateTables.java | 10 +- .../main/java/io/druid/cli/DumpSegment.java | 6 + .../main/java/io/druid/cli/InsertSegment.java | 9 + .../main/java/io/druid/cli/ResetCluster.java | 9 + .../java/io/druid/cli/ValidateSegments.java | 9 + .../cli/validate/DruidJsonValidator.java | 10 +- 54 files changed, 935 insertions(+), 363 deletions(-) create mode 100644 common/src/main/java/io/druid/collections/DefaultBlockingPool.java create mode 100644 common/src/main/java/io/druid/collections/DummyBlockingPool.java create mode 100644 common/src/main/java/io/druid/collections/DummyNonBlockingPool.java create mode 100644 common/src/main/java/io/druid/collections/NonBlockingPool.java create mode 100644 common/src/main/java/io/druid/concurrent/DummyExecutorService.java create mode 100644 server/src/main/java/io/druid/guice/DruidProcessingConfigModule.java create mode 100644 server/src/main/java/io/druid/guice/RouterProcessingModule.java diff --git a/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java index 8cfc191aa28a..2cc3ba6c7365 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java @@ -32,6 +32,8 @@ import io.druid.benchmark.datagen.BenchmarkSchemas; import io.druid.benchmark.query.QueryBenchmarkUtil; import io.druid.collections.BlockingPool; +import io.druid.collections.DefaultBlockingPool; +import io.druid.collections.NonBlockingPool; import io.druid.collections.StupidPool; import io.druid.concurrent.Execs; import io.druid.data.input.InputRow; @@ -348,7 +350,7 @@ public void setup() throws IOException } } - StupidPool bufferPool = new StupidPool<>( + NonBlockingPool bufferPool = new StupidPool<>( "GroupByBenchmark-computeBufferPool", new OffheapBufferGenerator("compute", 250_000_000), 0, @@ -356,7 +358,7 @@ public void setup() throws IOException ); // limit of 2 is required since we simulate both historical merge and broker merge in the same process - BlockingPool mergePool = new BlockingPool<>( + BlockingPool mergePool = new DefaultBlockingPool<>( new OffheapBufferGenerator("merge", 250_000_000), 2 ); diff --git a/benchmarks/src/main/java/io/druid/benchmark/StupidPoolConcurrencyBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/StupidPoolConcurrencyBenchmark.java index 43ae737495cd..887baf432687 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/StupidPoolConcurrencyBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/StupidPoolConcurrencyBenchmark.java @@ -21,6 +21,7 @@ import com.google.common.base.Supplier; +import io.druid.collections.NonBlockingPool; import io.druid.collections.ResourceHolder; import io.druid.collections.StupidPool; import io.druid.java.util.common.logger.Logger; @@ -64,7 +65,7 @@ public void teardown() public static class BenchmarkPool { private final AtomicLong numPools = new AtomicLong(0L); - private final StupidPool pool = new StupidPool<>( + private final NonBlockingPool pool = new StupidPool<>( "simpleObject pool", new Supplier() { diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java index 6e028b99642b..09f8a1fd2f88 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java @@ -32,6 +32,8 @@ import io.druid.benchmark.datagen.BenchmarkSchemaInfo; import io.druid.benchmark.datagen.BenchmarkSchemas; import io.druid.collections.BlockingPool; +import io.druid.collections.DefaultBlockingPool; +import io.druid.collections.NonBlockingPool; import io.druid.collections.StupidPool; import io.druid.concurrent.Execs; import io.druid.data.input.InputRow; @@ -392,7 +394,7 @@ public void setup() throws IOException } } - StupidPool bufferPool = new StupidPool<>( + NonBlockingPool bufferPool = new StupidPool<>( "GroupByBenchmark-computeBufferPool", new OffheapBufferGenerator("compute", 250_000_000), 0, @@ -400,7 +402,7 @@ public void setup() throws IOException ); // limit of 2 is required since we simulate both historical merge and broker merge in the same process - BlockingPool mergePool = new BlockingPool<>( + BlockingPool mergePool = new DefaultBlockingPool<>( new OffheapBufferGenerator("merge", 250_000_000), 2 ); diff --git a/common/src/main/java/io/druid/collections/BlockingPool.java b/common/src/main/java/io/druid/collections/BlockingPool.java index af48f53cf098..fdcf7f2356d4 100644 --- a/common/src/main/java/io/druid/collections/BlockingPool.java +++ b/common/src/main/java/io/druid/collections/BlockingPool.java @@ -19,59 +19,11 @@ package io.druid.collections; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.base.Throwables; -import com.google.common.collect.Lists; -import io.druid.java.util.common.ISE; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayDeque; import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; -/** - * Pool that pre-generates objects up to a limit, then permits possibly-blocking "take" operations. - */ -public class BlockingPool +public interface BlockingPool { - private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS; - - private final ArrayDeque objects; - private final ReentrantLock lock; - private final Condition notEnough; - private final int maxSize; - - public BlockingPool( - Supplier generator, - int limit - ) - { - this.objects = new ArrayDeque<>(limit); - this.maxSize = limit; - - for (int i = 0; i < limit; i++) { - objects.add(generator.get()); - } - - this.lock = new ReentrantLock(); - this.notEnough = lock.newCondition(); - } - - public int maxSize() - { - return maxSize; - } - - @VisibleForTesting - public int getPoolSize() - { - return objects.size(); - } + int maxSize(); /** * Take a resource from the pool, waiting up to the @@ -81,91 +33,14 @@ public int getPoolSize() * * @return a resource, or null if the timeout was reached */ - public ReferenceCountingResourceHolder take(final long timeoutMs) - { - Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs); - checkInitialized(); - try { - return wrapObject(timeoutMs > 0 ? pollObject(timeoutMs) : pollObject()); - } - catch (InterruptedException e) { - throw Throwables.propagate(e); - } - } + ReferenceCountingResourceHolder take(long timeoutMs); /** * Take a resource from the pool, waiting if necessary until an element becomes available. * * @return a resource */ - public ReferenceCountingResourceHolder take() - { - checkInitialized(); - try { - return wrapObject(takeObject()); - } - catch (InterruptedException e) { - throw Throwables.propagate(e); - } - } - - private ReferenceCountingResourceHolder wrapObject(T theObject) - { - return theObject == null ? null : new ReferenceCountingResourceHolder<>( - theObject, - new Closeable() - { - @Override - public void close() throws IOException - { - offer(theObject); - } - } - ); - } - - private T pollObject() - { - final ReentrantLock lock = this.lock; - lock.lock(); - try { - return objects.isEmpty() ? null : objects.pop(); - } finally { - lock.unlock(); - } - } - - private T pollObject(long timeoutMs) throws InterruptedException - { - long nanos = TIME_UNIT.toNanos(timeoutMs); - final ReentrantLock lock = this.lock; - lock.lockInterruptibly(); - try { - while (objects.isEmpty()) { - if (nanos <= 0) { - return null; - } - nanos = notEnough.awaitNanos(nanos); - } - return objects.pop(); - } finally { - lock.unlock(); - } - } - - private T takeObject() throws InterruptedException - { - final ReentrantLock lock = this.lock; - lock.lockInterruptibly(); - try { - while (objects.isEmpty()) { - notEnough.await(); - } - return objects.pop(); - } finally { - lock.unlock(); - } - } + ReferenceCountingResourceHolder take(); /** * Take resources from the pool, waiting up to the @@ -176,17 +51,7 @@ private T takeObject() throws InterruptedException * * @return a resource, or null if the timeout was reached */ - public ReferenceCountingResourceHolder> takeBatch(final int elementNum, final long timeoutMs) - { - Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs); - checkInitialized(); - try { - return wrapObjects(timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum)); - } - catch (InterruptedException e) { - throw Throwables.propagate(e); - } - } + ReferenceCountingResourceHolder> takeBatch(int elementNum, long timeoutMs); /** * Take resources from the pool, waiting if necessary until the elements of the given number become available. @@ -195,127 +60,5 @@ public ReferenceCountingResourceHolder> takeBatch(final int elementNum, * * @return a resource */ - public ReferenceCountingResourceHolder> takeBatch(final int elementNum) - { - checkInitialized(); - try { - return wrapObjects(takeObjects(elementNum)); - } - catch (InterruptedException e) { - throw Throwables.propagate(e); - } - } - - private ReferenceCountingResourceHolder> wrapObjects(List theObjects) - { - return theObjects == null ? null : new ReferenceCountingResourceHolder<>( - theObjects, - new Closeable() - { - @Override - public void close() throws IOException - { - offerBatch(theObjects); - } - } - ); - } - - private List pollObjects(int elementNum) throws InterruptedException - { - final List list = Lists.newArrayListWithCapacity(elementNum); - final ReentrantLock lock = this.lock; - lock.lockInterruptibly(); - try { - if (objects.size() < elementNum) { - return null; - } else { - for (int i = 0; i < elementNum; i++) { - list.add(objects.pop()); - } - return list; - } - } finally { - lock.unlock(); - } - } - - private List pollObjects(int elementNum, long timeoutMs) throws InterruptedException - { - long nanos = TIME_UNIT.toNanos(timeoutMs); - final List list = Lists.newArrayListWithCapacity(elementNum); - final ReentrantLock lock = this.lock; - lock.lockInterruptibly(); - try { - while (objects.size() < elementNum) { - if (nanos <= 0) { - return null; - } - nanos = notEnough.awaitNanos(nanos); - } - for (int i = 0; i < elementNum; i++) { - list.add(objects.pop()); - } - return list; - } finally { - lock.unlock(); - } - } - - private List takeObjects(int elementNum) throws InterruptedException - { - final List list = Lists.newArrayListWithCapacity(elementNum); - final ReentrantLock lock = this.lock; - lock.lockInterruptibly(); - try { - while (objects.size() < elementNum) { - notEnough.await(); - } - for (int i = 0; i < elementNum; i++) { - list.add(objects.pop()); - } - return list; - } finally { - lock.unlock(); - } - } - - private void checkInitialized() - { - Preconditions.checkState(maxSize > 0, "Pool was initialized with limit = 0, there are no objects to take."); - } - - private void offer(T theObject) - { - final ReentrantLock lock = this.lock; - lock.lock(); - try { - if (objects.size() < maxSize) { - objects.push(theObject); - notEnough.signal(); - } else { - throw new ISE("Cannot exceed pre-configured maximum size"); - } - } finally { - lock.unlock(); - } - } - - private void offerBatch(List offers) - { - final ReentrantLock lock = this.lock; - lock.lock(); - try { - if (objects.size() + offers.size() <= maxSize) { - for (T offer : offers) { - objects.push(offer); - } - notEnough.signal(); - } else { - throw new ISE("Cannot exceed pre-configured maximum size"); - } - } finally { - lock.unlock(); - } - } + ReferenceCountingResourceHolder> takeBatch(int elementNum); } diff --git a/common/src/main/java/io/druid/collections/DefaultBlockingPool.java b/common/src/main/java/io/druid/collections/DefaultBlockingPool.java new file mode 100644 index 000000000000..6e30add171c6 --- /dev/null +++ b/common/src/main/java/io/druid/collections/DefaultBlockingPool.java @@ -0,0 +1,297 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.collections; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import io.druid.java.util.common.ISE; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Pool that pre-generates objects up to a limit, then permits possibly-blocking "take" operations. + */ +public class DefaultBlockingPool implements BlockingPool +{ + private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS; + + private final ArrayDeque objects; + private final ReentrantLock lock; + private final Condition notEnough; + private final int maxSize; + + public DefaultBlockingPool( + Supplier generator, + int limit + ) + { + this.objects = new ArrayDeque<>(limit); + this.maxSize = limit; + + for (int i = 0; i < limit; i++) { + objects.add(generator.get()); + } + + this.lock = new ReentrantLock(); + this.notEnough = lock.newCondition(); + } + + @Override + public int maxSize() + { + return maxSize; + } + + @VisibleForTesting + public int getPoolSize() + { + return objects.size(); + } + + @Override + public ReferenceCountingResourceHolder take(final long timeoutMs) + { + Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs); + checkInitialized(); + try { + return wrapObject(timeoutMs > 0 ? pollObject(timeoutMs) : pollObject()); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } + + @Override + public ReferenceCountingResourceHolder take() + { + checkInitialized(); + try { + return wrapObject(takeObject()); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } + + private ReferenceCountingResourceHolder wrapObject(T theObject) + { + return theObject == null ? null : new ReferenceCountingResourceHolder<>( + theObject, + new Closeable() + { + @Override + public void close() throws IOException + { + offer(theObject); + } + } + ); + } + + private T pollObject() + { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return objects.isEmpty() ? null : objects.pop(); + } finally { + lock.unlock(); + } + } + + private T pollObject(long timeoutMs) throws InterruptedException + { + long nanos = TIME_UNIT.toNanos(timeoutMs); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (objects.isEmpty()) { + if (nanos <= 0) { + return null; + } + nanos = notEnough.awaitNanos(nanos); + } + return objects.pop(); + } finally { + lock.unlock(); + } + } + + private T takeObject() throws InterruptedException + { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (objects.isEmpty()) { + notEnough.await(); + } + return objects.pop(); + } finally { + lock.unlock(); + } + } + + @Override + public ReferenceCountingResourceHolder> takeBatch(final int elementNum, final long timeoutMs) + { + Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs); + checkInitialized(); + try { + return wrapObjects(timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum)); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } + + @Override + public ReferenceCountingResourceHolder> takeBatch(final int elementNum) + { + checkInitialized(); + try { + return wrapObjects(takeObjects(elementNum)); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } + + private ReferenceCountingResourceHolder> wrapObjects(List theObjects) + { + return theObjects == null ? null : new ReferenceCountingResourceHolder<>( + theObjects, + new Closeable() + { + @Override + public void close() throws IOException + { + offerBatch(theObjects); + } + } + ); + } + + private List pollObjects(int elementNum) throws InterruptedException + { + final List list = Lists.newArrayListWithCapacity(elementNum); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + if (objects.size() < elementNum) { + return null; + } else { + for (int i = 0; i < elementNum; i++) { + list.add(objects.pop()); + } + return list; + } + } finally { + lock.unlock(); + } + } + + private List pollObjects(int elementNum, long timeoutMs) throws InterruptedException + { + long nanos = TIME_UNIT.toNanos(timeoutMs); + final List list = Lists.newArrayListWithCapacity(elementNum); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (objects.size() < elementNum) { + if (nanos <= 0) { + return null; + } + nanos = notEnough.awaitNanos(nanos); + } + for (int i = 0; i < elementNum; i++) { + list.add(objects.pop()); + } + return list; + } finally { + lock.unlock(); + } + } + + private List takeObjects(int elementNum) throws InterruptedException + { + final List list = Lists.newArrayListWithCapacity(elementNum); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (objects.size() < elementNum) { + notEnough.await(); + } + for (int i = 0; i < elementNum; i++) { + list.add(objects.pop()); + } + return list; + } finally { + lock.unlock(); + } + } + + private void checkInitialized() + { + Preconditions.checkState(maxSize > 0, "Pool was initialized with limit = 0, there are no objects to take."); + } + + private void offer(T theObject) + { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + if (objects.size() < maxSize) { + objects.push(theObject); + notEnough.signal(); + } else { + throw new ISE("Cannot exceed pre-configured maximum size"); + } + } finally { + lock.unlock(); + } + } + + private void offerBatch(List offers) + { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + if (objects.size() + offers.size() <= maxSize) { + for (T offer : offers) { + objects.push(offer); + } + notEnough.signal(); + } else { + throw new ISE("Cannot exceed pre-configured maximum size"); + } + } finally { + lock.unlock(); + } + } +} diff --git a/common/src/main/java/io/druid/collections/DummyBlockingPool.java b/common/src/main/java/io/druid/collections/DummyBlockingPool.java new file mode 100644 index 000000000000..2752e68a8ad2 --- /dev/null +++ b/common/src/main/java/io/druid/collections/DummyBlockingPool.java @@ -0,0 +1,70 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.collections; + +import java.util.List; + +/** + * BlockingPool with 0 maxSize, all take*() methods immediately throw {@link UnsupportedOperationException}. + */ +public final class DummyBlockingPool implements BlockingPool +{ + private static final DummyBlockingPool INSTANCE = new DummyBlockingPool(); + + @SuppressWarnings("unchecked") + public static BlockingPool instance() + { + return INSTANCE; + } + + private DummyBlockingPool() + { + } + + @Override + public int maxSize() + { + return 0; + } + + @Override + public ReferenceCountingResourceHolder take(long timeoutMs) + { + throw new UnsupportedOperationException(); + } + + @Override + public ReferenceCountingResourceHolder take() + { + throw new UnsupportedOperationException(); + } + + @Override + public ReferenceCountingResourceHolder> takeBatch(int elementNum, long timeoutMs) + { + throw new UnsupportedOperationException(); + } + + @Override + public ReferenceCountingResourceHolder> takeBatch(int elementNum) + { + throw new UnsupportedOperationException(); + } +} diff --git a/common/src/main/java/io/druid/collections/DummyNonBlockingPool.java b/common/src/main/java/io/druid/collections/DummyNonBlockingPool.java new file mode 100644 index 000000000000..c03dfd70c90f --- /dev/null +++ b/common/src/main/java/io/druid/collections/DummyNonBlockingPool.java @@ -0,0 +1,43 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.collections; + +/** + * NonBlockingPool which is not able to allocate objects, {@link #take()} throws {@link UnsupportedOperationException}. + */ +public final class DummyNonBlockingPool implements NonBlockingPool +{ + private static final DummyNonBlockingPool INSTANCE = new DummyNonBlockingPool(); + + @SuppressWarnings("unchecked") + public static NonBlockingPool instance() + { + return INSTANCE; + } + + private DummyNonBlockingPool() + { + } + @Override + public ResourceHolder take() + { + throw new UnsupportedOperationException(); + } +} diff --git a/common/src/main/java/io/druid/collections/NonBlockingPool.java b/common/src/main/java/io/druid/collections/NonBlockingPool.java new file mode 100644 index 000000000000..9941dcf47870 --- /dev/null +++ b/common/src/main/java/io/druid/collections/NonBlockingPool.java @@ -0,0 +1,25 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.collections; + +public interface NonBlockingPool +{ + ResourceHolder take(); +} diff --git a/common/src/main/java/io/druid/collections/StupidPool.java b/common/src/main/java/io/druid/collections/StupidPool.java index 939377a3eec1..dd1918732de1 100644 --- a/common/src/main/java/io/druid/collections/StupidPool.java +++ b/common/src/main/java/io/druid/collections/StupidPool.java @@ -35,7 +35,7 @@ /** */ -public class StupidPool +public class StupidPool implements NonBlockingPool { private static final Logger log = new Logger(StupidPool.class); @@ -95,6 +95,7 @@ public String toString() "}"; } + @Override public ResourceHolder take() { ObjectResourceHolder resourceHolder = objects.poll(); @@ -233,7 +234,7 @@ private static class ObjectLeakNotifier implements Runnable ObjectLeakNotifier(StupidPool pool) { - poolReference = new WeakReference>(pool); + poolReference = new WeakReference<>(pool); leakedObjectsCounter = pool.leakedObjectsCounter; } diff --git a/common/src/main/java/io/druid/concurrent/DummyExecutorService.java b/common/src/main/java/io/druid/concurrent/DummyExecutorService.java new file mode 100644 index 000000000000..aa7e0428f055 --- /dev/null +++ b/common/src/main/java/io/druid/concurrent/DummyExecutorService.java @@ -0,0 +1,123 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.concurrent; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * ExecutorService which is terminated and shutdown from the moment of creation and not able to accept any tasks. + */ +final class DummyExecutorService implements ExecutorService +{ + static final DummyExecutorService INSTANCE = new DummyExecutorService(); + + private DummyExecutorService() + { + } + + @Override + public void shutdown() + { + // Do nothing, alread shutdown + } + + @Override + public List shutdownNow() + { + return Collections.emptyList(); + } + + @Override + public boolean isShutdown() + { + return true; + } + + @Override + public boolean isTerminated() + { + return true; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException + { + return true; + } + + @Override + public Future submit(Callable task) + { + throw new UnsupportedOperationException(); + } + + @Override + public Future submit(Runnable task, T result) + { + throw new UnsupportedOperationException(); + } + + @Override + public Future submit(Runnable task) + { + throw new UnsupportedOperationException(); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException + { + throw new UnsupportedOperationException(); + } + + @Override + public List> invokeAll( + Collection> tasks, long timeout, TimeUnit unit + ) throws InterruptedException + { + throw new UnsupportedOperationException(); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException + { + throw new UnsupportedOperationException(); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException + { + throw new UnsupportedOperationException(); + } + + @Override + public void execute(Runnable command) + { + throw new UnsupportedOperationException(); + } +} diff --git a/common/src/main/java/io/druid/concurrent/Execs.java b/common/src/main/java/io/druid/concurrent/Execs.java index 04b84f135bbc..65c5181341e4 100644 --- a/common/src/main/java/io/druid/concurrent/Execs.java +++ b/common/src/main/java/io/druid/concurrent/Execs.java @@ -41,6 +41,13 @@ */ public class Execs { + /** + * Returns an ExecutorService which is terminated and shutdown from the beginning and not able to accept any tasks. + */ + public static ExecutorService dummy() + { + return DummyExecutorService.INSTANCE; + } public static ExecutorService singleThreaded(@NotNull String nameFormat) { diff --git a/common/src/test/java/io/druid/collections/BlockingPoolTest.java b/common/src/test/java/io/druid/collections/BlockingPoolTest.java index 73a3d86c3075..f76339ca99c2 100644 --- a/common/src/test/java/io/druid/collections/BlockingPoolTest.java +++ b/common/src/test/java/io/druid/collections/BlockingPoolTest.java @@ -42,8 +42,8 @@ public class BlockingPoolTest { private static final ExecutorService SERVICE = Executors.newFixedThreadPool(2); - private static final BlockingPool POOL = new BlockingPool<>(Suppliers.ofInstance(1), 10); - private static final BlockingPool EMPTY_POOL = new BlockingPool<>(Suppliers.ofInstance(1), 0); + private static final DefaultBlockingPool POOL = new DefaultBlockingPool<>(Suppliers.ofInstance(1), 10); + private static final BlockingPool EMPTY_POOL = new DefaultBlockingPool<>(Suppliers.ofInstance(1), 0); @AfterClass public static void teardown() diff --git a/docs/content/development/router.md b/docs/content/development/router.md index 8fdb21ad3035..63ba585d4427 100644 --- a/docs/content/development/router.md +++ b/docs/content/development/router.md @@ -49,7 +49,6 @@ druid.host=#{IP_ADDR}:8080 druid.port=8080 druid.service=druid/router -druid.processing.numThreads=1 druid.router.defaultBrokerServiceName=druid:broker-cold druid.router.coordinatorServiceName=druid:coordinator druid.router.tierToBrokerMap={"hot":"druid:broker-hot","_default_tier":"druid:broker-cold"} diff --git a/examples/conf-quickstart/druid/middleManager/runtime.properties b/examples/conf-quickstart/druid/middleManager/runtime.properties index dc1f6aceddb3..254148fc51dd 100644 --- a/examples/conf-quickstart/druid/middleManager/runtime.properties +++ b/examples/conf-quickstart/druid/middleManager/runtime.properties @@ -11,9 +11,9 @@ druid.indexer.task.baseTaskDir=var/druid/task # HTTP server threads druid.server.http.numThreads=9 -# Processing threads and buffers -druid.processing.buffer.sizeBytes=256000000 -druid.processing.numThreads=2 +# Processing threads and buffers on Peons +druid.indexer.fork.property.druid.processing.buffer.sizeBytes=256000000 +druid.indexer.fork.property.druid.processing.numThreads=2 # Hadoop indexing druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp diff --git a/examples/conf/druid/middleManager/runtime.properties b/examples/conf/druid/middleManager/runtime.properties index ce857a0d09d8..50f03013c5a7 100644 --- a/examples/conf/druid/middleManager/runtime.properties +++ b/examples/conf/druid/middleManager/runtime.properties @@ -11,9 +11,9 @@ druid.indexer.task.baseTaskDir=var/druid/task # HTTP server threads druid.server.http.numThreads=25 -# Processing threads and buffers -druid.processing.buffer.sizeBytes=536870912 -druid.processing.numThreads=2 +# Processing threads and buffers on Peons +druid.indexer.fork.property.druid.processing.buffer.sizeBytes=536870912 +druid.indexer.fork.property.druid.processing.numThreads=2 # Hadoop indexing druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp diff --git a/integration-tests/docker/router.conf b/integration-tests/docker/router.conf index 17d5146bdf7b..c24c6995562f 100644 --- a/integration-tests/docker/router.conf +++ b/integration-tests/docker/router.conf @@ -9,9 +9,7 @@ command=java -Dfile.encoding=UTF-8 -Ddruid.host=%(ENV_HOST_IP)s -Ddruid.zk.service.host=druid-zookeeper-kafka - -Ddruid.computation.buffer.size=75000000 -Ddruid.server.http.numThreads=100 - -Ddruid.processing.numThreads=1 -cp /usr/local/druid/lib/* io.druid.cli.Main server router redirect_stderr=true diff --git a/java-util/src/main/java/io/druid/java/util/common/concurrent/ExecutorServiceConfig.java b/java-util/src/main/java/io/druid/java/util/common/concurrent/ExecutorServiceConfig.java index 2b0ae7756f5f..9c4c139a75be 100644 --- a/java-util/src/main/java/io/druid/java/util/common/concurrent/ExecutorServiceConfig.java +++ b/java-util/src/main/java/io/druid/java/util/common/concurrent/ExecutorServiceConfig.java @@ -26,13 +26,31 @@ */ public abstract class ExecutorServiceConfig { + public static final int DEFAULT_NUM_THREADS = -1; + @Config(value = "${base_path}.formatString") @Default("processing-%s") public abstract String getFormatString(); - @Config(value = "${base_path}.numThreads") public int getNumThreads() { - return Runtime.getRuntime().availableProcessors() - 1; + int numThreadsConfigured = getNumThreadsConfigured(); + if (numThreadsConfigured != DEFAULT_NUM_THREADS) { + return numThreadsConfigured; + } else { + return Math.max(Runtime.getRuntime().availableProcessors() - 1, 1); + } + } + + /** + * Returns the number of threads _explicitly_ configured, or -1 if it is not explicitly configured, that is not + * a valid number of threads. To get the configured value or the default (valid) number, use {@link #getNumThreads()}. + * This method exists for ability to distinguish between the default value set when there is no explicit config, and + * an explicitly configured value. + */ + @Config(value = "${base_path}.numThreads") + public int getNumThreadsConfigured() + { + return DEFAULT_NUM_THREADS; } } diff --git a/processing/src/main/java/io/druid/query/DruidProcessingConfig.java b/processing/src/main/java/io/druid/query/DruidProcessingConfig.java index 5445e5a7324b..f6c7c9fdd9ec 100644 --- a/processing/src/main/java/io/druid/query/DruidProcessingConfig.java +++ b/processing/src/main/java/io/druid/query/DruidProcessingConfig.java @@ -25,6 +25,8 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig implements ColumnConfig { + public static final int DEFAULT_NUM_MERGE_BUFFERS = -1; + @Config({"druid.computation.buffer.size", "${base_path}.buffer.sizeBytes"}) public int intermediateComputeSizeBytes() { @@ -39,17 +41,31 @@ public int poolCacheMaxCount() @Override @Config(value = "${base_path}.numThreads") - public int getNumThreads() + public int getNumThreadsConfigured() { - // default to leaving one core for background tasks - final int processors = Runtime.getRuntime().availableProcessors(); - return processors > 1 ? processors - 1 : processors; + return DEFAULT_NUM_THREADS; } - @Config("${base_path}.numMergeBuffers") public int getNumMergeBuffers() { - return Math.max(2, getNumThreads() / 4); + int numMergeBuffersConfigured = getNumMergeBuffersConfigured(); + if (numMergeBuffersConfigured != DEFAULT_NUM_MERGE_BUFFERS) { + return numMergeBuffersConfigured; + } else { + return Math.max(2, getNumThreads() / 4); + } + } + + /** + * Returns the number of merge buffers _explicitly_ configured, or -1 if it is not explicitly configured, that is not + * a valid number of buffers. To get the configured value or the default (valid) number, use {@link + * #getNumMergeBuffers()}. This method exists for ability to distinguish between the default value set when there is + * no explicit config, and an explicitly configured value. + */ + @Config("${base_path}.numMergeBuffers") + public int getNumMergeBuffersConfigured() + { + return DEFAULT_NUM_MERGE_BUFFERS; } @Override diff --git a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java index dda375b100be..dc90592eb57b 100644 --- a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java @@ -29,7 +29,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import io.druid.collections.StupidPool; +import io.druid.collections.NonBlockingPool; import io.druid.data.input.Row; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; @@ -60,13 +60,13 @@ public class GroupByMergedQueryRunner implements QueryRunner private final ListeningExecutorService exec; private final Supplier configSupplier; private final QueryWatcher queryWatcher; - private final StupidPool bufferPool; + private final NonBlockingPool bufferPool; public GroupByMergedQueryRunner( ExecutorService exec, Supplier configSupplier, QueryWatcher queryWatcher, - StupidPool bufferPool, + NonBlockingPool bufferPool, Iterable> queryables ) { diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java index 32da5b3b268e..97d75da8c219 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java @@ -27,8 +27,8 @@ import com.google.common.collect.Maps; import com.google.common.primitives.Ints; import com.google.inject.Inject; +import io.druid.collections.NonBlockingPool; import io.druid.collections.ResourceHolder; -import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.guice.annotations.Global; @@ -71,12 +71,12 @@ public class GroupByQueryEngine private static final int MISSING_VALUE = -1; private final Supplier config; - private final StupidPool intermediateResultsBufferPool; + private final NonBlockingPool intermediateResultsBufferPool; @Inject public GroupByQueryEngine( Supplier config, - @Global StupidPool intermediateResultsBufferPool + @Global NonBlockingPool intermediateResultsBufferPool ) { this.config = config; diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index b9d0910d8dd5..2e9f33d1fed3 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -23,7 +23,7 @@ import com.google.common.base.Function; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import io.druid.collections.StupidPool; +import io.druid.collections.NonBlockingPool; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; @@ -59,7 +59,7 @@ public class GroupByQueryHelper public static Pair> createIndexAccumulatorPair( final GroupByQuery query, final GroupByQueryConfig config, - StupidPool bufferPool, + NonBlockingPool bufferPool, final boolean combine ) { @@ -188,7 +188,7 @@ public Queue accumulate(Queue accumulated, T in) public static IncrementalIndex makeIncrementalIndex( GroupByQuery query, GroupByQueryConfig config, - StupidPool bufferPool, + NonBlockingPool bufferPool, Sequence rows, boolean combine ) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 63394d06b25e..49f1b72917cc 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -23,8 +23,8 @@ import com.google.common.base.Strings; import com.google.common.base.Suppliers; import com.google.common.collect.Maps; +import io.druid.collections.NonBlockingPool; import io.druid.collections.ResourceHolder; -import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.java.util.common.IAE; @@ -87,7 +87,7 @@ private GroupByQueryEngineV2() public static Sequence process( final GroupByQuery query, final StorageAdapter storageAdapter, - final StupidPool intermediateResultsBufferPool, + final NonBlockingPool intermediateResultsBufferPool, final GroupByQueryConfig config ) { diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java index 0cf98c313503..180659999516 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -29,7 +29,7 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.inject.Inject; -import io.druid.collections.StupidPool; +import io.druid.collections.NonBlockingPool; import io.druid.data.input.Row; import io.druid.guice.annotations.Global; import io.druid.java.util.common.IAE; @@ -65,14 +65,14 @@ public class GroupByStrategyV1 implements GroupByStrategy private final Supplier configSupplier; private final GroupByQueryEngine engine; private final QueryWatcher queryWatcher; - private final StupidPool bufferPool; + private final NonBlockingPool bufferPool; @Inject public GroupByStrategyV1( Supplier configSupplier, GroupByQueryEngine engine, QueryWatcher queryWatcher, - @Global StupidPool bufferPool + @Global NonBlockingPool bufferPool ) { this.configSupplier = configSupplier; diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index ff9a43032973..add5387e75bf 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -28,8 +28,8 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.inject.Inject; import io.druid.collections.BlockingPool; +import io.druid.collections.NonBlockingPool; import io.druid.collections.ResourceHolder; -import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.guice.annotations.Global; @@ -80,7 +80,7 @@ public class GroupByStrategyV2 implements GroupByStrategy private final DruidProcessingConfig processingConfig; private final Supplier configSupplier; - private final StupidPool bufferPool; + private final NonBlockingPool bufferPool; private final BlockingPool mergeBufferPool; private final ObjectMapper spillMapper; private final QueryWatcher queryWatcher; @@ -89,7 +89,7 @@ public class GroupByStrategyV2 implements GroupByStrategy public GroupByStrategyV2( DruidProcessingConfig processingConfig, Supplier configSupplier, - @Global StupidPool bufferPool, + @Global NonBlockingPool bufferPool, @Merging BlockingPool mergeBufferPool, @Smile ObjectMapper spillMapper, QueryWatcher queryWatcher diff --git a/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java b/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java index e826d2cd81b0..14ce9694b788 100644 --- a/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java @@ -19,7 +19,7 @@ package io.druid.query.topn; -import io.druid.collections.StupidPool; +import io.druid.collections.NonBlockingPool; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; import io.druid.query.ColumnSelectorPlus; @@ -41,12 +41,12 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm bufferPool; + private final NonBlockingPool bufferPool; public AggregateTopNMetricFirstAlgorithm( Capabilities capabilities, TopNQuery query, - StupidPool bufferPool + NonBlockingPool bufferPool ) { this.capabilities = capabilities; diff --git a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java index 27fea230d68c..2605480fdcee 100644 --- a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java @@ -22,8 +22,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.ImmutableMap; +import io.druid.collections.NonBlockingPool; import io.druid.collections.ResourceHolder; -import io.druid.collections.StupidPool; import io.druid.java.util.common.Pair; import io.druid.java.util.common.guava.CloseQuietly; import io.druid.query.BaseQuery; @@ -185,13 +185,13 @@ private static void computeSpecializedScanAndAggregateImplementations() } private final TopNQuery query; - private final StupidPool bufferPool; + private final NonBlockingPool bufferPool; private static final int AGG_UNROLL_COUNT = 8; // Must be able to fit loop below public PooledTopNAlgorithm( Capabilities capabilities, TopNQuery query, - StupidPool bufferPool + NonBlockingPool bufferPool ) { super(capabilities); diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java index a81873865f8d..c1bb74717585 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java @@ -22,7 +22,7 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; -import io.druid.collections.StupidPool; +import io.druid.collections.NonBlockingPool; import io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; @@ -51,9 +51,9 @@ public class TopNQueryEngine { private static final Logger log = new Logger(TopNQueryEngine.class); - private final StupidPool bufferPool; + private final NonBlockingPool bufferPool; - public TopNQueryEngine(StupidPool bufferPool) + public TopNQueryEngine(NonBlockingPool bufferPool) { this.bufferPool = bufferPool; } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java index f28077bb6c29..cf8c3aab69b5 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryRunnerFactory.java @@ -20,7 +20,7 @@ package io.druid.query.topn; import com.google.inject.Inject; -import io.druid.collections.StupidPool; +import io.druid.collections.NonBlockingPool; import io.druid.guice.annotations.Global; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; @@ -41,13 +41,13 @@ */ public class TopNQueryRunnerFactory implements QueryRunnerFactory, TopNQuery> { - private final StupidPool computationBufferPool; + private final NonBlockingPool computationBufferPool; private final TopNQueryQueryToolChest toolchest; private final QueryWatcher queryWatcher; @Inject public TopNQueryRunnerFactory( - @Global StupidPool computationBufferPool, + @Global NonBlockingPool computationBufferPool, TopNQueryQueryToolChest toolchest, QueryWatcher queryWatcher ) diff --git a/processing/src/main/java/io/druid/segment/CompressedPools.java b/processing/src/main/java/io/druid/segment/CompressedPools.java index c2453b920aaf..b140d33f6f41 100644 --- a/processing/src/main/java/io/druid/segment/CompressedPools.java +++ b/processing/src/main/java/io/druid/segment/CompressedPools.java @@ -21,6 +21,7 @@ import com.google.common.base.Supplier; import com.ning.compress.BufferRecycler; +import io.druid.collections.NonBlockingPool; import io.druid.collections.ResourceHolder; import io.druid.collections.StupidPool; import io.druid.java.util.common.logger.Logger; @@ -36,7 +37,7 @@ public class CompressedPools private static final Logger log = new Logger(CompressedPools.class); public static final int BUFFER_SIZE = 0x10000; - private static final StupidPool bufferRecyclerPool = new StupidPool( + private static final NonBlockingPool bufferRecyclerPool = new StupidPool<>( "bufferRecyclerPool", new Supplier() { @@ -56,7 +57,7 @@ public static ResourceHolder getBufferRecycler() return bufferRecyclerPool.take(); } - private static final StupidPool outputBytesPool = new StupidPool( + private static final NonBlockingPool outputBytesPool = new StupidPool( "outputBytesPool", new Supplier() { @@ -76,7 +77,7 @@ public static ResourceHolder getOutputBytes() return outputBytesPool.take(); } - private static final StupidPool bigEndByteBufPool = new StupidPool( + private static final NonBlockingPool bigEndByteBufPool = new StupidPool( "bigEndByteBufPool", new Supplier() { @@ -91,7 +92,7 @@ public ByteBuffer get() } ); - private static final StupidPool littleEndByteBufPool = new StupidPool( + private static final NonBlockingPool littleEndByteBufPool = new StupidPool( "littleEndByteBufPool", new Supplier() { diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 9e1c121abec8..7cea6d9be1f1 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -30,7 +30,7 @@ import com.google.common.collect.Maps; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; -import io.druid.collections.StupidPool; +import io.druid.collections.NonBlockingPool; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; @@ -377,7 +377,7 @@ public IncrementalIndex buildOnheap() ); } - public IncrementalIndex buildOffheap(final StupidPool bufferPool) + public IncrementalIndex buildOffheap(final NonBlockingPool bufferPool) { if (maxRowCount <= 0) { throw new IllegalArgumentException("Invalid max row count: " + maxRowCount); diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index 598d454a27c3..a6aa82925773 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -21,8 +21,8 @@ import com.google.common.base.Supplier; import com.google.common.collect.Maps; +import io.druid.collections.NonBlockingPool; import io.druid.collections.ResourceHolder; -import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; @@ -44,7 +44,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex { private static final Logger log = new Logger(OffheapIncrementalIndex.class); - private final StupidPool bufferPool; + private final NonBlockingPool bufferPool; private final List> aggBuffers = new ArrayList<>(); private final List indexAndOffsets = new ArrayList<>(); @@ -72,7 +72,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex boolean concurrentEventAdd, boolean sortFacts, int maxRowCount, - StupidPool bufferPool + NonBlockingPool bufferPool ) { super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, concurrentEventAdd); diff --git a/processing/src/test/java/io/druid/query/TestQueryRunners.java b/processing/src/test/java/io/druid/query/TestQueryRunners.java index e4daec6ea846..a4f877a2f511 100644 --- a/processing/src/test/java/io/druid/query/TestQueryRunners.java +++ b/processing/src/test/java/io/druid/query/TestQueryRunners.java @@ -21,6 +21,7 @@ import com.google.common.base.Supplier; import com.google.common.base.Suppliers; +import io.druid.collections.NonBlockingPool; import io.druid.collections.StupidPool; import io.druid.query.search.SearchQueryQueryToolChest; import io.druid.query.search.SearchQueryRunnerFactory; @@ -41,7 +42,7 @@ */ public class TestQueryRunners { - public static final StupidPool pool = new StupidPool( + public static final NonBlockingPool pool = new StupidPool( "TestQueryRunners-bufferPool", new Supplier() { @@ -54,7 +55,7 @@ public ByteBuffer get() ); public static final TopNQueryConfig topNConfig = new TopNQueryConfig(); - public static StupidPool getPool() + public static NonBlockingPool getPool() { return pool; } diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java index 82180d295007..ea9bb4389230 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryMergeBufferTest.java @@ -26,7 +26,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; -import io.druid.collections.BlockingPool; +import io.druid.collections.DefaultBlockingPool; +import io.druid.collections.NonBlockingPool; import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.collections.StupidPool; import io.druid.data.input.Row; @@ -59,7 +60,7 @@ @RunWith(Parameterized.class) public class GroupByQueryMergeBufferTest { - private static class TestBlockingPool extends BlockingPool + private static class TestBlockingPool extends DefaultBlockingPool { private int minRemainBufferNum; @@ -136,7 +137,7 @@ private static GroupByQueryRunnerFactory makeQueryRunnerFactory( ) { final Supplier configSupplier = Suppliers.ofInstance(config); - final StupidPool bufferPool = new StupidPool<>( + final NonBlockingPool bufferPool = new StupidPool<>( "GroupByQueryEngine-bufferPool", new Supplier() { diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java index 11dd20b1507a..198c22df36ee 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFailureTest.java @@ -27,6 +27,8 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; import io.druid.collections.BlockingPool; +import io.druid.collections.DefaultBlockingPool; +import io.druid.collections.NonBlockingPool; import io.druid.collections.ReferenceCountingResourceHolder; import io.druid.collections.StupidPool; import io.druid.data.input.Row; @@ -100,7 +102,7 @@ private static GroupByQueryRunnerFactory makeQueryRunnerFactory( ) { final Supplier configSupplier = Suppliers.ofInstance(config); - final StupidPool bufferPool = new StupidPool<>( + final NonBlockingPool bufferPool = new StupidPool<>( "GroupByQueryEngine-bufferPool", new Supplier() { @@ -138,7 +140,7 @@ public ByteBuffer get() ); } - private final static BlockingPool mergeBufferPool = new BlockingPool<>( + private final static BlockingPool mergeBufferPool = new DefaultBlockingPool<>( new Supplier() { @Override diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 867f62b657b0..85feca8e6800 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -32,6 +32,8 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.MoreExecutors; import io.druid.collections.BlockingPool; +import io.druid.collections.DefaultBlockingPool; +import io.druid.collections.NonBlockingPool; import io.druid.collections.StupidPool; import io.druid.data.input.Row; import io.druid.java.util.common.IAE; @@ -325,7 +327,7 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory( ) { final Supplier configSupplier = Suppliers.ofInstance(config); - final StupidPool bufferPool = new StupidPool<>( + final NonBlockingPool bufferPool = new StupidPool<>( "GroupByQueryEngine-bufferPool", new Supplier() { @@ -336,7 +338,7 @@ public ByteBuffer get() } } ); - final BlockingPool mergeBufferPool = new BlockingPool<>( + final BlockingPool mergeBufferPool = new DefaultBlockingPool<>( new Supplier() { @Override diff --git a/server/src/main/java/io/druid/guice/DruidProcessingConfigModule.java b/server/src/main/java/io/druid/guice/DruidProcessingConfigModule.java new file mode 100644 index 000000000000..0c1daca0e066 --- /dev/null +++ b/server/src/main/java/io/druid/guice/DruidProcessingConfigModule.java @@ -0,0 +1,35 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.guice; + +import com.google.common.collect.ImmutableMap; +import com.google.inject.Binder; +import com.google.inject.Module; +import io.druid.query.DruidProcessingConfig; + +public class DruidProcessingConfigModule implements Module +{ + + @Override + public void configure(Binder binder) + { + ConfigProvider.bind(binder, DruidProcessingConfig.class, ImmutableMap.of("base_path", "druid.processing")); + } +} diff --git a/server/src/main/java/io/druid/guice/DruidProcessingModule.java b/server/src/main/java/io/druid/guice/DruidProcessingModule.java index 79ede1b4fcdf..ac1b8a267fd8 100644 --- a/server/src/main/java/io/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/io/druid/guice/DruidProcessingModule.java @@ -19,16 +19,16 @@ package io.druid.guice; -import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.ProvisionException; - import io.druid.client.cache.CacheConfig; import io.druid.collections.BlockingPool; +import io.druid.collections.DefaultBlockingPool; +import io.druid.collections.NonBlockingPool; import io.druid.collections.StupidPool; import io.druid.common.utils.VMUtils; import io.druid.guice.annotations.BackgroundCaching; @@ -58,7 +58,6 @@ public class DruidProcessingModule implements Module @Override public void configure(Binder binder) { - ConfigProvider.bind(binder, DruidProcessingConfig.class, ImmutableMap.of("base_path", "druid.processing")); binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class); MetricsModule.register(binder, ExecutorServiceMonitor.class); } @@ -105,7 +104,7 @@ public ExecutorService getProcessingExecutorService( @Provides @LazySingleton @Global - public StupidPool getIntermediateResultsPool(DruidProcessingConfig config) + public NonBlockingPool getIntermediateResultsPool(DruidProcessingConfig config) { verifyDirectMemory(config); return new StupidPool<>( @@ -122,7 +121,7 @@ public StupidPool getIntermediateResultsPool(DruidProcessingConfig c public BlockingPool getMergeBufferPool(DruidProcessingConfig config) { verifyDirectMemory(config); - return new BlockingPool<>( + return new DefaultBlockingPool<>( new OffheapBufferGenerator("result merging", config.intermediateComputeSizeBytes()), config.getNumMergeBuffers() ); diff --git a/server/src/main/java/io/druid/guice/QueryableModule.java b/server/src/main/java/io/druid/guice/QueryableModule.java index b35742da466f..f4e783edd982 100644 --- a/server/src/main/java/io/druid/guice/QueryableModule.java +++ b/server/src/main/java/io/druid/guice/QueryableModule.java @@ -22,9 +22,9 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; -import com.google.inject.util.Providers; import io.druid.initialization.DruidModule; -import io.druid.query.QuerySegmentWalker; +import io.druid.query.DefaultQueryRunnerFactoryConglomerate; +import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.server.log.ComposingRequestLoggerProvider; import io.druid.server.log.EmittingRequestLoggerProvider; import io.druid.server.log.FileRequestLoggerProvider; @@ -43,9 +43,12 @@ public class QueryableModule implements DruidModule @Override public void configure(Binder binder) { - binder.bind(QuerySegmentWalker.class).toProvider(Providers.of(null)); binder.bind(RequestLogger.class).toProvider(RequestLoggerProvider.class).in(ManageLifecycle.class); JsonConfigProvider.bind(binder, "druid.request.logging", RequestLoggerProvider.class); + + binder.bind(QueryRunnerFactoryConglomerate.class) + .to(DefaultQueryRunnerFactoryConglomerate.class) + .in(LazySingleton.class); } @Override diff --git a/server/src/main/java/io/druid/guice/RouterProcessingModule.java b/server/src/main/java/io/druid/guice/RouterProcessingModule.java new file mode 100644 index 000000000000..bc3ac38fd25d --- /dev/null +++ b/server/src/main/java/io/druid/guice/RouterProcessingModule.java @@ -0,0 +1,107 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.guice; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import io.druid.client.cache.CacheConfig; +import io.druid.collections.BlockingPool; +import io.druid.collections.DummyBlockingPool; +import io.druid.collections.DummyNonBlockingPool; +import io.druid.collections.NonBlockingPool; +import io.druid.concurrent.Execs; +import io.druid.guice.annotations.BackgroundCaching; +import io.druid.guice.annotations.Global; +import io.druid.guice.annotations.Merging; +import io.druid.guice.annotations.Processing; +import io.druid.java.util.common.concurrent.ExecutorServiceConfig; +import io.druid.java.util.common.logger.Logger; +import io.druid.query.DruidProcessingConfig; +import io.druid.query.ExecutorServiceMonitor; +import io.druid.server.metrics.MetricsModule; + +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; + +/** + * This module is used to fulfill dependency injection of query processing and caching resources: buffer pools and + * thread pools on Router Druid node type. Router needs to inject those resources, because it depends on + * {@link io.druid.query.QueryToolChest}s, and they couple query type aspects not related to processing and caching, + * which Router uses, and related to processing and caching, which Router doesn't use, but they inject the resources. + */ +public class RouterProcessingModule implements Module +{ + private static final Logger log = new Logger(RouterProcessingModule.class); + + @Override + public void configure(Binder binder) + { + binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class); + MetricsModule.register(binder, ExecutorServiceMonitor.class); + } + + @Provides + @BackgroundCaching + @LazySingleton + public ExecutorService getBackgroundExecutorService(CacheConfig cacheConfig) + { + if (cacheConfig.getNumBackgroundThreads() > 0) { + log.error( + "numBackgroundThreads[%d] configured, that is ignored on Router", + cacheConfig.getNumBackgroundThreads() + ); + } + return Execs.dummy(); + } + + @Provides + @Processing + @ManageLifecycle + public ExecutorService getProcessingExecutorService(DruidProcessingConfig config) + { + if (config.getNumThreadsConfigured() != ExecutorServiceConfig.DEFAULT_NUM_THREADS) { + log.error("numThreads[%d] configured, that is ignored on Router", config.getNumThreadsConfigured()); + } + return Execs.dummy(); + } + + @Provides + @LazySingleton + @Global + public NonBlockingPool getIntermediateResultsPool() + { + return DummyNonBlockingPool.instance(); + } + + @Provides + @LazySingleton + @Merging + public BlockingPool getMergeBufferPool(DruidProcessingConfig config) + { + if (config.getNumMergeBuffersConfigured() != DruidProcessingConfig.DEFAULT_NUM_MERGE_BUFFERS) { + log.error( + "numMergeBuffers[%d] configured, that is ignored on Router", + config.getNumMergeBuffersConfigured() + ); + } + return DummyBlockingPool.instance(); + } +} diff --git a/server/src/main/java/io/druid/guice/StorageNodeModule.java b/server/src/main/java/io/druid/guice/StorageNodeModule.java index 65c4a9d7f125..d4d74add21a0 100644 --- a/server/src/main/java/io/druid/guice/StorageNodeModule.java +++ b/server/src/main/java/io/druid/guice/StorageNodeModule.java @@ -26,9 +26,7 @@ import com.google.inject.util.Providers; import io.druid.client.DruidServerConfig; import io.druid.guice.annotations.Self; -import io.druid.query.DefaultQueryRunnerFactoryConglomerate; import io.druid.query.DruidProcessingConfig; -import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.column.ColumnConfig; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.server.DruidNode; @@ -48,10 +46,6 @@ public void configure(Binder binder) binder.bind(NodeTypeConfig.class).toProvider(Providers.of(null)); binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class); - - binder.bind(QueryRunnerFactoryConglomerate.class) - .to(DefaultQueryRunnerFactoryConglomerate.class) - .in(LazySingleton.class); } @Provides diff --git a/server/src/main/java/io/druid/initialization/Initialization.java b/server/src/main/java/io/druid/initialization/Initialization.java index d525391ac4d5..c4197bc0d75f 100644 --- a/server/src/main/java/io/druid/initialization/Initialization.java +++ b/server/src/main/java/io/druid/initialization/Initialization.java @@ -35,7 +35,7 @@ import io.druid.guice.AWSModule; import io.druid.guice.AnnouncerModule; import io.druid.guice.CoordinatorDiscoveryModule; -import io.druid.guice.DruidProcessingModule; +import io.druid.guice.DruidProcessingConfigModule; import io.druid.guice.DruidSecondaryModule; import io.druid.guice.ExpressionModule; import io.druid.guice.ExtensionsConfig; @@ -47,8 +47,6 @@ import io.druid.guice.LocalDataStorageDruidModule; import io.druid.guice.MetadataConfigModule; import io.druid.guice.ParsersModule; -import io.druid.guice.QueryRunnerFactoryModule; -import io.druid.guice.QueryableModule; import io.druid.guice.ServerModule; import io.druid.guice.ServerViewModule; import io.druid.guice.StartupLoggingModule; @@ -332,14 +330,12 @@ public static Injector makeInjectorWithModules(final Injector baseInjector, Iter new HttpClientModule("druid.broker.http", Client.class), new CuratorModule(), new AnnouncerModule(), - new DruidProcessingModule(), new AWSModule(), new MetricsModule(), new ServerModule(), + new DruidProcessingConfigModule(), new StorageNodeModule(), new JettyServerModule(), - new QueryableModule(), - new QueryRunnerFactoryModule(), new ExpressionModule(), new DiscoveryModule(), new ServerViewModule(), diff --git a/server/src/test/java/io/druid/server/log/LoggingRequestLoggerProviderTest.java b/server/src/test/java/io/druid/server/log/LoggingRequestLoggerProviderTest.java index d1e920e69d09..ab1ce25f2c15 100644 --- a/server/src/test/java/io/druid/server/log/LoggingRequestLoggerProviderTest.java +++ b/server/src/test/java/io/druid/server/log/LoggingRequestLoggerProviderTest.java @@ -28,6 +28,8 @@ import io.druid.guice.GuiceInjectors; import io.druid.guice.JsonConfigProvider; import io.druid.guice.JsonConfigurator; +import io.druid.guice.ManageLifecycle; +import io.druid.guice.QueryableModule; import io.druid.initialization.Initialization; import org.junit.Assert; import org.junit.Test; @@ -73,11 +75,12 @@ private Injector makeInjector() return Initialization.makeInjectorWithModules( GuiceInjectors.makeStartupInjector(), ImmutableList.of( - new Module() + new QueryableModule() { @Override public void configure(Binder binder) { + binder.bind(RequestLogger.class).toProvider(RequestLoggerProvider.class).in(ManageLifecycle.class); binder.bind(Key.get(String.class, Names.named("serviceName"))).toInstance("some service"); binder.bind(Key.get(Integer.class, Names.named("servicePort"))).toInstance(0); JsonConfigProvider.bind(binder, propertyPrefix, RequestLoggerProvider.class); diff --git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java index 17a0bdb3b58a..e4e748c44d5a 100644 --- a/services/src/main/java/io/druid/cli/CliBroker.java +++ b/services/src/main/java/io/druid/cli/CliBroker.java @@ -34,10 +34,13 @@ import io.druid.client.selector.ServerSelectorStrategy; import io.druid.client.selector.TierSelectorStrategy; import io.druid.guice.CacheModule; +import io.druid.guice.DruidProcessingModule; import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; +import io.druid.guice.QueryRunnerFactoryModule; +import io.druid.guice.QueryableModule; import io.druid.java.util.common.logger.Logger; import io.druid.query.QuerySegmentWalker; import io.druid.query.RetryQueryRunnerConfig; @@ -75,6 +78,9 @@ public CliBroker() protected List getModules() { return ImmutableList.of( + new DruidProcessingModule(), + new QueryableModule(), + new QueryRunnerFactoryModule(), new Module() { @Override diff --git a/services/src/main/java/io/druid/cli/CliHistorical.java b/services/src/main/java/io/druid/cli/CliHistorical.java index 89d92f175f1a..9845331dbe75 100644 --- a/services/src/main/java/io/druid/cli/CliHistorical.java +++ b/services/src/main/java/io/druid/cli/CliHistorical.java @@ -27,12 +27,15 @@ import io.druid.client.cache.CacheConfig; import io.druid.client.cache.CacheMonitor; import io.druid.guice.CacheModule; +import io.druid.guice.DruidProcessingModule; import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; import io.druid.guice.NodeTypeConfig; +import io.druid.guice.QueryRunnerFactoryModule; +import io.druid.guice.QueryableModule; import io.druid.java.util.common.logger.Logger; import io.druid.query.QuerySegmentWalker; import io.druid.query.lookup.LookupModule; @@ -69,6 +72,9 @@ public CliHistorical() protected List getModules() { return ImmutableList.of( + new DruidProcessingModule(), + new QueryableModule(), + new QueryRunnerFactoryModule(), new Module() { @Override diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index d236ac593e75..f0c455b81c04 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -31,7 +31,6 @@ import com.google.inject.multibindings.MapBinder; import com.google.inject.name.Named; import com.google.inject.name.Names; - import io.airlift.airline.Arguments; import io.airlift.airline.Command; import io.airlift.airline.Option; @@ -39,6 +38,7 @@ import io.druid.client.coordinator.CoordinatorClient; import io.druid.guice.Binders; import io.druid.guice.CacheModule; +import io.druid.guice.DruidProcessingModule; import io.druid.guice.IndexingServiceFirehoseModule; import io.druid.guice.Jerseys; import io.druid.guice.JsonConfigProvider; @@ -47,6 +47,8 @@ import io.druid.guice.ManageLifecycle; import io.druid.guice.NodeTypeConfig; import io.druid.guice.PolyBind; +import io.druid.guice.QueryRunnerFactoryModule; +import io.druid.guice.QueryableModule; import io.druid.guice.annotations.Json; import io.druid.indexing.common.RetryPolicyConfig; import io.druid.indexing.common.RetryPolicyFactory; @@ -84,13 +86,13 @@ import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig; import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; +import io.druid.server.QueryResource; import io.druid.server.coordination.ServerType; import io.druid.server.http.SegmentListerResource; -import io.druid.server.metrics.QueryCountStatsProvider; -import io.druid.server.QueryResource; import io.druid.server.initialization.jetty.ChatHandlerServerModule; import io.druid.server.initialization.jetty.JettyServerInitializer; import io.druid.server.metrics.DataSourceTaskIdHolder; +import io.druid.server.metrics.QueryCountStatsProvider; import org.eclipse.jetty.server.Server; import java.io.File; @@ -129,6 +131,9 @@ public CliPeon() protected List getModules() { return ImmutableList.of( + new DruidProcessingModule(), + new QueryableModule(), + new QueryRunnerFactoryModule(), new Module() { @Override diff --git a/services/src/main/java/io/druid/cli/CliRealtime.java b/services/src/main/java/io/druid/cli/CliRealtime.java index f0b5cdf60636..2c9236511c48 100644 --- a/services/src/main/java/io/druid/cli/CliRealtime.java +++ b/services/src/main/java/io/druid/cli/CliRealtime.java @@ -24,8 +24,10 @@ import com.google.inject.Inject; import com.google.inject.Module; import com.google.inject.name.Names; - import io.airlift.airline.Command; +import io.druid.guice.DruidProcessingModule; +import io.druid.guice.QueryRunnerFactoryModule; +import io.druid.guice.QueryableModule; import io.druid.guice.RealtimeModule; import io.druid.java.util.common.logger.Logger; import io.druid.query.lookup.LookupModule; @@ -56,6 +58,9 @@ public CliRealtime() protected List getModules() { return ImmutableList.of( + new DruidProcessingModule(), + new QueryableModule(), + new QueryRunnerFactoryModule(), new RealtimeModule(), new Module() { diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java index b6516fb4c66d..5075b65b1a72 100644 --- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -29,7 +29,10 @@ import io.druid.client.DruidServer; import io.druid.client.InventoryView; import io.druid.client.ServerView; +import io.druid.guice.DruidProcessingModule; import io.druid.guice.LazySingleton; +import io.druid.guice.QueryRunnerFactoryModule; +import io.druid.guice.QueryableModule; import io.druid.guice.RealtimeModule; import io.druid.java.util.common.logger.Logger; import io.druid.query.lookup.LookupModule; @@ -68,6 +71,9 @@ public CliRealtimeExample() protected List getModules() { return ImmutableList.of( + new DruidProcessingModule(), + new QueryableModule(), + new QueryRunnerFactoryModule(), new RealtimeModule(), new Module() { diff --git a/services/src/main/java/io/druid/cli/CliRouter.java b/services/src/main/java/io/druid/cli/CliRouter.java index ae797509fc41..a926252ff044 100644 --- a/services/src/main/java/io/druid/cli/CliRouter.java +++ b/services/src/main/java/io/druid/cli/CliRouter.java @@ -33,6 +33,9 @@ import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; +import io.druid.guice.RouterProcessingModule; +import io.druid.guice.QueryRunnerFactoryModule; +import io.druid.guice.QueryableModule; import io.druid.guice.annotations.Self; import io.druid.guice.http.JettyHttpClientModule; import io.druid.java.util.common.logger.Logger; @@ -70,6 +73,9 @@ public CliRouter() protected List getModules() { return ImmutableList.of( + new RouterProcessingModule(), + new QueryableModule(), + new QueryRunnerFactoryModule(), new JettyHttpClientModule("druid.router.http", Router.class), new Module() { diff --git a/services/src/main/java/io/druid/cli/CreateTables.java b/services/src/main/java/io/druid/cli/CreateTables.java index aa9e3f5d98d6..abb5a7638acc 100644 --- a/services/src/main/java/io/druid/cli/CreateTables.java +++ b/services/src/main/java/io/druid/cli/CreateTables.java @@ -24,10 +24,12 @@ import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; - import io.airlift.airline.Command; import io.airlift.airline.Option; +import io.druid.guice.DruidProcessingModule; import io.druid.guice.JsonConfigProvider; +import io.druid.guice.QueryRunnerFactoryModule; +import io.druid.guice.QueryableModule; import io.druid.guice.annotations.Self; import io.druid.java.util.common.logger.Logger; import io.druid.metadata.MetadataStorageConnector; @@ -66,6 +68,12 @@ public CreateTables() protected List getModules() { return ImmutableList.of( + // It's unknown why those modules are required in CreateTables, and if all of those modules are required or not. + // Maybe some of those modules could be removed. + // See https://github.com/druid-io/druid/pull/4429#discussion_r123602930 + new DruidProcessingModule(), + new QueryableModule(), + new QueryRunnerFactoryModule(), new Module() { @Override diff --git a/services/src/main/java/io/druid/cli/DumpSegment.java b/services/src/main/java/io/druid/cli/DumpSegment.java index 58d7abf957db..d377e50ec4ed 100644 --- a/services/src/main/java/io/druid/cli/DumpSegment.java +++ b/services/src/main/java/io/druid/cli/DumpSegment.java @@ -41,6 +41,9 @@ import io.druid.collections.bitmap.ConciseBitmapFactory; import io.druid.collections.bitmap.ImmutableBitmap; import io.druid.collections.bitmap.RoaringBitmapFactory; +import io.druid.guice.DruidProcessingModule; +import io.druid.guice.QueryRunnerFactoryModule; +import io.druid.guice.QueryableModule; import io.druid.guice.annotations.Json; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; @@ -425,6 +428,9 @@ private T withOutputStream(Function f) throws IOException protected List getModules() { return ImmutableList.of( + new DruidProcessingModule(), + new QueryableModule(), + new QueryRunnerFactoryModule(), new Module() { @Override diff --git a/services/src/main/java/io/druid/cli/InsertSegment.java b/services/src/main/java/io/druid/cli/InsertSegment.java index f6b1850d2b6f..ad585af418ae 100644 --- a/services/src/main/java/io/druid/cli/InsertSegment.java +++ b/services/src/main/java/io/druid/cli/InsertSegment.java @@ -29,7 +29,10 @@ import com.google.inject.Module; import io.airlift.airline.Command; import io.airlift.airline.Option; +import io.druid.guice.DruidProcessingModule; import io.druid.guice.JsonConfigProvider; +import io.druid.guice.QueryRunnerFactoryModule; +import io.druid.guice.QueryableModule; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Self; import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; @@ -71,6 +74,12 @@ public InsertSegment() protected List getModules() { return ImmutableList.of( + // It's unknown if those modules are required in InsertSegment. + // Maybe some of those modules could be removed. + // See https://github.com/druid-io/druid/pull/4429#discussion_r123603498 + new DruidProcessingModule(), + new QueryableModule(), + new QueryRunnerFactoryModule(), new Module() { @Override diff --git a/services/src/main/java/io/druid/cli/ResetCluster.java b/services/src/main/java/io/druid/cli/ResetCluster.java index 4ae0dcf22ebe..76c14f59c61a 100644 --- a/services/src/main/java/io/druid/cli/ResetCluster.java +++ b/services/src/main/java/io/druid/cli/ResetCluster.java @@ -26,8 +26,11 @@ import com.google.inject.Module; import io.airlift.airline.Command; import io.airlift.airline.Option; +import io.druid.guice.DruidProcessingModule; import io.druid.guice.IndexingServiceTaskLogsModule; import io.druid.guice.JsonConfigProvider; +import io.druid.guice.QueryRunnerFactoryModule; +import io.druid.guice.QueryableModule; import io.druid.guice.annotations.Self; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.HadoopTask; @@ -76,6 +79,12 @@ public ResetCluster() protected List getModules() { return ImmutableList.of( + // It's unknown if those modules are required in ResetCluster. + // Maybe some of those modules could be removed. + // See https://github.com/druid-io/druid/pull/4429#discussion_r123603498 + new DruidProcessingModule(), + new QueryableModule(), + new QueryRunnerFactoryModule(), new Module() { @Override diff --git a/services/src/main/java/io/druid/cli/ValidateSegments.java b/services/src/main/java/io/druid/cli/ValidateSegments.java index dcac614d921e..b021c201692f 100644 --- a/services/src/main/java/io/druid/cli/ValidateSegments.java +++ b/services/src/main/java/io/druid/cli/ValidateSegments.java @@ -27,6 +27,9 @@ import com.google.inject.name.Names; import io.airlift.airline.Arguments; import io.airlift.airline.Command; +import io.druid.guice.DruidProcessingModule; +import io.druid.guice.QueryRunnerFactoryModule; +import io.druid.guice.QueryableModule; import io.druid.java.util.common.IAE; import io.druid.java.util.common.logger.Logger; import io.druid.query.DruidProcessingConfig; @@ -72,6 +75,12 @@ public void run() { protected List getModules() { return ImmutableList.of( + // It's unknown if those modules are required in ValidateSegments. + // Maybe some of those modules could be removed. + // See https://github.com/druid-io/druid/pull/4429#discussion_r123603498 + new DruidProcessingModule(), + new QueryableModule(), + new QueryRunnerFactoryModule(), new Module() { @Override diff --git a/services/src/main/java/io/druid/cli/validate/DruidJsonValidator.java b/services/src/main/java/io/druid/cli/validate/DruidJsonValidator.java index 21d05db7c508..99e7abaa2b2c 100644 --- a/services/src/main/java/io/druid/cli/validate/DruidJsonValidator.java +++ b/services/src/main/java/io/druid/cli/validate/DruidJsonValidator.java @@ -31,17 +31,19 @@ import com.google.inject.Binder; import com.google.inject.Injector; import com.google.inject.name.Names; - import io.airlift.airline.Command; import io.airlift.airline.Option; import io.druid.cli.GuiceRunnable; import io.druid.data.input.InputRow; import io.druid.data.input.impl.StringInputRowParser; +import io.druid.guice.DruidProcessingModule; import io.druid.guice.ExtensionsConfig; import io.druid.guice.FirehoseModule; import io.druid.guice.IndexingServiceFirehoseModule; import io.druid.guice.LocalDataStorageDruidModule; import io.druid.guice.ParsersModule; +import io.druid.guice.QueryRunnerFactoryModule; +import io.druid.guice.QueryableModule; import io.druid.indexer.HadoopDruidIndexerConfig; import io.druid.indexer.IndexingHadoopModule; import io.druid.indexing.common.task.Task; @@ -94,6 +96,12 @@ public DruidJsonValidator() protected List getModules() { return ImmutableList.of( + // It's unknown if those modules are required in DruidJsonValidator. + // Maybe some of those modules could be removed. + // See https://github.com/druid-io/druid/pull/4429#discussion_r123603498 + new DruidProcessingModule(), + new QueryableModule(), + new QueryRunnerFactoryModule(), new com.google.inject.Module() { @Override