diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 34ef75cc87a3..146a19ec0206 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -104,6 +104,7 @@ import org.apache.druid.metadata.TestDerbyConnector; import org.apache.druid.query.DefaultGenericQueryMetricsFactory; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; +import org.apache.druid.query.DirectQueryProcessingPool; import org.apache.druid.query.Druids; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; @@ -2884,7 +2885,7 @@ public void close() EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), handoffNotifierFactory, this::makeTimeseriesAndScanConglomerate, - Execs.directExecutor(), // queryExecutorService + DirectQueryProcessingPool.INSTANCE, NoopJoinableFactory.INSTANCE, () -> EasyMock.createMock(MonitorScheduler.class), new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()), diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 19d2445f7ee2..57c2b2719ef5 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -89,6 +89,7 @@ import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.metadata.TestDerbyConnector; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; +import org.apache.druid.query.DirectQueryProcessingPool; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.filter.SelectorDimFilter; @@ -2971,7 +2972,7 @@ public void close() EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), handoffNotifierFactory, this::makeTimeseriesOnlyConglomerate, - Execs.directExecutor(), // queryExecutorService + DirectQueryProcessingPool.INSTANCE, NoopJoinableFactory.INSTANCE, () -> EasyMock.createMock(MonitorScheduler.class), new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java index bc1e00b3d04e..dbaec5a757f7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java @@ -46,6 +46,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.Monitor; import org.apache.druid.java.util.metrics.MonitorScheduler; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9; @@ -73,7 +74,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; /** * Stuff that may be needed by a Task in order to conduct its business. @@ -99,7 +99,7 @@ public class TaskToolbox private final Provider queryRunnerFactoryConglomerateProvider; @Nullable private final Provider monitorSchedulerProvider; - private final ExecutorService queryExecutorService; + private final QueryProcessingPool queryProcessingPool; private final JoinableFactory joinableFactory; private final SegmentLoader segmentLoader; private final ObjectMapper jsonMapper; @@ -141,7 +141,7 @@ public TaskToolbox( DataSegmentServerAnnouncer serverAnnouncer, SegmentHandoffNotifierFactory handoffNotifierFactory, Provider queryRunnerFactoryConglomerateProvider, - ExecutorService queryExecutorService, + QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, @Nullable Provider monitorSchedulerProvider, SegmentLoader segmentLoader, @@ -180,7 +180,7 @@ public TaskToolbox( this.serverAnnouncer = serverAnnouncer; this.handoffNotifierFactory = handoffNotifierFactory; this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider; - this.queryExecutorService = queryExecutorService; + this.queryProcessingPool = queryProcessingPool; this.joinableFactory = joinableFactory; this.monitorSchedulerProvider = monitorSchedulerProvider; this.segmentLoader = segmentLoader; @@ -268,9 +268,9 @@ public QueryRunnerFactoryConglomerate getQueryRunnerFactoryConglomerate() return queryRunnerFactoryConglomerateProvider.get(); } - public ExecutorService getQueryExecutorService() + public QueryProcessingPool getQueryProcessingPool() { - return queryExecutorService; + return queryProcessingPool; } public JoinableFactory getJoinableFactory() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java index 5112fa9f4eb5..5cd4eb56067b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java @@ -33,7 +33,6 @@ import org.apache.druid.discovery.LookupNodeService; import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Parent; -import org.apache.druid.guice.annotations.Processing; import org.apache.druid.guice.annotations.RemoteChatHandler; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.config.TaskConfig; @@ -44,6 +43,7 @@ import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.MonitorScheduler; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9; @@ -62,7 +62,6 @@ import org.apache.druid.server.security.AuthorizerMapper; import java.io.File; -import java.util.concurrent.ExecutorService; /** * Stuff that may be needed by a Task in order to conduct its business. @@ -81,7 +80,7 @@ public class TaskToolboxFactory private final DataSegmentServerAnnouncer serverAnnouncer; private final SegmentHandoffNotifierFactory handoffNotifierFactory; private final Provider queryRunnerFactoryConglomerateProvider; - private final ExecutorService queryExecutorService; + private final QueryProcessingPool queryProcessingPool; private final JoinableFactory joinableFactory; private final Provider monitorSchedulerProvider; private final SegmentLoaderFactory segmentLoaderFactory; @@ -122,7 +121,7 @@ public TaskToolboxFactory( DataSegmentServerAnnouncer serverAnnouncer, SegmentHandoffNotifierFactory handoffNotifierFactory, Provider queryRunnerFactoryConglomerateProvider, - @Processing ExecutorService queryExecutorService, + QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, Provider monitorSchedulerProvider, SegmentLoaderFactory segmentLoaderFactory, @@ -160,7 +159,7 @@ public TaskToolboxFactory( this.serverAnnouncer = serverAnnouncer; this.handoffNotifierFactory = handoffNotifierFactory; this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider; - this.queryExecutorService = queryExecutorService; + this.queryProcessingPool = queryProcessingPool; this.joinableFactory = joinableFactory; this.monitorSchedulerProvider = monitorSchedulerProvider; this.segmentLoaderFactory = segmentLoaderFactory; @@ -202,7 +201,7 @@ public TaskToolbox build(Task task) serverAnnouncer, handoffNotifierFactory, queryRunnerFactoryConglomerateProvider, - queryExecutorService, + queryProcessingPool, joinableFactory, monitorSchedulerProvider, segmentLoaderFactory.manufacturate(taskWorkDir), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 550f81886f1f..edc1d183ffca 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -776,7 +776,7 @@ private Appenderator newAppenderator( toolbox.getQueryRunnerFactoryConglomerate(), toolbox.getSegmentAnnouncer(), toolbox.getEmitter(), - toolbox.getQueryExecutorService(), + toolbox.getQueryProcessingPool(), toolbox.getJoinableFactory(), toolbox.getCache(), toolbox.getCacheConfig(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java index b4c75206058d..6a5fc698daf3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java @@ -344,7 +344,7 @@ public String getVersion(final Interval interval) lockingSegmentAnnouncer, segmentPublisher, toolbox.getSegmentHandoffNotifierFactory(), - toolbox.getQueryExecutorService(), + toolbox.getQueryProcessingPool(), toolbox.getJoinableFactory(), toolbox.getIndexMergerV9(), toolbox.getIndexIO(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index d3436ea3ef34..3f91d11eeb9a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -192,7 +192,7 @@ public Appenderator newAppenderator( toolbox.getQueryRunnerFactoryConglomerate(), toolbox.getSegmentAnnouncer(), toolbox.getEmitter(), - toolbox.getQueryExecutorService(), + toolbox.getQueryProcessingPool(), toolbox.getJoinableFactory(), toolbox.getCache(), toolbox.getCacheConfig(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java index 671556975a96..ac0685a401bd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java @@ -34,6 +34,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.MonitorScheduler; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9; @@ -62,7 +63,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; public class TaskToolboxTest { @@ -81,7 +81,7 @@ public class TaskToolboxTest private QueryRunnerFactoryConglomerate mockQueryRunnerFactoryConglomerate = EasyMock.createMock(QueryRunnerFactoryConglomerate.class); private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class); - private ExecutorService mockQueryExecutorService = EasyMock.createMock(ExecutorService.class); + private QueryProcessingPool mockQueryProcessingPool = EasyMock.createMock(QueryProcessingPool.class); private ObjectMapper ObjectMapper = new ObjectMapper(); private SegmentLoaderFactory mockSegmentLoaderFactory = EasyMock.createMock(SegmentLoaderFactory.class); private SegmentLoaderLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLoaderLocalCacheManager.class); @@ -126,7 +126,7 @@ public void setUp() throws IOException EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), mockHandoffNotifierFactory, () -> mockQueryRunnerFactoryConglomerate, - mockQueryExecutorService, + mockQueryProcessingPool, NoopJoinableFactory.INSTANCE, () -> mockMonitorScheduler, mockSegmentLoaderFactory, @@ -172,9 +172,9 @@ public void testGetQueryRunnerFactoryConglomerate() } @Test - public void testGetQueryExecutorService() + public void testGetQueryProcessingPool() { - Assert.assertEquals(mockQueryExecutorService, taskToolbox.build(task).getQueryExecutorService()); + Assert.assertEquals(mockQueryProcessingPool, taskToolbox.build(task).getQueryProcessingPool()); } @Test diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 113d41a864af..d7c5e32260f2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -93,6 +93,7 @@ import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.metadata.TestDerbyConnector; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; +import org.apache.druid.query.DirectQueryProcessingPool; import org.apache.druid.query.Druids; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunnerFactoryConglomerate; @@ -1586,7 +1587,7 @@ public void close() EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), handoffNotifierFactory, () -> conglomerate, - Execs.directExecutor(), // queryExecutorService + DirectQueryProcessingPool.INSTANCE, // queryExecutorService NoopJoinableFactory.INSTANCE, () -> EasyMock.createMock(MonitorScheduler.class), new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index 0a2e34d7d6d7..d7e94238276f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -79,6 +79,7 @@ import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; +import org.apache.druid.query.DirectQueryProcessingPool; import org.apache.druid.query.Druids; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; @@ -986,7 +987,7 @@ public void close() EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), handoffNotifierFactory, () -> conglomerate, - Execs.directExecutor(), // queryExecutorService + DirectQueryProcessingPool.INSTANCE, NoopJoinableFactory.INSTANCE, () -> EasyMock.createMock(MonitorScheduler.class), new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java index 6e963e6f9136..a20575140c55 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java @@ -25,6 +25,7 @@ import org.apache.druid.client.cache.CachePopulatorStats; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.SegmentDescriptor; @@ -43,8 +44,6 @@ import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.joda.time.Interval; -import java.util.concurrent.ExecutorService; - public class TestAppenderatorsManager implements AppenderatorsManager { private Appenderator realtimeAppenderator; @@ -62,7 +61,7 @@ public Appenderator createRealtimeAppenderatorForTask( QueryRunnerFactoryConglomerate conglomerate, DataSegmentAnnouncer segmentAnnouncer, ServiceEmitter emitter, - ExecutorService queryExecutorService, + QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, Cache cache, CacheConfig cacheConfig, @@ -83,7 +82,7 @@ public Appenderator createRealtimeAppenderatorForTask( conglomerate, segmentAnnouncer, emitter, - queryExecutorService, + queryProcessingPool, joinableFactory, cache, cacheConfig, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 613e0ac78a94..6d2f1ae8f798 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -94,7 +94,6 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.jackson.JacksonUtils; @@ -105,6 +104,8 @@ import org.apache.druid.java.util.metrics.MonitorScheduler; import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory; import org.apache.druid.metadata.TestDerbyConnector; +import org.apache.druid.query.DirectQueryProcessingPool; +import org.apache.druid.query.ForwardingQueryProcessingPool; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -156,6 +157,7 @@ import org.junit.runners.Parameterized; import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; import java.net.URI; @@ -660,7 +662,7 @@ public void unannounceSegments(Iterable segments) EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), handoffNotifierFactory, () -> queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective - Execs.directExecutor(), // query executor service + DirectQueryProcessingPool.INSTANCE, // query executor service NoopJoinableFactory.INSTANCE, () -> monitorScheduler, // monitor scheduler new SegmentLoaderFactory(null, new DefaultObjectMapper()), @@ -1336,7 +1338,7 @@ public void testUnifiedAppenderatorsManagerCleanup() throws Exception final ExecutorService exec = Executors.newFixedThreadPool(8); UnifiedIndexerAppenderatorsManager unifiedIndexerAppenderatorsManager = new UnifiedIndexerAppenderatorsManager( - exec, + new ForwardingQueryProcessingPool(exec), NoopJoinableFactory.INSTANCE, new WorkerConfig(), MapCache.create(2048), diff --git a/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java b/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java index eaf244f2b2cc..98c7f964c128 100644 --- a/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java +++ b/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java @@ -25,7 +25,6 @@ import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.client.cache.CachePopulator; -import org.apache.druid.guice.annotations.Processing; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.common.guava.Accumulator; import org.apache.druid.java.util.common.guava.Sequence; @@ -35,6 +34,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.Query; import org.apache.druid.query.QueryCapacityExceededException; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerFactoryConglomerate; @@ -55,7 +55,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; @@ -91,7 +90,7 @@ public class ServerManagerForQueryErrorTest extends ServerManager public ServerManagerForQueryErrorTest( QueryRunnerFactoryConglomerate conglomerate, ServiceEmitter emitter, - @Processing ExecutorService exec, + QueryProcessingPool queryProcessingPool, CachePopulator cachePopulator, @Smile ObjectMapper objectMapper, Cache cache, @@ -104,7 +103,7 @@ public ServerManagerForQueryErrorTest( super( conglomerate, emitter, - exec, + queryProcessingPool, cachePopulator, objectMapper, cache, @@ -116,7 +115,7 @@ public ServerManagerForQueryErrorTest( } @Override - QueryRunner buildQueryRunnerForSegment( + protected QueryRunner buildQueryRunnerForSegment( Query query, SegmentDescriptor descriptor, QueryRunnerFactory> factory, diff --git a/processing/src/main/java/org/apache/druid/guice/annotations/Processing.java b/processing/src/main/java/org/apache/druid/query/AbstractPrioritizedQueryRunnerCallable.java similarity index 56% rename from processing/src/main/java/org/apache/druid/guice/annotations/Processing.java rename to processing/src/main/java/org/apache/druid/query/AbstractPrioritizedQueryRunnerCallable.java index f82691c2b766..51db7f4df691 100644 --- a/processing/src/main/java/org/apache/druid/guice/annotations/Processing.java +++ b/processing/src/main/java/org/apache/druid/query/AbstractPrioritizedQueryRunnerCallable.java @@ -17,20 +17,33 @@ * under the License. */ -package org.apache.druid.guice.annotations; - -import com.google.inject.BindingAnnotation; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; +package org.apache.druid.query; /** + * A helper class to avoid boilerplate for creating {@link PrioritizedQueryRunnerCallable} objects. + * @param + * @param */ -@BindingAnnotation -@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) -@Retention(RetentionPolicy.RUNTIME) -public @interface Processing +public abstract class AbstractPrioritizedQueryRunnerCallable implements PrioritizedQueryRunnerCallable { + private final QueryRunner runner; + private final int priority; + + public AbstractPrioritizedQueryRunnerCallable(int priority, QueryRunner runner) + { + this.priority = priority; + this.runner = runner; + } + + @Override + public QueryRunner getRunner() + { + return runner; + } + + @Override + public int getPriority() + { + return priority; + } } diff --git a/processing/src/main/java/org/apache/druid/query/CacheStrategy.java b/processing/src/main/java/org/apache/druid/query/CacheStrategy.java index b9c0e25a9529..bd21034186ca 100644 --- a/processing/src/main/java/org/apache/druid/query/CacheStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/CacheStrategy.java @@ -27,7 +27,6 @@ import java.util.Iterator; import java.util.List; -import java.util.concurrent.ExecutorService; /** * Handles caching-related tasks for a particular query type. @@ -42,7 +41,7 @@ public interface CacheStrategy> * The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node. * * @param query the query to be cached - * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be + * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be * called on the cached by-segment results * * @return true if the query is cacheable, otherwise false. diff --git a/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java index 6269493dfd16..cc393e25b25c 100644 --- a/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java @@ -25,8 +25,6 @@ import com.google.common.collect.Ordering; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -36,12 +34,10 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.context.ResponseContext; -import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -62,28 +58,18 @@ public class ChainedExecutionQueryRunner implements QueryRunner { private static final Logger log = new Logger(ChainedExecutionQueryRunner.class); + private final QueryProcessingPool queryProcessingPool; private final Iterable> queryables; - private final ListeningExecutorService exec; private final QueryWatcher queryWatcher; - public ChainedExecutionQueryRunner( - ExecutorService exec, - QueryWatcher queryWatcher, - QueryRunner... queryables - ) - { - this(exec, queryWatcher, Arrays.asList(queryables)); - } public ChainedExecutionQueryRunner( - ExecutorService exec, + QueryProcessingPool queryProcessingPool, QueryWatcher queryWatcher, Iterable> queryables ) { - // listeningDecorator will leave PrioritizedExecutorService unchanged, - // since it already implements ListeningExecutorService - this.exec = MoreExecutors.listeningDecorator(exec); + this.queryProcessingPool = queryProcessingPool; this.queryables = Iterables.unmodifiableIterable(queryables); this.queryWatcher = queryWatcher; } @@ -111,8 +97,8 @@ public Iterator make() throw new ISE("Null queryRunner! Looks to be some segment unmapping action happening"); } - return exec.submit( - new AbstractPrioritizedCallable>(priority) + return queryProcessingPool.submitRunnerTask( + new AbstractPrioritizedQueryRunnerCallable, T>(priority, input) { @Override public Iterable call() @@ -142,8 +128,7 @@ public Iterable call() throw new RuntimeException(e); } } - } - ); + }); } ) ); diff --git a/processing/src/main/java/org/apache/druid/query/DirectQueryProcessingPool.java b/processing/src/main/java/org/apache/druid/query/DirectQueryProcessingPool.java new file mode 100644 index 000000000000..c24b0157af6c --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/DirectQueryProcessingPool.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.google.common.util.concurrent.ForwardingListeningExecutorService; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.druid.java.util.common.concurrent.Execs; + +/** + * {@link QueryProcessingPool} wrapper over {@link Execs#directExecutor()} + */ +public class DirectQueryProcessingPool extends ForwardingListeningExecutorService implements QueryProcessingPool +{ + public static DirectQueryProcessingPool INSTANCE = new DirectQueryProcessingPool(); + + private DirectQueryProcessingPool() + { + + } + + @Override + public ListenableFuture submitRunnerTask(PrioritizedQueryRunnerCallable task) + { + return delegate().submit(task); + } + + @Override + public ListeningExecutorService delegate() + { + return Execs.directExecutor(); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/ForwardingQueryProcessingPool.java b/processing/src/main/java/org/apache/druid/query/ForwardingQueryProcessingPool.java new file mode 100644 index 000000000000..fdb98756c535 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/ForwardingQueryProcessingPool.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.google.common.util.concurrent.ForwardingListeningExecutorService; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +import java.util.concurrent.ExecutorService; + +/** + * Default implementation of {@link QueryProcessingPool} that just forwards operations, including query execution tasks, + * to an underlying {@link ExecutorService} + */ +public class ForwardingQueryProcessingPool extends ForwardingListeningExecutorService implements QueryProcessingPool +{ + private final ListeningExecutorService delegate; + + public ForwardingQueryProcessingPool(ExecutorService executorService) + { + this.delegate = MoreExecutors.listeningDecorator(executorService); + } + + @Override + public ListenableFuture submitRunnerTask(PrioritizedQueryRunnerCallable task) + { + return delegate().submit(task); + } + + @Override + protected ListeningExecutorService delegate() + { + return delegate; + } + +} diff --git a/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java index 85a96014ef70..5b7d550a027d 100644 --- a/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java @@ -28,8 +28,6 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.data.input.Row; @@ -51,7 +49,6 @@ import java.util.Queue; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -60,20 +57,20 @@ public class GroupByMergedQueryRunner implements QueryRunner { private static final Logger log = new Logger(GroupByMergedQueryRunner.class); private final Iterable> queryables; - private final ListeningExecutorService exec; private final Supplier configSupplier; private final QueryWatcher queryWatcher; private final NonBlockingPool bufferPool; + private final QueryProcessingPool queryProcessingPool; public GroupByMergedQueryRunner( - ExecutorService exec, + QueryProcessingPool queryProcessingPool, Supplier configSupplier, QueryWatcher queryWatcher, NonBlockingPool bufferPool, Iterable> queryables ) { - this.exec = MoreExecutors.listeningDecorator(exec); + this.queryProcessingPool = queryProcessingPool; this.queryWatcher = queryWatcher; this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); this.configSupplier = configSupplier; @@ -109,8 +106,8 @@ public ListenableFuture apply(final QueryRunner input) throw new ISE("Null queryRunner! Looks to be some segment unmapping action happening"); } - ListenableFuture future = exec.submit( - new AbstractPrioritizedCallable(priority) + ListenableFuture future = queryProcessingPool.submitRunnerTask( + new AbstractPrioritizedQueryRunnerCallable(priority, input) { @Override public Void call() @@ -118,10 +115,10 @@ public Void call() try { if (bySegment) { input.run(threadSafeQueryPlus, responseContext) - .accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs); + .accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs); } else { input.run(threadSafeQueryPlus, responseContext) - .accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); + .accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); } return null; diff --git a/processing/src/main/java/org/apache/druid/query/MetricsEmittingExecutorService.java b/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryProcessingPool.java similarity index 62% rename from processing/src/main/java/org/apache/druid/query/MetricsEmittingExecutorService.java rename to processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryProcessingPool.java index 14b35608a6a5..4047d7977f91 100644 --- a/processing/src/main/java/org/apache/druid/query/MetricsEmittingExecutorService.java +++ b/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryProcessingPool.java @@ -19,53 +19,28 @@ package org.apache.druid.query; -import com.google.common.util.concurrent.ForwardingListeningExecutorService; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; -import java.util.concurrent.Callable; - -public class MetricsEmittingExecutorService extends ForwardingListeningExecutorService +public class MetricsEmittingQueryProcessingPool extends ForwardingQueryProcessingPool implements ExecutorServiceMonitor.MetricEmitter { - private final ListeningExecutorService delegate; - public MetricsEmittingExecutorService( + public MetricsEmittingQueryProcessingPool( ListeningExecutorService delegate, ExecutorServiceMonitor executorServiceMonitor ) { - super(); - this.delegate = delegate; + super(delegate); executorServiceMonitor.add(this); } - @Override - protected ListeningExecutorService delegate() - { - return delegate; - } - - @SuppressWarnings("ParameterPackage") - @Override - public ListenableFuture submit(Callable tCallable) - { - return delegate.submit(tCallable); - } - - @Override - public void execute(Runnable runnable) - { - delegate.execute(runnable); - } - @Override public void emitMetrics(ServiceEmitter emitter, ServiceMetricEvent.Builder metricBuilder) { - if (delegate instanceof PrioritizedExecutorService) { - emitter.emit(metricBuilder.build("segment/scan/pending", ((PrioritizedExecutorService) delegate).getQueueSize())); + if (delegate() instanceof PrioritizedExecutorService) { + emitter.emit(metricBuilder.build("segment/scan/pending", ((PrioritizedExecutorService) delegate()).getQueueSize())); } } diff --git a/processing/src/main/java/org/apache/druid/query/PrioritizedQueryRunnerCallable.java b/processing/src/main/java/org/apache/druid/query/PrioritizedQueryRunnerCallable.java new file mode 100644 index 000000000000..6b1b5540b71e --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/PrioritizedQueryRunnerCallable.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +/** + * An implementation of {@link PrioritizedCallable} that also lets caller get access to associated {@link QueryRunner} + * It is used in implementations of {@link QueryRunnerFactory} + * @param - Type of result of {@link #call()} method + * @param - Type of {@link org.apache.druid.java.util.common.guava.Sequence} of rows returned by {@link QueryRunner} + */ +public interface PrioritizedQueryRunnerCallable extends PrioritizedCallable +{ + /** + * This method can be used by the extensions to get the runner that the given query execution task corresponds to. + * That in turn can be used to fetch any state associated with the QueryRunner such as the segment info for example. + * Extensions can carry any state from custom implementation of QuerySegmentWalker to a + * custom implementation of {@link QueryProcessingPool#submitRunnerTask(PrioritizedQueryRunnerCallable)} + */ + QueryRunner getRunner(); +} diff --git a/processing/src/main/java/org/apache/druid/query/Query.java b/processing/src/main/java/org/apache/druid/query/Query.java index fc12d5e41722..2a232cf45028 100644 --- a/processing/src/main/java/org/apache/druid/query/Query.java +++ b/processing/src/main/java/org/apache/druid/query/Query.java @@ -48,7 +48,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ExecutorService; @ExtensionPoint @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "queryType") @@ -109,7 +108,7 @@ public interface Query /** * Comparator that represents the order in which results are generated from the * {@link QueryRunnerFactory#createRunner(Segment)} and - * {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} calls. This is used to combine streams of + * {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} calls. This is used to combine streams of * results from different sources; for example, it's used by historicals to combine streams from different segments, * and it's used by the broker to combine streams from different historicals. * diff --git a/processing/src/main/java/org/apache/druid/query/QueryProcessingPool.java b/processing/src/main/java/org/apache/druid/query/QueryProcessingPool.java new file mode 100644 index 000000000000..785c31cad792 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/QueryProcessingPool.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.druid.guice.annotations.ExtensionPoint; + +/** + * This class implements the logic of how units of query execution run concurrently. It is used in {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)}. + * In a most straightforward implementation, each unit will be submitted to an {@link PrioritizedExecutorService}. Extensions, + * however, can implement their own logic for picking which unit to pick first for execution. + *

