Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
Expand Down Expand Up @@ -2884,7 +2885,7 @@ public void close()
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
this::makeTimeseriesAndScanConglomerate,
Execs.directExecutor(), // queryExecutorService
DirectQueryProcessingPool.INSTANCE,
NoopJoinableFactory.INSTANCE,
() -> EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.filter.SelectorDimFilter;
Expand Down Expand Up @@ -2971,7 +2972,7 @@ public void close()
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
this::makeTimeseriesOnlyConglomerate,
Execs.directExecutor(), // queryExecutorService
DirectQueryProcessingPool.INSTANCE,
NoopJoinableFactory.INSTANCE,
() -> EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
Expand Down Expand Up @@ -73,7 +74,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;

/**
* Stuff that may be needed by a Task in order to conduct its business.
Expand All @@ -99,7 +99,7 @@ public class TaskToolbox
private final Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider;
@Nullable
private final Provider<MonitorScheduler> monitorSchedulerProvider;
private final ExecutorService queryExecutorService;
private final QueryProcessingPool queryProcessingPool;
private final JoinableFactory joinableFactory;
private final SegmentLoader segmentLoader;
private final ObjectMapper jsonMapper;
Expand Down Expand Up @@ -141,7 +141,7 @@ public TaskToolbox(
DataSegmentServerAnnouncer serverAnnouncer,
SegmentHandoffNotifierFactory handoffNotifierFactory,
Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider,
ExecutorService queryExecutorService,
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
@Nullable Provider<MonitorScheduler> monitorSchedulerProvider,
SegmentLoader segmentLoader,
Expand Down Expand Up @@ -180,7 +180,7 @@ public TaskToolbox(
this.serverAnnouncer = serverAnnouncer;
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider;
this.queryExecutorService = queryExecutorService;
this.queryProcessingPool = queryProcessingPool;
this.joinableFactory = joinableFactory;
this.monitorSchedulerProvider = monitorSchedulerProvider;
this.segmentLoader = segmentLoader;
Expand Down Expand Up @@ -268,9 +268,9 @@ public QueryRunnerFactoryConglomerate getQueryRunnerFactoryConglomerate()
return queryRunnerFactoryConglomerateProvider.get();
}

public ExecutorService getQueryExecutorService()
public QueryProcessingPool getQueryProcessingPool()
{
return queryExecutorService;
return queryProcessingPool;
}

public JoinableFactory getJoinableFactory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Parent;
import org.apache.druid.guice.annotations.Processing;
import org.apache.druid.guice.annotations.RemoteChatHandler;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
Expand All @@ -44,6 +43,7 @@
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
Expand All @@ -62,7 +62,6 @@
import org.apache.druid.server.security.AuthorizerMapper;

import java.io.File;
import java.util.concurrent.ExecutorService;

/**
* Stuff that may be needed by a Task in order to conduct its business.
Expand All @@ -81,7 +80,7 @@ public class TaskToolboxFactory
private final DataSegmentServerAnnouncer serverAnnouncer;
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider;
private final ExecutorService queryExecutorService;
private final QueryProcessingPool queryProcessingPool;
private final JoinableFactory joinableFactory;
private final Provider<MonitorScheduler> monitorSchedulerProvider;
private final SegmentLoaderFactory segmentLoaderFactory;
Expand Down Expand Up @@ -122,7 +121,7 @@ public TaskToolboxFactory(
DataSegmentServerAnnouncer serverAnnouncer,
SegmentHandoffNotifierFactory handoffNotifierFactory,
Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider,
@Processing ExecutorService queryExecutorService,
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
Provider<MonitorScheduler> monitorSchedulerProvider,
SegmentLoaderFactory segmentLoaderFactory,
Expand Down Expand Up @@ -160,7 +159,7 @@ public TaskToolboxFactory(
this.serverAnnouncer = serverAnnouncer;
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider;
this.queryExecutorService = queryExecutorService;
this.queryProcessingPool = queryProcessingPool;
this.joinableFactory = joinableFactory;
this.monitorSchedulerProvider = monitorSchedulerProvider;
this.segmentLoaderFactory = segmentLoaderFactory;
Expand Down Expand Up @@ -202,7 +201,7 @@ public TaskToolbox build(Task task)
serverAnnouncer,
handoffNotifierFactory,
queryRunnerFactoryConglomerateProvider,
queryExecutorService,
queryProcessingPool,
joinableFactory,
monitorSchedulerProvider,
segmentLoaderFactory.manufacturate(taskWorkDir),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ private Appenderator newAppenderator(
toolbox.getQueryRunnerFactoryConglomerate(),
toolbox.getSegmentAnnouncer(),
toolbox.getEmitter(),
toolbox.getQueryExecutorService(),
toolbox.getQueryProcessingPool(),
toolbox.getJoinableFactory(),
toolbox.getCache(),
toolbox.getCacheConfig(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public String getVersion(final Interval interval)
lockingSegmentAnnouncer,
segmentPublisher,
toolbox.getSegmentHandoffNotifierFactory(),
toolbox.getQueryExecutorService(),
toolbox.getQueryProcessingPool(),
toolbox.getJoinableFactory(),
toolbox.getIndexMergerV9(),
toolbox.getIndexIO(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public Appenderator newAppenderator(
toolbox.getQueryRunnerFactoryConglomerate(),
toolbox.getSegmentAnnouncer(),
toolbox.getEmitter(),
toolbox.getQueryExecutorService(),
toolbox.getQueryProcessingPool(),
toolbox.getJoinableFactory(),
toolbox.getCache(),
toolbox.getCacheConfig(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
Expand Down Expand Up @@ -62,7 +63,6 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;

public class TaskToolboxTest
{
Expand All @@ -81,7 +81,7 @@ public class TaskToolboxTest
private QueryRunnerFactoryConglomerate mockQueryRunnerFactoryConglomerate
= EasyMock.createMock(QueryRunnerFactoryConglomerate.class);
private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class);
private ExecutorService mockQueryExecutorService = EasyMock.createMock(ExecutorService.class);
private QueryProcessingPool mockQueryProcessingPool = EasyMock.createMock(QueryProcessingPool.class);
private ObjectMapper ObjectMapper = new ObjectMapper();
private SegmentLoaderFactory mockSegmentLoaderFactory = EasyMock.createMock(SegmentLoaderFactory.class);
private SegmentLoaderLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLoaderLocalCacheManager.class);
Expand Down Expand Up @@ -126,7 +126,7 @@ public void setUp() throws IOException
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
mockHandoffNotifierFactory,
() -> mockQueryRunnerFactoryConglomerate,
mockQueryExecutorService,
mockQueryProcessingPool,
NoopJoinableFactory.INSTANCE,
() -> mockMonitorScheduler,
mockSegmentLoaderFactory,
Expand Down Expand Up @@ -172,9 +172,9 @@ public void testGetQueryRunnerFactoryConglomerate()
}

@Test
public void testGetQueryExecutorService()
public void testGetQueryProcessingPool()
{
Assert.assertEquals(mockQueryExecutorService, taskToolbox.build(task).getQueryExecutorService());
Assert.assertEquals(mockQueryProcessingPool, taskToolbox.build(task).getQueryProcessingPool());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
Expand Down Expand Up @@ -1586,7 +1587,7 @@ public void close()
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
() -> conglomerate,
Execs.directExecutor(), // queryExecutorService
DirectQueryProcessingPool.INSTANCE, // queryExecutorService
NoopJoinableFactory.INSTANCE,
() -> EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
Expand Down Expand Up @@ -986,7 +987,7 @@ public void close()
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
() -> conglomerate,
Execs.directExecutor(), // queryExecutorService
DirectQueryProcessingPool.INSTANCE,
NoopJoinableFactory.INSTANCE,
() -> EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
Expand All @@ -43,8 +44,6 @@
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Interval;

import java.util.concurrent.ExecutorService;

public class TestAppenderatorsManager implements AppenderatorsManager
{
private Appenderator realtimeAppenderator;
Expand All @@ -62,7 +61,7 @@ public Appenderator createRealtimeAppenderatorForTask(
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
ServiceEmitter emitter,
ExecutorService queryExecutorService,
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
Expand All @@ -83,7 +82,7 @@ public Appenderator createRealtimeAppenderatorForTask(
conglomerate,
segmentAnnouncer,
emitter,
queryExecutorService,
queryProcessingPool,
joinableFactory,
cache,
cacheConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
Expand All @@ -105,6 +104,8 @@
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
Expand Down Expand Up @@ -156,6 +157,7 @@
import org.junit.runners.Parameterized;

import javax.annotation.Nullable;

import java.io.File;
import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -660,7 +662,7 @@ public void unannounceSegments(Iterable<DataSegment> segments)
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
() -> queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
Execs.directExecutor(), // query executor service
DirectQueryProcessingPool.INSTANCE, // query executor service
NoopJoinableFactory.INSTANCE,
() -> monitorScheduler, // monitor scheduler
new SegmentLoaderFactory(null, new DefaultObjectMapper()),
Expand Down Expand Up @@ -1336,7 +1338,7 @@ public void testUnifiedAppenderatorsManagerCleanup() throws Exception
final ExecutorService exec = Executors.newFixedThreadPool(8);

UnifiedIndexerAppenderatorsManager unifiedIndexerAppenderatorsManager = new UnifiedIndexerAppenderatorsManager(
exec,
new ForwardingQueryProcessingPool(exec),
NoopJoinableFactory.INSTANCE,
new WorkerConfig(),
MapCache.create(2048),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulator;
import org.apache.druid.guice.annotations.Processing;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Sequence;
Expand All @@ -35,6 +34,7 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryCapacityExceededException;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
Expand All @@ -55,7 +55,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

Expand Down Expand Up @@ -91,7 +90,7 @@ public class ServerManagerForQueryErrorTest extends ServerManager
public ServerManagerForQueryErrorTest(
QueryRunnerFactoryConglomerate conglomerate,
ServiceEmitter emitter,
@Processing ExecutorService exec,
QueryProcessingPool queryProcessingPool,
CachePopulator cachePopulator,
@Smile ObjectMapper objectMapper,
Cache cache,
Expand All @@ -104,7 +103,7 @@ public ServerManagerForQueryErrorTest(
super(
conglomerate,
emitter,
exec,
queryProcessingPool,
cachePopulator,
objectMapper,
cache,
Expand All @@ -116,7 +115,7 @@ public ServerManagerForQueryErrorTest(
}

@Override
<T> QueryRunner<T> buildQueryRunnerForSegment(
protected <T> QueryRunner<T> buildQueryRunnerForSegment(
Query<T> query,
SegmentDescriptor descriptor,
QueryRunnerFactory<T, Query<T>> factory,
Expand Down
Loading