+ * This interface extends {@link ListeningExecutorService} as well. It has a separate + * method to submit query execution tasks so that implementations can differentiate those tasks from any regular async + * tasks. One example is {@link org.apache.druid.query.groupby.strategy.GroupByStrategyV2#mergeRunners(QueryProcessingPool, Iterable)} + * where different kind of tasks are submitted to same processing pool. + *

+ * Query execution task also includes a reference to {@link QueryRunner} so that any state required to decide the priority + * of a unit can be carried forward with the corresponding {@link QueryRunner}. + */ +@ExtensionPoint +public interface QueryProcessingPool extends ListeningExecutorService +{ + /** + * Submits the query execution unit task for asynchronous execution. + * + * @param task - Task to be submitted. + * @param - Task result type + * @param - Query runner sequence type + * @return - Future object for tracking the task completion. + */ + ListenableFuture submitRunnerTask(PrioritizedQueryRunnerCallable task); +} diff --git a/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java index 0832fb13ccae..add1a0a13462 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java @@ -44,6 +44,7 @@ public interface QueryRunnerFactory> QueryRunner createRunner(Segment segment); /** + * @deprecated Use {@link #mergeRunners(QueryProcessingPool, Iterable)} instead. * Runners generated with createRunner() and combined into an Iterable in (time,shardId) order are passed * along to this method with an {@link ExecutorService}. The method should then return a {@link QueryRunner} that, * when asked, will use the {@link ExecutorService} to run the base QueryRunners in some fashion. @@ -56,10 +57,38 @@ public interface QueryRunnerFactory> * * @param queryExecutor {@link ExecutorService} to be used for parallel processing * @param queryRunners Individual {@link QueryRunner} objects that produce some results - * @return a {@link QueryRunner} that, when asked, will use the {@link ExecutorService} to run the base + * @return a {@link QueryRunner} that, when asked, will use the {@link ExecutorService} to run the base * {@link QueryRunner} collection. */ - QueryRunner mergeRunners(ExecutorService queryExecutor, Iterable> queryRunners); + @Deprecated + default QueryRunner mergeRunners(ExecutorService queryExecutor, Iterable> queryRunners) + { + return mergeRunners(new ForwardingQueryProcessingPool(queryExecutor), queryRunners); + } + + /** + * Runners generated with createRunner() and combined into an Iterable in (time,shardId) order are passed + * along to this method with an {@link QueryProcessingPool}. The method should then return a {@link QueryRunner} that, + * when asked, will use the {@link QueryProcessingPool} to run the base QueryRunners in some fashion. + * + * The vast majority of the time, this should be implemented with {@link ChainedExecutionQueryRunner}: + * + * return new ChainedExecutionQueryRunner<>(queryProcessingPool, toolChest.getOrdering(), queryWatcher, queryRunners); + * + * Which will allow for parallel execution up to the maximum number of processing threads allowed. + * + * Unlike {@link #mergeRunners(ExecutorService, Iterable)}, this method takes a {@link QueryProcessingPool} instead + * which allows custom implementations for prioritize query execution on segments. + * + * @param queryProcessingPool {@link QueryProcessingPool} to be used for parallel processing + * @param queryRunners Individual {@link QueryRunner} objects that produce some results + * @return a {@link QueryRunner} that, when asked, will use the {@link ExecutorService} to run the base + * {@link QueryRunner} collection. + */ + QueryRunner mergeRunners( + QueryProcessingPool queryProcessingPool, + Iterable> queryRunners + ); /** * Provides access to the {@link QueryToolChest} for this specific {@link Query} type. diff --git a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java index 73e0fabb2b7b..31751fb13f16 100644 --- a/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryRunnerFactory.java @@ -26,6 +26,7 @@ import org.apache.druid.query.ChainedExecutionQueryRunner; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryToolChest; @@ -36,7 +37,6 @@ import org.apache.druid.segment.StorageAdapter; import java.util.Iterator; -import java.util.concurrent.ExecutorService; /** */ @@ -64,11 +64,11 @@ public QueryRunner> createRunner(final Seg @Override public QueryRunner> mergeRunners( - ExecutorService queryExecutor, + QueryProcessingPool queryProcessingPool, Iterable>> queryRunners ) { - return new ChainedExecutionQueryRunner<>(queryExecutor, queryWatcher, queryRunners); + return new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryRunners); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java index 3fbeb25aa7dd..51a7d1a5aa01 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactory.java @@ -20,13 +20,12 @@ package org.apache.druid.query.groupby; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryToolChest; @@ -35,8 +34,6 @@ import org.apache.druid.segment.Segment; import org.apache.druid.segment.StorageAdapter; -import java.util.concurrent.ExecutorService; - /** * */ @@ -63,13 +60,10 @@ public QueryRunner createRunner(final Segment segment) @Override public QueryRunner mergeRunners( - final ExecutorService exec, + final QueryProcessingPool queryProcessingPool, final Iterable> queryRunners ) { - // mergeRunners should take ListeningExecutorService at some point - final ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator(exec); - return new QueryRunner() { @Override @@ -77,7 +71,7 @@ public Sequence run(QueryPlus queryPlus, ResponseContext r { QueryRunner rowQueryRunner = strategySelector .strategize((GroupByQuery) queryPlus.getQuery()) - .mergeRunners(queryExecutor, queryRunners); + .mergeRunners(queryProcessingPool, queryRunners); return rowQueryRunner.run(queryPlus, responseContext); } }; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 7e8c49c6c03c..7c2eaa6d8805 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -29,8 +29,6 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.collections.BlockingPool; import org.apache.druid.collections.ReferenceCountingResourceHolder; import org.apache.druid.collections.Releaser; @@ -43,11 +41,12 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.query.AbstractPrioritizedCallable; +import org.apache.druid.query.AbstractPrioritizedQueryRunnerCallable; import org.apache.druid.query.ChainedExecutionQueryRunner; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryTimeoutException; import org.apache.druid.query.QueryWatcher; @@ -64,7 +63,6 @@ import java.util.UUID; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -80,7 +78,7 @@ * similarities and differences. * * Used by - * {@link org.apache.druid.query.groupby.strategy.GroupByStrategyV2#mergeRunners(ListeningExecutorService, Iterable)}. + * {@link org.apache.druid.query.groupby.strategy.GroupByStrategyV2#mergeRunners(QueryProcessingPool, Iterable)} */ public class GroupByMergingQueryRunnerV2 implements QueryRunner { @@ -89,7 +87,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner private final GroupByQueryConfig config; private final Iterable> queryables; - private final ListeningExecutorService exec; + private final QueryProcessingPool queryProcessingPool; private final QueryWatcher queryWatcher; private final int concurrencyHint; private final BlockingPool mergeBufferPool; @@ -99,7 +97,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner public GroupByMergingQueryRunnerV2( GroupByQueryConfig config, - ExecutorService exec, + QueryProcessingPool queryProcessingPool, QueryWatcher queryWatcher, Iterable> queryables, int concurrencyHint, @@ -110,7 +108,7 @@ public GroupByMergingQueryRunnerV2( ) { this.config = config; - this.exec = MoreExecutors.listeningDecorator(exec); + this.queryProcessingPool = queryProcessingPool; this.queryWatcher = queryWatcher; this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull())); this.concurrencyHint = concurrencyHint; @@ -142,7 +140,7 @@ public Sequence run(final QueryPlus queryPlus, final Respo .withoutThreadUnsafeState(); if (QueryContexts.isBySegment(query) || forceChainedExecution) { - ChainedExecutionQueryRunner runner = new ChainedExecutionQueryRunner<>(exec, queryWatcher, queryables); + ChainedExecutionQueryRunner runner = new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryables); return runner.run(queryPlusForRunners, responseContext); } @@ -203,7 +201,7 @@ public CloseableGrouperIterator make() concurrencyHint, temporaryStorage, spillMapper, - exec, + queryProcessingPool, // Passed as executor service priority, hasTimeout, timeoutAt, @@ -229,8 +227,8 @@ public ListenableFuture apply(final QueryRunner inpu throw new ISE("Null queryRunner! Looks to be some segment unmapping action happening"); } - ListenableFuture future = exec.submit( - new AbstractPrioritizedCallable(priority) + ListenableFuture future = queryProcessingPool.submitRunnerTask( + new AbstractPrioritizedQueryRunnerCallable(priority, input) { @Override public AggregateResult call() @@ -244,7 +242,7 @@ public AggregateResult call() ) { // Return true if OK, false if resources were exhausted. return input.run(queryPlusForRunners, responseContext) - .accumulate(AggregateResult.ok(), accumulator); + .accumulate(AggregateResult.ok(), accumulator); } catch (QueryInterruptedException | QueryTimeoutException e) { throw e; diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java index 4a892a47ec6a..df48c968724f 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategy.java @@ -19,10 +19,10 @@ package org.apache.druid.query.groupby.strategy; -import com.google.common.util.concurrent.ListeningExecutorService; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.context.ResponseContext; @@ -33,8 +33,8 @@ import org.apache.druid.segment.StorageAdapter; import javax.annotation.Nullable; + import java.util.Comparator; -import java.util.concurrent.ExecutorService; import java.util.function.BinaryOperator; public interface GroupByStrategy @@ -56,7 +56,7 @@ public interface GroupByStrategy * * Used by {@link GroupByQueryQueryToolChest#getCacheStrategy(GroupByQuery)}. * - * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be + * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be * called on the cached by-segment results. Can be used to distinguish if we are running on * a broker or data node. * @@ -163,18 +163,17 @@ Sequence processSubtotalsSpec( /** * Merge a variety of single-segment query runners into a combined runner. Used by - * {@link org.apache.druid.query.groupby.GroupByQueryRunnerFactory#mergeRunners(ExecutorService, Iterable)}. In + * {@link org.apache.druid.query.groupby.GroupByQueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)}. In * that sense, it is intended to go along with {@link #process(GroupByQuery, StorageAdapter)} (the runners created * by that method will be fed into this method). - * + *

* This method is only called on data servers, like Historicals (not the Broker). * - * @param exec executor service used for parallel execution of the query runners - * @param queryRunners collection of query runners to merge - * + * @param queryProcessingPool {@link QueryProcessingPool} service used for parallel execution of the query runners + * @param queryRunners collection of query runners to merge * @return merged query runner */ - QueryRunner mergeRunners(ListeningExecutorService exec, Iterable> queryRunners); + QueryRunner mergeRunners(QueryProcessingPool queryProcessingPool, Iterable> queryRunners); /** * Process a groupBy query on a single {@link StorageAdapter}. This is used by diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java index 18eaab21579c..72eafed7c930 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.inject.Inject; import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.guice.annotations.Global; @@ -34,6 +33,7 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.GroupByMergedQueryRunner; import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryWatcher; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -270,11 +270,11 @@ public Sequence processSubtotalsSpec( @Override public QueryRunner mergeRunners( - final ListeningExecutorService exec, + final QueryProcessingPool queryProcessingPool, final Iterable> queryRunners ) { - return new GroupByMergedQueryRunner<>(exec, configSupplier, queryWatcher, bufferPool, queryRunners); + return new GroupByMergedQueryRunner<>(queryProcessingPool, configSupplier, queryWatcher, bufferPool, queryRunners); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java index 23d904ac2ffb..c9d598e68a7a 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -25,7 +25,6 @@ import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.inject.Inject; import org.apache.druid.collections.BlockingPool; import org.apache.druid.collections.NonBlockingPool; @@ -47,6 +46,7 @@ import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryWatcher; import org.apache.druid.query.ResourceLimitExceededException; @@ -557,13 +557,13 @@ private static int numMergeBuffersNeededForSubtotalsSpec(GroupByQuery query) @Override public QueryRunner mergeRunners( - final ListeningExecutorService exec, + final QueryProcessingPool queryProcessingPool, final Iterable> queryRunners ) { return new GroupByMergingQueryRunnerV2( configSupplier.get(), - exec, + queryProcessingPool, queryWatcher, queryRunners, processingConfig.getNumThreads(), diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index 24980fea8dd9..c07ab5d6b7a0 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -21,8 +21,6 @@ import com.google.common.base.Function; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.java.util.common.StringUtils; @@ -30,12 +28,13 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.query.AbstractPrioritizedCallable; +import org.apache.druid.query.AbstractPrioritizedQueryRunnerCallable; import org.apache.druid.query.ConcatQueryRunner; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryTimeoutException; @@ -58,7 +57,6 @@ import java.util.TreeMap; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -186,11 +184,10 @@ public Sequence run(QueryPlus inQ, ResponseCon @Override public QueryRunner mergeRunners( - ExecutorService exec, + QueryProcessingPool queryProcessingPool, Iterable> queryRunners ) { - final ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator(exec); return new ConcatQueryRunner( Sequences.map( Sequences.simple(queryRunners), @@ -210,8 +207,8 @@ public Sequence run( final Query query = queryPlus.getQuery(); final int priority = QueryContexts.getPriority(query); final QueryPlus threadSafeQueryPlus = queryPlus.withoutThreadUnsafeState(); - final ListenableFuture> future = queryExecutor.submit( - new AbstractPrioritizedCallable>(priority) + ListenableFuture> future = queryProcessingPool.submitRunnerTask( + new AbstractPrioritizedQueryRunnerCallable, SegmentAnalysis>(priority, input) { @Override public Sequence call() diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java index a5bab7d53e3f..e58973ef890a 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java @@ -35,6 +35,7 @@ import org.apache.druid.query.Query; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryToolChest; @@ -54,7 +55,6 @@ import java.util.Comparator; import java.util.LinkedHashMap; import java.util.List; -import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; public class ScanQueryRunnerFactory implements QueryRunnerFactory @@ -83,7 +83,7 @@ public QueryRunner createRunner(Segment segment) @Override public QueryRunner mergeRunners( - final ExecutorService queryExecutor, + final QueryProcessingPool queryProcessingPool, final Iterable> queryRunners ) { diff --git a/processing/src/main/java/org/apache/druid/query/search/SearchQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/search/SearchQueryRunnerFactory.java index 060ed785a81e..2f71e55eb8d3 100644 --- a/processing/src/main/java/org/apache/druid/query/search/SearchQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/search/SearchQueryRunnerFactory.java @@ -21,6 +21,7 @@ import com.google.inject.Inject; import org.apache.druid.query.ChainedExecutionQueryRunner; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryToolChest; @@ -28,8 +29,6 @@ import org.apache.druid.query.Result; import org.apache.druid.segment.Segment; -import java.util.concurrent.ExecutorService; - /** */ public class SearchQueryRunnerFactory implements QueryRunnerFactory, SearchQuery> @@ -58,11 +57,11 @@ public QueryRunner> createRunner(final Segment segment @Override public QueryRunner> mergeRunners( - ExecutorService queryExecutor, + QueryProcessingPool queryProcessingPool, Iterable>> queryRunners ) { - return new ChainedExecutionQueryRunner<>(queryExecutor, queryWatcher, queryRunners); + return new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryRunners); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java index eabd80b65f7f..07775d452898 100644 --- a/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerFactory.java @@ -29,6 +29,7 @@ import org.apache.druid.query.ChainedExecutionQueryRunner; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerHelper; @@ -47,7 +48,6 @@ import java.util.Iterator; import java.util.List; -import java.util.concurrent.ExecutorService; /** */ @@ -71,11 +71,11 @@ public QueryRunner> createRunner(final Segment s @Override public QueryRunner> mergeRunners( - ExecutorService queryExecutor, + QueryProcessingPool queryProcessingPool, Iterable>> queryRunners ) { - return new ChainedExecutionQueryRunner<>(queryExecutor, queryWatcher, queryRunners); + return new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryRunners); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerFactory.java index fd6651c45038..2d04905b3b29 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerFactory.java @@ -25,6 +25,7 @@ import org.apache.druid.query.ChainedExecutionQueryRunner; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryToolChest; @@ -34,8 +35,6 @@ import org.apache.druid.segment.Segment; import org.apache.druid.segment.StorageAdapter; -import java.util.concurrent.ExecutorService; - /** */ public class TimeseriesQueryRunnerFactory @@ -65,11 +64,11 @@ public QueryRunner> createRunner(final Segment seg @Override public QueryRunner> mergeRunners( - ExecutorService queryExecutor, + QueryProcessingPool queryProcessingPool, Iterable>> queryRunners ) { - return new ChainedExecutionQueryRunner<>(queryExecutor, queryWatcher, queryRunners); + return new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryRunners); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryRunnerFactory.java index 7044afc72746..821d37e3a199 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryRunnerFactory.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.query.ChainedExecutionQueryRunner; import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryToolChest; @@ -35,7 +36,6 @@ import org.apache.druid.segment.Segment; import java.nio.ByteBuffer; -import java.util.concurrent.ExecutorService; /** */ @@ -82,11 +82,11 @@ public Sequence> run( @Override public QueryRunner> mergeRunners( - ExecutorService queryExecutor, + QueryProcessingPool queryProcessingPool, Iterable>> queryRunners ) { - return new ChainedExecutionQueryRunner<>(queryExecutor, queryWatcher, queryRunners); + return new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryRunners); } @Override diff --git a/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java index cf822927fc67..b1b5084a70c0 100644 --- a/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; @@ -29,14 +30,20 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.query.timeseries.TimeseriesResultValue; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.IAnswer; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; @@ -47,6 +54,7 @@ import java.util.concurrent.Future; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; public class ChainedExecutionQueryRunnerTest { @@ -112,10 +120,10 @@ public Void answer() ); ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>( - exec, + new ForwardingQueryProcessingPool(exec), watcher, Lists.newArrayList( - runners + runners ) ); TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() @@ -236,7 +244,7 @@ public Void answer() ); ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>( - exec, + new ForwardingQueryProcessingPool(exec), watcher, Lists.newArrayList( runners @@ -306,6 +314,35 @@ public void run() EasyMock.verify(watcher); } + @Test + public void testSubmittedTaskType() + { + QueryProcessingPool queryProcessingPool = Mockito.mock(QueryProcessingPool.class); + QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class); + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .intervals("2014/2015") + .aggregators(Collections.singletonList(new CountAggregatorFactory("count"))) + .context(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 100, "queryId", "test")) + .build(); + List>> runners = Arrays.asList( + Mockito.mock(QueryRunner.class), + Mockito.mock(QueryRunner.class) + ); + ChainedExecutionQueryRunner> chainedRunner = new ChainedExecutionQueryRunner<>( + queryProcessingPool, + watcher, + runners + ); + + Mockito.when(queryProcessingPool.submitRunnerTask(ArgumentMatchers.any())).thenReturn(Futures.immediateFuture(Collections.singletonList(123))); + chainedRunner.run(QueryPlus.wrap(query)).toList(); + ArgumentCaptor captor = ArgumentCaptor.forClass(PrioritizedQueryRunnerCallable.class); + Mockito.verify(queryProcessingPool, Mockito.times(2)).submitRunnerTask(captor.capture()); + List actual = captor.getAllValues().stream().map(PrioritizedQueryRunnerCallable::getRunner).collect(Collectors.toList()); + Assert.assertEquals(runners, actual); + } + private class DyingQueryRunner implements QueryRunner { private final CountDownLatch start; diff --git a/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java b/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java new file mode 100644 index 000000000000..71b3a1c9e0f3 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.druid.java.util.emitter.core.Emitter; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.List; + +public class MetricsEmittingQueryProcessingPoolTest +{ + @Test + public void testPrioritizedExecutorDelegate() + { + PrioritizedExecutorService service = Mockito.mock(PrioritizedExecutorService.class); + Mockito.when(service.getQueueSize()).thenReturn(10); + ExecutorServiceMonitor monitor = new ExecutorServiceMonitor(); + List events = new ArrayList<>(); + MetricsEmittingQueryProcessingPool processingPool = new MetricsEmittingQueryProcessingPool(service, monitor); + Assert.assertSame(service, processingPool.delegate()); + + ServiceEmitter serviceEmitter = new ServiceEmitter("service", "host", Mockito.mock(Emitter.class)) + { + @Override + public void emit(Event event) + { + events.add(event); + } + }; + monitor.doMonitor(serviceEmitter); + Assert.assertEquals(1, events.size()); + Assert.assertEquals(((ServiceMetricEvent) (events.get(0))).getMetric(), "segment/scan/pending"); + Assert.assertEquals(((ServiceMetricEvent) (events.get(0))).getValue(), 10); + } + + @Test + public void testNonPrioritizedExecutorDelegate() + { + ListeningExecutorService service = Mockito.mock(ListeningExecutorService.class); + ExecutorServiceMonitor monitor = new ExecutorServiceMonitor(); + MetricsEmittingQueryProcessingPool processingPool = new MetricsEmittingQueryProcessingPool(service, monitor); + Assert.assertSame(service, processingPool.delegate()); + + ServiceEmitter serviceEmitter = Mockito.mock(ServiceEmitter.class); + monitor.doMonitor(serviceEmitter); + Mockito.verifyNoInteractions(serviceEmitter); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index bfd40ce59353..bb947af92937 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -52,6 +52,7 @@ import org.apache.druid.query.BySegmentResultValue; import org.apache.druid.query.BySegmentResultValueClass; import org.apache.druid.query.ChainedExecutionQueryRunner; +import org.apache.druid.query.DirectQueryProcessingPool; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.FinalizeResultsQueryRunner; import org.apache.druid.query.QueryContexts; @@ -10960,7 +10961,7 @@ public void testTypeConversionWithMergingChainedExecutionRunner() ); ChainedExecutionQueryRunner ceqr = new ChainedExecutionQueryRunner( - Execs.directExecutor(), + DirectQueryProcessingPool.INSTANCE, (query1, future) -> { return; }, diff --git a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java index 59700160090a..b02ae366f580 100644 --- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java @@ -36,7 +36,6 @@ import org.apache.druid.collections.StupidPool; import org.apache.druid.guice.annotations.Global; import org.apache.druid.guice.annotations.Merging; -import org.apache.druid.guice.annotations.Processing; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig; @@ -45,8 +44,9 @@ import org.apache.druid.offheap.OffheapBufferGenerator; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ExecutorServiceMonitor; -import org.apache.druid.query.MetricsEmittingExecutorService; +import org.apache.druid.query.MetricsEmittingQueryProcessingPool; import org.apache.druid.query.PrioritizedExecutorService; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.server.metrics.MetricsModule; import org.apache.druid.utils.JvmUtils; @@ -93,15 +93,14 @@ public CachePopulator getCachePopulator( } @Provides - @Processing @ManageLifecycle - public ExecutorService getProcessingExecutorService( + public QueryProcessingPool getProcessingExecutorPool( DruidProcessingConfig config, ExecutorServiceMonitor executorServiceMonitor, Lifecycle lifecycle ) { - return new MetricsEmittingExecutorService( + return new MetricsEmittingQueryProcessingPool( PrioritizedExecutorService.create( lifecycle, config diff --git a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java index fdce8e8e9641..59af28a4f541 100644 --- a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java @@ -28,16 +28,16 @@ import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.guice.annotations.Global; import org.apache.druid.guice.annotations.Merging; -import org.apache.druid.guice.annotations.Processing; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ExecutorServiceMonitor; +import org.apache.druid.query.ForwardingQueryProcessingPool; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.server.metrics.MetricsModule; import java.nio.ByteBuffer; -import java.util.concurrent.ExecutorService; /** * This module is used to fulfill dependency injection of query processing and caching resources: buffer pools and @@ -58,14 +58,13 @@ public void configure(Binder binder) } @Provides - @Processing @ManageLifecycle - public ExecutorService getProcessingExecutorService(DruidProcessingConfig config) + public QueryProcessingPool getProcessingExecutorPool(DruidProcessingConfig config) { if (config.getNumThreadsConfigured() != ExecutorServiceConfig.DEFAULT_NUM_THREADS) { log.error("numThreads[%d] configured, that is ignored on Router", config.getNumThreadsConfigured()); } - return Execs.dummy(); + return new ForwardingQueryProcessingPool(Execs.dummy()); } @Provides diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java index ff8799daeba4..6a86f3eaa0d6 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java @@ -25,6 +25,7 @@ import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.client.cache.CachePopulatorStats; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; @@ -38,8 +39,6 @@ import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer; import org.apache.druid.timeline.VersionedIntervalTimeline; -import java.util.concurrent.ExecutorService; - public class Appenderators { public static Appenderator createRealtime( @@ -54,7 +53,7 @@ public static Appenderator createRealtime( QueryRunnerFactoryConglomerate conglomerate, DataSegmentAnnouncer segmentAnnouncer, ServiceEmitter emitter, - ExecutorService queryExecutorService, + QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, Cache cache, CacheConfig cacheConfig, @@ -79,7 +78,7 @@ public static Appenderator createRealtime( objectMapper, emitter, conglomerate, - queryExecutorService, + queryProcessingPool, joinableFactory, Preconditions.checkNotNull(cache, "cache"), cacheConfig, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java index c5b22cfb1b57..0bbdd402e988 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java @@ -25,6 +25,7 @@ import org.apache.druid.client.cache.CachePopulatorStats; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.SegmentDescriptor; @@ -39,8 +40,6 @@ import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.joda.time.Interval; -import java.util.concurrent.ExecutorService; - /** * This interface defines entities that create and manage potentially multiple {@link Appenderator} instances. * @@ -76,7 +75,7 @@ Appenderator createRealtimeAppenderatorForTask( QueryRunnerFactoryConglomerate conglomerate, DataSegmentAnnouncer segmentAnnouncer, ServiceEmitter emitter, - ExecutorService queryExecutorService, + QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, Cache cache, CacheConfig cacheConfig, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java index 6ee6b4bcc985..d99aa81672d8 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java @@ -25,8 +25,8 @@ import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.client.cache.CachePopulatorStats; import org.apache.druid.guice.annotations.Json; -import org.apache.druid.guice.annotations.Processing; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; @@ -42,14 +42,13 @@ import org.apache.druid.timeline.partition.ShardSpec; import java.io.File; -import java.util.concurrent.ExecutorService; public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory { private final ServiceEmitter emitter; private final QueryRunnerFactoryConglomerate conglomerate; private final DataSegmentAnnouncer segmentAnnouncer; - private final ExecutorService queryExecutorService; + private final QueryProcessingPool queryProcessingPool; private final JoinableFactory joinableFactory; private final DataSegmentPusher dataSegmentPusher; private final ObjectMapper jsonMapper; @@ -63,7 +62,7 @@ public DefaultRealtimeAppenderatorFactory( @JacksonInject ServiceEmitter emitter, @JacksonInject QueryRunnerFactoryConglomerate conglomerate, @JacksonInject DataSegmentAnnouncer segmentAnnouncer, - @JacksonInject @Processing ExecutorService queryExecutorService, + @JacksonInject QueryProcessingPool queryProcessingPool, @JacksonInject JoinableFactory joinableFactory, @JacksonInject DataSegmentPusher dataSegmentPusher, @JacksonInject @Json ObjectMapper jsonMapper, @@ -77,7 +76,7 @@ public DefaultRealtimeAppenderatorFactory( this.emitter = emitter; this.conglomerate = conglomerate; this.segmentAnnouncer = segmentAnnouncer; - this.queryExecutorService = queryExecutorService; + this.queryProcessingPool = queryProcessingPool; this.joinableFactory = joinableFactory; this.dataSegmentPusher = dataSegmentPusher; this.jsonMapper = jsonMapper; @@ -114,7 +113,7 @@ public Appenderator build( conglomerate, segmentAnnouncer, emitter, - queryExecutorService, + queryProcessingPool, joinableFactory, cache, cacheConfig, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java index f1e2f3c91347..35b088470321 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.SegmentDescriptor; @@ -40,8 +41,6 @@ import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.joda.time.Interval; -import java.util.concurrent.ExecutorService; - /** * This implementation is needed because Overlords and MiddleManagers operate on Task objects which * can require an AppenderatorsManager to be injected. @@ -67,7 +66,7 @@ public Appenderator createRealtimeAppenderatorForTask( QueryRunnerFactoryConglomerate conglomerate, DataSegmentAnnouncer segmentAnnouncer, ServiceEmitter emitter, - ExecutorService queryExecutorService, + QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, Cache cache, CacheConfig cacheConfig, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java index 88a4f5720c18..323122a8810c 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.SegmentDescriptor; @@ -40,8 +41,6 @@ import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.joda.time.Interval; -import java.util.concurrent.ExecutorService; - /** * Manages Appenderators for tasks running within a CliPeon process. * @@ -73,7 +72,7 @@ public Appenderator createRealtimeAppenderatorForTask( QueryRunnerFactoryConglomerate conglomerate, DataSegmentAnnouncer segmentAnnouncer, ServiceEmitter emitter, - ExecutorService queryExecutorService, + QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, Cache cache, CacheConfig cacheConfig, @@ -99,7 +98,7 @@ public Appenderator createRealtimeAppenderatorForTask( conglomerate, segmentAnnouncer, emitter, - queryExecutorService, + queryProcessingPool, joinableFactory, cache, cacheConfig, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 6097857cff8c..da626c1f7683 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -32,19 +32,20 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.CloseQuietly; import org.apache.druid.java.util.common.guava.FunctionalIterable; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.BySegmentQueryRunner; import org.apache.druid.query.CPUTimeMetricQueryRunner; +import org.apache.druid.query.DirectQueryProcessingPool; import org.apache.druid.query.FinalizeResultsQueryRunner; import org.apache.druid.query.MetricsEmittingQueryRunner; import org.apache.druid.query.NoopQueryRunner; import org.apache.druid.query.Query; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryMetrics; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerFactoryConglomerate; @@ -71,7 +72,6 @@ import java.io.Closeable; import java.util.Optional; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; @@ -89,7 +89,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker private final ObjectMapper objectMapper; private final ServiceEmitter emitter; private final QueryRunnerFactoryConglomerate conglomerate; - private final ExecutorService queryExecutorService; + private final QueryProcessingPool queryProcessingPool; private final JoinableFactoryWrapper joinableFactoryWrapper; private final Cache cache; private final CacheConfig cacheConfig; @@ -101,7 +101,7 @@ public SinkQuerySegmentWalker( ObjectMapper objectMapper, ServiceEmitter emitter, QueryRunnerFactoryConglomerate conglomerate, - ExecutorService queryExecutorService, + QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, Cache cache, CacheConfig cacheConfig, @@ -113,7 +113,7 @@ public SinkQuerySegmentWalker( this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper"); this.emitter = Preconditions.checkNotNull(emitter, "emitter"); this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate"); - this.queryExecutorService = Preconditions.checkNotNull(queryExecutorService, "queryExecutorService"); + this.queryProcessingPool = Preconditions.checkNotNull(queryProcessingPool, "queryProcessingPool"); this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory); this.cache = Preconditions.checkNotNull(cache, "cache"); this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig"); @@ -272,7 +272,7 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final sinkSegmentId, descriptor.getInterval().getStart(), factory.mergeRunners( - Execs.directExecutor(), + DirectQueryProcessingPool.INSTANCE, perHydrantRunners ) ), @@ -287,7 +287,7 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final final QueryRunner mergedRunner = toolChest.mergeResults( factory.mergeRunners( - queryExecutorService, + queryProcessingPool, perSegmentRunners ) ); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java index 3780c3735753..faba506b4e7d 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java @@ -31,7 +31,6 @@ import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.client.cache.CachePopulatorStats; import org.apache.druid.data.input.impl.DimensionsSpec; -import org.apache.druid.guice.annotations.Processing; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.IAE; @@ -41,6 +40,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.SegmentDescriptor; @@ -76,7 +76,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; /** * Manages {@link Appenderator} instances for the CliIndexer task execution service, which runs all tasks in @@ -108,7 +107,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager private final Map datasourceBundles = new HashMap<>(); - private final ExecutorService queryExecutorService; + private final QueryProcessingPool queryProcessingPool; private final JoinableFactory joinableFactory; private final WorkerConfig workerConfig; private final Cache cache; @@ -122,7 +121,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager @Inject public UnifiedIndexerAppenderatorsManager( - @Processing ExecutorService queryExecutorService, + QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, WorkerConfig workerConfig, Cache cache, @@ -133,7 +132,7 @@ public UnifiedIndexerAppenderatorsManager( Provider queryRunnerFactoryConglomerateProvider ) { - this.queryExecutorService = queryExecutorService; + this.queryProcessingPool = queryProcessingPool; this.joinableFactory = joinableFactory; this.workerConfig = workerConfig; this.cache = cache; @@ -161,7 +160,7 @@ public Appenderator createRealtimeAppenderatorForTask( QueryRunnerFactoryConglomerate conglomerate, DataSegmentAnnouncer segmentAnnouncer, ServiceEmitter emitter, - ExecutorService queryExecutorService, + QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, Cache cache, CacheConfig cacheConfig, @@ -347,7 +346,7 @@ public DatasourceBundle( objectMapper, serviceEmitter, queryRunnerFactoryConglomerateProvider.get(), - queryExecutorService, + queryProcessingPool, joinableFactory, Preconditions.checkNotNull(cache, "cache"), cacheConfig, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java index 20d4b75868a9..0b2ce80ed963 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java @@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; @@ -46,7 +47,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; /** @@ -70,7 +70,7 @@ public FlushingPlumber( ServiceEmitter emitter, QueryRunnerFactoryConglomerate conglomerate, DataSegmentAnnouncer segmentAnnouncer, - ExecutorService queryExecutorService, + QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, IndexMerger indexMerger, IndexIO indexIO, @@ -88,7 +88,7 @@ public FlushingPlumber( emitter, conglomerate, segmentAnnouncer, - queryExecutorService, + queryProcessingPool, joinableFactory, null, null, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumberSchool.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumberSchool.java index da827bb1dc58..787ea26cbc63 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumberSchool.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumberSchool.java @@ -27,8 +27,8 @@ import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.client.cache.CachePopulatorStats; -import org.apache.druid.guice.annotations.Processing; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9; @@ -39,8 +39,6 @@ import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.joda.time.Duration; -import java.util.concurrent.ExecutorService; - /** * This plumber just drops segments at the end of a flush duration instead of handing them off. It is only useful if you want to run * a real time node without the rest of the Druid cluster. @@ -54,7 +52,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool private final ServiceEmitter emitter; private final QueryRunnerFactoryConglomerate conglomerate; private final DataSegmentAnnouncer segmentAnnouncer; - private final ExecutorService queryExecutorService; + private final QueryProcessingPool queryProcessingPool; private final JoinableFactory joinableFactory; private final IndexMergerV9 indexMergerV9; private final IndexIO indexIO; @@ -69,7 +67,7 @@ public FlushingPlumberSchool( @JacksonInject ServiceEmitter emitter, @JacksonInject QueryRunnerFactoryConglomerate conglomerate, @JacksonInject DataSegmentAnnouncer segmentAnnouncer, - @JacksonInject @Processing ExecutorService queryExecutorService, + @JacksonInject QueryProcessingPool queryProcessingPool, @JacksonInject JoinableFactory joinableFactory, @JacksonInject IndexMergerV9 indexMergerV9, @JacksonInject IndexIO indexIO, @@ -86,7 +84,7 @@ public FlushingPlumberSchool( segmentAnnouncer, null, null, - queryExecutorService, + queryProcessingPool, joinableFactory, indexMergerV9, indexIO, @@ -100,7 +98,7 @@ public FlushingPlumberSchool( this.emitter = emitter; this.conglomerate = conglomerate; this.segmentAnnouncer = segmentAnnouncer; - this.queryExecutorService = queryExecutorService; + this.queryProcessingPool = queryProcessingPool; this.joinableFactory = joinableFactory; this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); @@ -127,7 +125,7 @@ public Plumber findPlumber( emitter, conglomerate, segmentAnnouncer, - queryExecutorService, + queryProcessingPool, joinableFactory, indexMergerV9, indexIO, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java index 0c97d856a0e8..ca0484c22d57 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java @@ -47,6 +47,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QuerySegmentWalker; @@ -138,7 +139,7 @@ public RealtimePlumber( ServiceEmitter emitter, QueryRunnerFactoryConglomerate conglomerate, DataSegmentAnnouncer segmentAnnouncer, - ExecutorService queryExecutorService, + QueryProcessingPool queryProcessingPool, JoinableFactory joinableFactory, DataSegmentPusher dataSegmentPusher, SegmentPublisher segmentPublisher, @@ -168,7 +169,7 @@ public RealtimePlumber( objectMapper, emitter, conglomerate, - queryExecutorService, + queryProcessingPool, joinableFactory, cache, cacheConfig, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java index 3f808f9817b7..e2ba02cbc0e1 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -26,8 +26,8 @@ import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.client.cache.CachePopulatorStats; -import org.apache.druid.guice.annotations.Processing; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9; @@ -40,8 +40,6 @@ import org.apache.druid.segment.realtime.SegmentPublisher; import org.apache.druid.server.coordination.DataSegmentAnnouncer; -import java.util.concurrent.ExecutorService; - /** * */ @@ -53,7 +51,7 @@ public class RealtimePlumberSchool implements PlumberSchool private final DataSegmentAnnouncer segmentAnnouncer; private final SegmentPublisher segmentPublisher; private final SegmentHandoffNotifierFactory handoffNotifierFactory; - private final ExecutorService queryExecutorService; + private final QueryProcessingPool queryProcessingPool; private final JoinableFactory joinableFactory; private final IndexMergerV9 indexMergerV9; private final IndexIO indexIO; @@ -70,7 +68,7 @@ public RealtimePlumberSchool( @JacksonInject DataSegmentAnnouncer segmentAnnouncer, @JacksonInject SegmentPublisher segmentPublisher, @JacksonInject SegmentHandoffNotifierFactory handoffNotifierFactory, - @JacksonInject @Processing ExecutorService executorService, + @JacksonInject QueryProcessingPool queryProcessingPool, @JacksonInject JoinableFactory joinableFactory, @JacksonInject IndexMergerV9 indexMergerV9, @JacksonInject IndexIO indexIO, @@ -86,7 +84,7 @@ public RealtimePlumberSchool( this.segmentAnnouncer = segmentAnnouncer; this.segmentPublisher = segmentPublisher; this.handoffNotifierFactory = handoffNotifierFactory; - this.queryExecutorService = executorService; + this.queryProcessingPool = queryProcessingPool; this.joinableFactory = joinableFactory; this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); @@ -113,7 +111,7 @@ public Plumber findPlumber( emitter, conglomerate, segmentAnnouncer, - queryExecutorService, + queryProcessingPool, joinableFactory, dataSegmentPusher, segmentPublisher, diff --git a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java index 5ae9700f34d9..5c71d8404372 100644 --- a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java @@ -22,9 +22,9 @@ import com.google.inject.Inject; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.FunctionalIterable; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.DirectQueryProcessingPool; import org.apache.druid.query.FluentQueryRunnerBuilder; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; @@ -104,7 +104,7 @@ public QueryRunner getQueryRunnerForIntervals(final Query query, final final QueryRunnerFactory> queryRunnerFactory = conglomerate.findFactory(query); final QueryRunner baseRunner = queryRunnerFactory.mergeRunners( - Execs.directExecutor(), + DirectQueryProcessingPool.INSTANCE, () -> StreamSupport.stream(segments.spliterator(), false) .map(segmentMapFn) .map(queryRunnerFactory::createRunner).iterator() diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java index ffdb38d8051e..7f2b8115f42e 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java @@ -26,7 +26,6 @@ import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.client.cache.CachePopulator; -import org.apache.druid.guice.annotations.Processing; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -44,6 +43,7 @@ import org.apache.druid.query.Query; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryMetrics; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerFactoryConglomerate; @@ -72,7 +72,6 @@ import java.util.Collections; import java.util.Optional; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; @@ -86,7 +85,7 @@ public class ServerManager implements QuerySegmentWalker private static final EmittingLogger log = new EmittingLogger(ServerManager.class); private final QueryRunnerFactoryConglomerate conglomerate; private final ServiceEmitter emitter; - private final ExecutorService exec; + private final QueryProcessingPool queryProcessingPool; private final CachePopulator cachePopulator; private final Cache cache; private final ObjectMapper objectMapper; @@ -99,7 +98,7 @@ public class ServerManager implements QuerySegmentWalker public ServerManager( QueryRunnerFactoryConglomerate conglomerate, ServiceEmitter emitter, - @Processing ExecutorService exec, + QueryProcessingPool queryProcessingPool, CachePopulator cachePopulator, @Smile ObjectMapper objectMapper, Cache cache, @@ -112,7 +111,7 @@ public ServerManager( this.conglomerate = conglomerate; this.emitter = emitter; - this.exec = exec; + this.queryProcessingPool = queryProcessingPool; this.cachePopulator = cachePopulator; this.cache = cache; this.objectMapper = objectMapper; @@ -229,7 +228,7 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable( - toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)), + toolChest.mergeResults(factory.mergeRunners(queryProcessingPool, queryRunners)), toolChest ), toolChest, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java index 1231b7721693..6a640ed7fdb4 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -37,6 +37,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.DefaultGenericQueryMetricsFactory; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; +import org.apache.druid.query.ForwardingQueryProcessingPool; import org.apache.druid.query.QueryRunnerTestHelper; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -284,7 +285,7 @@ ScanQuery.class, new ScanQueryRunnerFactory( ), new NoopDataSegmentAnnouncer(), emitter, - queryExecutor, + new ForwardingQueryProcessingPool(queryExecutor), NoopJoinableFactory.INSTANCE, MapCache.create(2048), new CacheConfig(), diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java index 728e98026d83..daa14cae3560 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java @@ -26,9 +26,9 @@ import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; +import org.apache.druid.query.DirectQueryProcessingPool; import org.apache.druid.query.Druids; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; @@ -56,7 +56,7 @@ public class UnifiedIndexerAppenderatorsManagerTest public final ExpectedException expectedException = ExpectedException.none(); private final UnifiedIndexerAppenderatorsManager manager = new UnifiedIndexerAppenderatorsManager( - Execs.directExecutor(), + DirectQueryProcessingPool.INSTANCE, NoopJoinableFactory.INSTANCE, new WorkerConfig(), MapCache.create(10), diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index f2944224a1e7..344b34754ba9 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -37,10 +37,10 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; +import org.apache.druid.query.DirectQueryProcessingPool; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.segment.QueryableIndex; @@ -231,7 +231,7 @@ public void setUp() throws Exception announcer, segmentPublisher, handoffNotifierFactory, - Execs.directExecutor(), + DirectQueryProcessingPool.INSTANCE, NoopJoinableFactory.INSTANCE, TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory), TestHelper.getTestIndexIO(), diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java index 6f8d5a63e9db..902acd8d02af 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java @@ -46,10 +46,12 @@ import org.apache.druid.query.DataSource; import org.apache.druid.query.DefaultQueryMetrics; import org.apache.druid.query.Druids; +import org.apache.druid.query.ForwardingQueryProcessingPool; import org.apache.druid.query.NoopQueryRunner; import org.apache.druid.query.Query; import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerFactoryConglomerate; @@ -101,6 +103,7 @@ import org.junit.rules.ExpectedException; import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -187,7 +190,7 @@ public > QueryRunnerFactory findFact } }, new NoopServiceEmitter(), - serverManagerExec, + new ForwardingQueryProcessingPool(serverManagerExec), new ForegroundCachePopulator(new DefaultObjectMapper(), new CachePopulatorStats(), -1), new DefaultObjectMapper(), new LocalCacheProvider().get(), @@ -737,7 +740,7 @@ public QueryRunner> createRunner(Segment adapter) @Override public QueryRunner> mergeRunners( - ExecutorService queryExecutor, + QueryProcessingPool queryProcessingPool, Iterable>> queryRunners ) { diff --git a/services/src/main/java/org/apache/druid/cli/DumpSegment.java b/services/src/main/java/org/apache/druid/cli/DumpSegment.java index eab178a1fff5..a9aa9c440f5a 100644 --- a/services/src/main/java/org/apache/druid/cli/DumpSegment.java +++ b/services/src/main/java/org/apache/druid/cli/DumpSegment.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -45,11 +46,11 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.DirectQueryProcessingPool; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; @@ -477,14 +478,15 @@ public int columnCacheSizeBytes() ); } - private static Sequence executeQuery(final Injector injector, final QueryableIndex index, final Query query) + @VisibleForTesting + static Sequence executeQuery(final Injector injector, final QueryableIndex index, final Query query) { final QueryRunnerFactoryConglomerate conglomerate = injector.getInstance(QueryRunnerFactoryConglomerate.class); final QueryRunnerFactory> factory = conglomerate.findFactory(query); final QueryRunner runner = factory.createRunner(new QueryableIndexSegment(index, SegmentId.dummy("segment"))); return factory .getToolchest() - .mergeResults(factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner))) + .mergeResults(factory.mergeRunners(DirectQueryProcessingPool.INSTANCE, ImmutableList.of(runner))) .run(QueryPlus.wrap(query), ResponseContext.createEmpty()); } diff --git a/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java b/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java new file mode 100644 index 000000000000..e22c7a3286d6 --- /dev/null +++ b/services/src/test/java/org/apache/druid/cli/DumpSegmentTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.cli; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Injector; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.DirectQueryProcessingPool; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.QueryRunnerFactory; +import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +import java.util.Collections; + +public class DumpSegmentTest +{ + @Test + public void testExecuteQuery() + { + Injector injector = Mockito.mock(Injector.class); + QueryRunnerFactoryConglomerate conglomerate = Mockito.mock(QueryRunnerFactoryConglomerate.class); + QueryRunnerFactory factory = Mockito.mock(QueryRunnerFactory.class, Mockito.RETURNS_DEEP_STUBS); + QueryRunner runner = Mockito.mock(QueryRunner.class); + QueryRunner mergeRunner = Mockito.mock(QueryRunner.class); + Query query = Mockito.mock(Query.class); + Sequence expected = Sequences.simple(Collections.singletonList(123)); + Mockito.when(injector.getInstance(QueryRunnerFactoryConglomerate.class)).thenReturn(conglomerate); + Mockito.when(conglomerate.findFactory(ArgumentMatchers.any())).thenReturn(factory); + Mockito.when(factory.createRunner(ArgumentMatchers.any())).thenReturn(runner); + Mockito.when(factory.getToolchest().mergeResults(factory.mergeRunners(DirectQueryProcessingPool.INSTANCE, ImmutableList.of(runner)))).thenReturn(mergeRunner); + Mockito.when(mergeRunner.run(ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(expected); + Sequence actual = DumpSegment.executeQuery(injector, null, query); + Assert.assertSame(expected, actual); + } +}