From 080188cff79d10fc44b1fd7a3b7b62b1493c6c13 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 15 Jul 2025 22:08:23 -0700 Subject: [PATCH] Dart: Async queries to realtime servers. (#18241) * Dart: Async queries to realtime servers. Prior to this patch, queries from MSQ workers to data servers would be initiated in the processing pool, and would block the processing pool until results started coming in. This patch addresses it with the strategy: 1) Update DataServerClient to return a future that resolves when the response starts being written. 2) Split DataServerQueryHandler into DartDataServerQueryHandler and IndexerDataServerQueryHandler. The Dart version doesn't do retries and doesn't follow segments to other data servers. It just returns the async future from DataServerClient. The Indexer (tasks) version retains the prior logic and isn't really async. I didn't attempt to asyncify its retry logic in this patch. 3) Add ReturnOrAwait.awaitAllFutures, which allows processors to wait for a future to resolve. 4) Update ScanQueryFrameProcessor and GroupByPreShuffleFrameProcessor to give up the processing thread when waiting for a data server query to come back. Additionally, to simplify DataServerClient, cancellations are now issued without using a scheduled executor. There should be no need for this, because the service client is async. * Fix tests and checkstyle. * Fix exception checking. 
--- .../msq/dart/guice/DartWorkerModule.java | 20 + .../worker/DartDataServerQueryHandler.java | 153 +++++++ .../DartDataServerQueryHandlerFactory.java | 65 +++ .../worker/DartWorkerContextFactoryImpl.java | 19 +- .../msq/exec/DataServerQueryHandler.java | 354 +--------------- .../exec/DataServerQueryHandlerFactory.java | 68 +--- .../msq/exec/DataServerQueryHandlerUtils.java | 94 +++++ .../org/apache/druid/msq/exec/WorkerImpl.java | 1 - .../IndexerDataServerQueryHandler.java | 385 ++++++++++++++++++ .../IndexerDataServerQueryHandlerFactory.java | 70 ++++ .../msq/indexing/IndexerFrameContext.java | 4 +- .../msq/indexing/IndexerWorkerContext.java | 6 +- .../input/table/RichSegmentDescriptor.java | 8 + .../GroupByPreShuffleFrameProcessor.java | 25 +- .../scan/ScanQueryFrameProcessor.java | 26 +- .../druid/msq/exec/MSQLoadedSegmentTests.java | 93 +++-- .../IndexerDataServerQueryHandlerTest.java} | 87 ++-- .../processor/FrameProcessorExecutor.java | 30 +- .../druid/frame/processor/ReturnOrAwait.java | 113 +++-- .../processor/FrameProcessorExecutorTest.java | 65 +++ .../frame/processor/ReturnOrAwaitTest.java | 19 +- .../test/FutureWaitingProcessor.java | 104 +++++ .../druid/discovery/DataServerClient.java | 132 +++--- .../org/apache/druid/rpc/ServiceLocation.java | 12 + .../druid/discovery/DataServerClientTest.java | 11 +- .../embedded/EmbeddedDruidCluster.java | 1 + 26 files changed, 1338 insertions(+), 627 deletions(-) create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandlerFactory.java create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerUtils.java create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java create mode 100644 
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerFactory.java rename extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/{exec/DataServerQueryHandlerTest.java => indexing/IndexerDataServerQueryHandlerTest.java} (78%) create mode 100644 processing/src/test/java/org/apache/druid/frame/processor/test/FutureWaitingProcessor.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java index a1f4e06e948e..5e0a3898e7f5 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java @@ -19,6 +19,7 @@ package org.apache.druid.msq.dart.guice; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.util.concurrent.MoreExecutors; @@ -35,8 +36,10 @@ import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.ManageLifecycleAnnouncements; +import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.LoadScope; import org.apache.druid.guice.annotations.Self; +import org.apache.druid.guice.annotations.Smile; import org.apache.druid.initialization.DruidModule; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; @@ -50,6 +53,7 @@ import org.apache.druid.msq.dart.controller.messages.ControllerMessage; import org.apache.druid.msq.dart.controller.sql.DartSqlEngine; import org.apache.druid.msq.dart.worker.DartDataSegmentProvider; +import 
org.apache.druid.msq.dart.worker.DartDataServerQueryHandlerFactory; import org.apache.druid.msq.dart.worker.DartWorkerContextFactory; import org.apache.druid.msq.dart.worker.DartWorkerContextFactoryImpl; import org.apache.druid.msq.dart.worker.DartWorkerRunner; @@ -58,6 +62,8 @@ import org.apache.druid.msq.querykit.DataSegmentProvider; import org.apache.druid.msq.rpc.ResourcePermissionMapper; import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.query.QueryToolChestWarehouse; +import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.server.DruidNode; import org.apache.druid.server.security.AuthorizerMapper; @@ -156,6 +162,20 @@ public Outbox createOutbox() { return new OutboxImpl<>(); } + + @Provides + public DartDataServerQueryHandlerFactory createDataServerQueryHandlerFactory( + @EscalatedGlobal ServiceClientFactory serviceClientFactory, + @Smile ObjectMapper smileMapper, + QueryToolChestWarehouse queryToolChestWarehouse + ) + { + return new DartDataServerQueryHandlerFactory( + serviceClientFactory, + smileMapper, + queryToolChestWarehouse + ); + } } @Override diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java new file mode 100644 index 000000000000..4a6b67da6698 --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.dart.worker; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.discovery.DataServerClient; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.exec.DataServerQueryHandler; +import org.apache.druid.msq.exec.DataServerQueryHandlerUtils; +import org.apache.druid.msq.exec.DataServerQueryResult; +import org.apache.druid.msq.input.table.DataServerRequestDescriptor; +import org.apache.druid.msq.input.table.RichSegmentDescriptor; +import org.apache.druid.query.Queries; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryToolChest; +import org.apache.druid.query.QueryToolChestWarehouse; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.aggregation.MetricManipulatorFns; +import org.apache.druid.query.context.DefaultResponseContext; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocation; + +import java.util.Collections; +import java.util.List; +import java.util.function.Function; +import 
java.util.stream.Collectors; + +/** + * Dart implementation of {@link DataServerQueryHandler}. Issues queries asynchronously, with no retries. + */ +public class DartDataServerQueryHandler implements DataServerQueryHandler +{ + private final String dataSource; + private final ChannelCounters channelCounters; + private final ServiceClientFactory serviceClientFactory; + private final ObjectMapper objectMapper; + private final QueryToolChestWarehouse warehouse; + private final DataServerRequestDescriptor requestDescriptor; + + public DartDataServerQueryHandler( + String dataSource, + ChannelCounters channelCounters, + ServiceClientFactory serviceClientFactory, + ObjectMapper objectMapper, + QueryToolChestWarehouse warehouse, + DataServerRequestDescriptor requestDescriptor + ) + { + this.dataSource = dataSource; + this.channelCounters = channelCounters; + this.serviceClientFactory = serviceClientFactory; + this.objectMapper = objectMapper; + this.warehouse = warehouse; + this.requestDescriptor = requestDescriptor; + } + + /** + * {@inheritDoc} + * + * This method returns immediately. The returned future resolves when the server has started sending back + * its response. + * + * Queries are issued once, without retries. 
+ */ + @Override + public ListenableFuture> fetchRowsFromDataServer( + Query query, + Function, Sequence> mappingFunction, + Closer closer + ) + { + final Query preparedQuery = + Queries.withSpecificSegments( + DataServerQueryHandlerUtils.prepareQuery(query, dataSource), + requestDescriptor.getSegments() + .stream() + .map(RichSegmentDescriptor::toPlainDescriptor) + .collect(Collectors.toList()) + ); + + final ServiceLocation serviceLocation = + ServiceLocation.fromDruidServerMetadata(requestDescriptor.getServerMetadata()); + final DataServerClient dataServerClient = makeDataServerClient(serviceLocation); + final QueryToolChest> toolChest = warehouse.getToolChest(query); + final Function preComputeManipulatorFn = + toolChest.makePreComputeManipulatorFn(query, MetricManipulatorFns.deserializing()); + final JavaType queryResultType = toolChest.getBaseResultType(); + final ResponseContext responseContext = new DefaultResponseContext(); + + return FutureUtils.transform( + dataServerClient.run(preparedQuery, responseContext, queryResultType, closer), + resultSequence -> { + final Yielder yielder = DataServerQueryHandlerUtils.createYielder( + resultSequence.map(preComputeManipulatorFn), + mappingFunction, + channelCounters + ); + + final List missingSegments = + DataServerQueryHandlerUtils.getMissingSegments(responseContext); + + if (!missingSegments.isEmpty()) { + throw DruidException + .forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build( + "Segment[%s]%s not found on server[%s]. Please retry your query.", + missingSegments.get(0), + missingSegments.size() > 1 ? 
StringUtils.format(" and[%d] others", missingSegments.size() - 1) : "", + serviceLocation.getHostAndPort() + ); + } + + return new DataServerQueryResult<>( + Collections.singletonList(yielder), + Collections.emptyList(), + dataSource + ); + } + ); + } + + private DataServerClient makeDataServerClient(ServiceLocation serviceLocation) + { + return new DataServerClient(serviceClientFactory, serviceLocation, objectMapper); + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandlerFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandlerFactory.java new file mode 100644 index 000000000000..a7fc2004f7bd --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandlerFactory.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.dart.worker; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.exec.DataServerQueryHandlerFactory; +import org.apache.druid.msq.input.table.DataServerRequestDescriptor; +import org.apache.druid.query.QueryToolChestWarehouse; +import org.apache.druid.rpc.ServiceClientFactory; + +/** + * Factory for {@link DartDataServerQueryHandler}. + */ +public class DartDataServerQueryHandlerFactory implements DataServerQueryHandlerFactory +{ + private final ServiceClientFactory serviceClientFactory; + private final ObjectMapper objectMapper; + private final QueryToolChestWarehouse warehouse; + + public DartDataServerQueryHandlerFactory( + ServiceClientFactory serviceClientFactory, + ObjectMapper objectMapper, + QueryToolChestWarehouse warehouse + ) + { + this.serviceClientFactory = serviceClientFactory; + this.objectMapper = objectMapper; + this.warehouse = warehouse; + } + + @Override + public DartDataServerQueryHandler createDataServerQueryHandler( + String dataSource, + ChannelCounters channelCounters, + DataServerRequestDescriptor requestDescriptor + ) + { + return new DartDataServerQueryHandler( + dataSource, + channelCounters, + serviceClientFactory, + objectMapper, + warehouse, + requestDescriptor + ); + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java index b1e2fb925941..03d17b1bbd31 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import 
com.google.inject.Injector; -import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Self; @@ -31,14 +30,12 @@ import org.apache.druid.messages.server.Outbox; import org.apache.druid.msq.dart.Dart; import org.apache.druid.msq.dart.controller.messages.ControllerMessage; -import org.apache.druid.msq.exec.DataServerQueryHandlerFactory; import org.apache.druid.msq.exec.MemoryIntrospector; import org.apache.druid.msq.exec.ProcessingBuffersProvider; import org.apache.druid.msq.exec.WorkerContext; import org.apache.druid.msq.querykit.DataSegmentProvider; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.QueryContext; -import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.policy.PolicyEnforcer; import org.apache.druid.rpc.ServiceClientFactory; @@ -65,8 +62,7 @@ public class DartWorkerContextFactoryImpl implements DartWorkerContextFactory private final MemoryIntrospector memoryIntrospector; private final ProcessingBuffersProvider processingBuffersProvider; private final Outbox outbox; - private final CoordinatorClient coordinatorClient; - private final QueryToolChestWarehouse warehouse; + private final DartDataServerQueryHandlerFactory dataServerQueryHandlerFactory; private final ServiceEmitter emitter; @Inject @@ -84,8 +80,7 @@ public DartWorkerContextFactoryImpl( MemoryIntrospector memoryIntrospector, @Dart ProcessingBuffersProvider processingBuffersProvider, Outbox outbox, - CoordinatorClient coordinatorClient, - QueryToolChestWarehouse warehouse, + DartDataServerQueryHandlerFactory dataServerQueryHandlerFactory, ServiceEmitter emitter ) { @@ -102,8 +97,7 @@ public DartWorkerContextFactoryImpl( this.memoryIntrospector = memoryIntrospector; this.processingBuffersProvider = processingBuffersProvider; 
this.outbox = outbox; - this.coordinatorClient = coordinatorClient; - this.warehouse = warehouse; + this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory; this.emitter = emitter; } @@ -132,12 +126,7 @@ public WorkerContext build( outbox, tempDir, queryContext, - new DataServerQueryHandlerFactory( - coordinatorClient, - serviceClientFactory, - jsonMapper, - warehouse - ), + dataServerQueryHandlerFactory, emitter ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java index 04458d701705..f9c8c14ad141 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java @@ -19,359 +19,43 @@ package org.apache.druid.msq.exec; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; -import org.apache.druid.client.ImmutableSegmentLoadInfo; -import org.apache.druid.client.coordinator.CoordinatorClient; -import org.apache.druid.common.guava.FutureUtils; -import org.apache.druid.discovery.DataServerClient; -import org.apache.druid.error.DruidException; -import org.apache.druid.java.util.common.RetryUtils; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.java.util.common.guava.Sequence; -import org.apache.druid.java.util.common.guava.Sequences; -import org.apache.druid.java.util.common.guava.Yielder; -import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.java.util.common.io.Closer; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.msq.counters.ChannelCounters; import 
org.apache.druid.msq.input.table.DataServerRequestDescriptor; -import org.apache.druid.msq.input.table.DataServerSelector; -import org.apache.druid.msq.input.table.RichSegmentDescriptor; -import org.apache.druid.query.Queries; import org.apache.druid.query.Query; -import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryToolChest; -import org.apache.druid.query.QueryToolChestWarehouse; -import org.apache.druid.query.SegmentDescriptor; -import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.MetricManipulationFn; -import org.apache.druid.query.aggregation.MetricManipulatorFns; -import org.apache.druid.query.context.DefaultResponseContext; -import org.apache.druid.query.context.ResponseContext; -import org.apache.druid.rpc.RpcException; -import org.apache.druid.rpc.ServiceClientFactory; -import org.apache.druid.rpc.ServiceLocation; -import org.apache.druid.server.coordination.DruidServerMetadata; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; -import java.util.stream.Collectors; /** - * Class responsible for querying dataservers and retriving results for a given query. Also queries the coordinator - * to check if a segment has been handed off. + * Object for issuing native queries to data servers. This is created by a {@link DataServerQueryHandlerFactory}, + * and is used when MSQ is querying realtime data. It is required because realtime tasks are not currently able to + * execute MSQ logic themselves. 
*/ -public class DataServerQueryHandler +public interface DataServerQueryHandler { - private static final Logger log = new Logger(DataServerQueryHandler.class); - private static final int DEFAULT_NUM_TRIES = 3; - private static final int PER_SERVER_QUERY_NUM_TRIES = 5; - private final String dataSource; - private final ChannelCounters channelCounters; - private final ServiceClientFactory serviceClientFactory; - private final CoordinatorClient coordinatorClient; - private final ObjectMapper objectMapper; - private final QueryToolChestWarehouse warehouse; - private final ScheduledExecutorService queryCancellationExecutor; - private final DataServerRequestDescriptor dataServerRequestDescriptor; - - public DataServerQueryHandler( - String dataSource, - ChannelCounters channelCounters, - ServiceClientFactory serviceClientFactory, - CoordinatorClient coordinatorClient, - ObjectMapper objectMapper, - QueryToolChestWarehouse warehouse, - ScheduledExecutorService queryCancellationExecutor, - DataServerRequestDescriptor dataServerRequestDescriptor - ) - { - this.dataSource = dataSource; - this.channelCounters = channelCounters; - this.serviceClientFactory = serviceClientFactory; - this.coordinatorClient = coordinatorClient; - this.objectMapper = objectMapper; - this.warehouse = warehouse; - this.queryCancellationExecutor = queryCancellationExecutor; - this.dataServerRequestDescriptor = dataServerRequestDescriptor; - } - - @VisibleForTesting - DataServerClient makeDataServerClient(ServiceLocation serviceLocation) - { - return new DataServerClient(serviceClientFactory, serviceLocation, objectMapper, queryCancellationExecutor); - } - /** - * Performs some necessary transforms to the query, so that the dataserver is able to understand it first. - * - Changing the datasource to a {@link TableDataSource} - * - Limiting the query to the required segments with {@link Queries#withSpecificSegments(Query, List)} - *
- * Then queries a data server and returns a {@link Yielder} for the results, retrying if needed. If a dataserver - * indicates that some segments were not found, checks with the coordinator to see if the segment was handed off. - * - If all the segments were handed off, returns a {@link DataServerQueryResult} with the yielder and list of handed - * off segments. - * - If some segments were not handed off, checks with the coordinator fetch an updated list of servers. This step is - * repeated up to {@link #DEFAULT_NUM_TRIES} times. - * - If the servers could not be found, checks if the segment was handed-off. If it was, returns a - * {@link DataServerQueryResult} with the yielder and list of handed off segments. Otherwise, throws an exception. - *
+ * Issues a query to the server and segments that were specified by the {@link DataServerRequestDescriptor} + * originally passed to {@link DataServerQueryHandlerFactory#createDataServerQueryHandler} when this instance + * was created. + * + * The query datasource is updated to refer to the specific segments from + * {@link DataServerRequestDescriptor#getSegments()}. + * * Also applies {@link QueryToolChest#makePreComputeManipulatorFn(Query, MetricManipulationFn)} and reports channel * metrics on the returned results. * - * @param result return type for the query from the data server - * @param type of the result rows after parsing from QueryType object + * @param query query to run + * @param mappingFunction function to apply to results + * @param closer will register query canceler with this closer + * @param result return type for the query from the data server + * @param type of the result rows after parsing from QueryType object */ - public DataServerQueryResult fetchRowsFromDataServer( + ListenableFuture> fetchRowsFromDataServer( Query query, Function, Sequence> mappingFunction, Closer closer - ) - { - // MSQ changes the datasource to a number datasource. This needs to be changed back for data servers to understand. 
- final Query preparedQuery = query.withDataSource(new TableDataSource(dataSource)); - final List> yielders = new ArrayList<>(); - final List handedOffSegments = new ArrayList<>(); - - List pendingRequests = ImmutableList.of(dataServerRequestDescriptor); - - final int maxRetries = preparedQuery.context().getNumRetriesOnMissingSegments(DEFAULT_NUM_TRIES); - int retryCount = 0; - - while (!pendingRequests.isEmpty()) { - final ResponseContext responseContext = new DefaultResponseContext(); - final Set processedSegments = new HashSet<>(); - for (DataServerRequestDescriptor descriptor : pendingRequests) { - log.info("Querying server [%s] for segments[%s]", descriptor.getServerMetadata(), descriptor.getSegments()); - processedSegments.addAll(descriptor.getSegments()); - Yielder yielder = fetchRowsFromDataServerInternal(descriptor, responseContext, closer, preparedQuery, mappingFunction); - - // Add results - if (yielder != null && !yielder.isDone()) { - yielders.add(yielder); - } - } - - // Check for missing segments - List missingSegments = getMissingSegments(responseContext); - if (missingSegments.isEmpty()) { - // No segments remaining. 
- break; - } - - final List handedOffSegmentDescriptors = checkSegmentHandoff(missingSegments); - - Set missingRichSegmentDescriptors = new HashSet<>(); - for (RichSegmentDescriptor richSegmentDescriptor : processedSegments) { - SegmentDescriptor segmentDescriptor = toSegmentDescriptorWithFullInterval(richSegmentDescriptor); - if (missingSegments.contains(segmentDescriptor)) { - if (handedOffSegmentDescriptors.contains(segmentDescriptor)) { - handedOffSegments.add(richSegmentDescriptor); - } else { - missingRichSegmentDescriptors.add(richSegmentDescriptor); - } - } - } - - pendingRequests = createNextPendingRequests( - missingRichSegmentDescriptors - ); - - if (!pendingRequests.isEmpty()) { - retryCount++; - if (retryCount > maxRetries) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build("Unable to fetch results from dataservers in [%d] retries.", retryCount); - } - } - } - - return new DataServerQueryResult<>(yielders, handedOffSegments, dataSource); - } - - private Yielder fetchRowsFromDataServerInternal( - final DataServerRequestDescriptor requestDescriptor, - final ResponseContext responseContext, - final Closer closer, - final Query query, - final Function, Sequence> mappingFunction - ) - { - final ServiceLocation serviceLocation = ServiceLocation.fromDruidServerMetadata(requestDescriptor.getServerMetadata()); - final DataServerClient dataServerClient = makeDataServerClient(serviceLocation); - final QueryToolChest> toolChest = warehouse.getToolChest(query); - final Function preComputeManipulatorFn = - toolChest.makePreComputeManipulatorFn(query, MetricManipulatorFns.deserializing()); - final JavaType queryResultType = toolChest.getBaseResultType(); - final List segmentDescriptors = requestDescriptor.getSegments() - .stream() - .map(DataServerQueryHandler::toSegmentDescriptorWithFullInterval) - .collect(Collectors.toList()); - - try { - return RetryUtils.retry( - () -> 
closer.register(createYielder( - dataServerClient.run( - Queries.withSpecificSegments( - query, - requestDescriptor.getSegments() - .stream() - .map(DataServerQueryHandler::toSegmentDescriptorWithFullInterval) - .collect(Collectors.toList()) - ), responseContext, queryResultType, closer).map(preComputeManipulatorFn), mappingFunction)), - throwable -> !(throwable instanceof QueryInterruptedException - && throwable.getCause() instanceof InterruptedException), - PER_SERVER_QUERY_NUM_TRIES - ); - } - catch (QueryInterruptedException e) { - if (e.getCause() instanceof RpcException) { - // In the case that all the realtime servers for a segment are gone (for example, if they were scaled down), - // we would also be unable to fetch the segment. - responseContext.addMissingSegments(segmentDescriptors); - return Yielders.each(Sequences.empty()); - } else { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build(e, "Exception while fetching rows for query from dataservers[%s]", serviceLocation); - } - } - catch (Exception e) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build(e, "Exception while fetching rows for query from dataservers[%s]", serviceLocation); - } - } - - private Yielder createYielder( - final Sequence sequence, - final Function, Sequence> mappingFunction - ) - { - return Yielders.each( - mappingFunction.apply(sequence) - .map(row -> { - channelCounters.incrementRowCount(); - return row; - }) - ); - } - - private List createNextPendingRequests( - final Set richSegmentDescriptors - ) - { - final Map> serverVsSegmentsMap = new HashMap<>(); - - Iterable immutableSegmentLoadInfos = - coordinatorClient.fetchServerViewSegments( - dataSource, - richSegmentDescriptors.stream().map(RichSegmentDescriptor::getFullInterval).collect(Collectors.toList()) - ); - - Map segmentVsServerMap = new HashMap<>(); - 
immutableSegmentLoadInfos.forEach(immutableSegmentLoadInfo -> { - segmentVsServerMap.put(immutableSegmentLoadInfo.getSegment().toDescriptor(), immutableSegmentLoadInfo); - }); - - for (RichSegmentDescriptor richSegmentDescriptor : richSegmentDescriptors) { - SegmentDescriptor segmentDescriptorWithFullInterval = toSegmentDescriptorWithFullInterval(richSegmentDescriptor); - if (!segmentVsServerMap.containsKey(segmentDescriptorWithFullInterval)) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build("Could not find a server for segment[%s]", richSegmentDescriptor); - } - - ImmutableSegmentLoadInfo segmentLoadInfo = segmentVsServerMap.get(segmentDescriptorWithFullInterval); - if (segmentLoadInfo.getSegment().toDescriptor().equals(segmentDescriptorWithFullInterval)) { - Set servers = segmentLoadInfo.getServers() - .stream() - .filter(druidServerMetadata -> SegmentSource.REALTIME.getUsedServerTypes() - .contains(druidServerMetadata.getType())) - .collect(Collectors.toSet()); - if (servers.isEmpty()) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build("Could not find a server matching includeSegmentSource[%s] for segment[%s]. 
Only found servers [%s]", - SegmentSource.REALTIME, richSegmentDescriptor, servers); - } - - DruidServerMetadata druidServerMetadata = DataServerSelector.RANDOM.getSelectServerFunction().apply(servers); - serverVsSegmentsMap.computeIfAbsent(druidServerMetadata, ignored -> new HashSet<>()); - SegmentDescriptor descriptor = segmentLoadInfo.getSegment().toDescriptor(); - serverVsSegmentsMap.get(druidServerMetadata) - .add(new RichSegmentDescriptor(richSegmentDescriptor.getFullInterval(), richSegmentDescriptor.getInterval(), descriptor.getVersion(), descriptor.getPartitionNumber())); - } - } - - final List requestDescriptors = new ArrayList<>(); - for (Map.Entry> druidServerMetadataSetEntry : serverVsSegmentsMap.entrySet()) { - DataServerRequestDescriptor dataServerRequest = new DataServerRequestDescriptor( - druidServerMetadataSetEntry.getKey(), - ImmutableList.copyOf(druidServerMetadataSetEntry.getValue()) - ); - requestDescriptors.add(dataServerRequest); - } - - return requestDescriptors; - } - - /** - * Retreives the list of missing segments from the response context. - */ - private static List getMissingSegments(final ResponseContext responseContext) - { - List missingSegments = responseContext.getMissingSegments(); - if (missingSegments == null) { - return ImmutableList.of(); - } - return missingSegments; - } - - /** - * Queries the coordinator to check if a list of segments has been handed off. - * Returns a list of segments which have been handed off. - *
- * See {@link org.apache.druid.server.http.DataSourcesResource#isHandOffComplete(String, String, int, String)} - */ - private List checkSegmentHandoff(List segmentDescriptors) - { - try { - List handedOffSegments = new ArrayList<>(); - - for (SegmentDescriptor segmentDescriptor : segmentDescriptors) { - Boolean wasHandedOff = FutureUtils.get( - coordinatorClient.isHandoffComplete(dataSource, segmentDescriptor), - true - ); - if (Boolean.TRUE.equals(wasHandedOff)) { - handedOffSegments.add(segmentDescriptor); - } - } - return handedOffSegments; - } - catch (Exception e) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build(e, "Could not contact coordinator"); - } - } - - static SegmentDescriptor toSegmentDescriptorWithFullInterval(RichSegmentDescriptor richSegmentDescriptor) - { - return new SegmentDescriptor( - richSegmentDescriptor.getFullInterval(), - richSegmentDescriptor.getVersion(), - richSegmentDescriptor.getPartitionNumber() - ); - } + ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java index 1caed919ef04..245c078a1c83 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java @@ -19,79 +19,17 @@ package org.apache.druid.msq.exec; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.client.coordinator.CoordinatorClient; -import org.apache.druid.java.util.common.RE; -import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; -import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.counters.ChannelCounters; import 
org.apache.druid.msq.input.table.DataServerRequestDescriptor; -import org.apache.druid.query.QueryToolChestWarehouse; -import org.apache.druid.rpc.ServiceClientFactory; - -import java.io.Closeable; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; /** * Creates new instances of {@link DataServerQueryHandler} and manages the cancellation threadpool. */ -public class DataServerQueryHandlerFactory implements Closeable +public interface DataServerQueryHandlerFactory { - private static final Logger log = new Logger(DataServerQueryHandlerFactory.class); - private static final int DEFAULT_THREAD_COUNT = 4; - private final CoordinatorClient coordinatorClient; - private final ServiceClientFactory serviceClientFactory; - private final ObjectMapper objectMapper; - private final QueryToolChestWarehouse warehouse; - private final ScheduledExecutorService queryCancellationExecutor; - - public DataServerQueryHandlerFactory( - CoordinatorClient coordinatorClient, - ServiceClientFactory serviceClientFactory, - ObjectMapper objectMapper, - QueryToolChestWarehouse warehouse - ) - { - this.coordinatorClient = coordinatorClient; - this.serviceClientFactory = serviceClientFactory; - this.objectMapper = objectMapper; - this.warehouse = warehouse; - this.queryCancellationExecutor = ScheduledExecutors.fixed(DEFAULT_THREAD_COUNT, "query-cancellation-executor"); - } - - public DataServerQueryHandler createDataServerQueryHandler( + DataServerQueryHandler createDataServerQueryHandler( String dataSource, ChannelCounters channelCounters, DataServerRequestDescriptor dataServerRequestDescriptor - ) - { - return new DataServerQueryHandler( - dataSource, - channelCounters, - serviceClientFactory, - coordinatorClient, - objectMapper, - warehouse, - queryCancellationExecutor, - dataServerRequestDescriptor - ); - } - - @Override - public void close() - { - // Wait for all query cancellations to be complete. 
- log.info("Waiting for any data server queries to be canceled."); - queryCancellationExecutor.shutdown(); - try { - if (!queryCancellationExecutor.awaitTermination(1, TimeUnit.MINUTES)) { - log.error("Unable to cancel all ongoing queries."); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RE(e); - } - } + ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerUtils.java new file mode 100644 index 000000000000..ce63f8100e1f --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerUtils.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.exec; + +import org.apache.druid.discovery.DataServerClient; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.query.Queries; +import org.apache.druid.query.Query; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.context.ResponseContext; + +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** + * Static utility functions for {@link DataServerQueryHandler} implementations. + */ +public class DataServerQueryHandlerUtils +{ + private DataServerQueryHandlerUtils() + { + // No instantiation. + } + + /** + * Performs necessary transforms to a query destined for data servers. Does not update the list of segments; callers + * should do this themselves using {@link Queries#withSpecificSegments(Query, List)}. + * + * @param query the query + * @param dataSource datasource name + */ + public static > Query prepareQuery(final T query, final String dataSource) + { + // MSQ changes the datasource to an inputNumber datasource. This needs to be changed back for data servers + // to understand. + + // BUG: This transformation is incorrect; see https://github.com/apache/druid/issues/18198. It loses decorations + // such as join, unnest, etc. + return query.withDataSource(new TableDataSource(dataSource)); + } + + /** + * Given results from {@link DataServerClient#run}, returns a {@link Yielder} that applies the provided + * mapping function and increments the row count on the provided {@link ChannelCounters}. 
+ */ + public static Yielder createYielder( + final Sequence sequence, + final Function, Sequence> mappingFunction, + final ChannelCounters channelCounters + ) + { + return Yielders.each( + mappingFunction.apply(sequence) + .map(row -> { + channelCounters.incrementRowCount(); + return row; + }) + ); + } + + /** + * Retreives the list of missing segments from the response context. + */ + public static List getMissingSegments(final ResponseContext responseContext) + { + List missingSegments = responseContext.getMissingSegments(); + if (missingSegments == null) { + return Collections.emptyList(); + } + return missingSegments; + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index e7131411556b..66d7fb1cc232 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -253,7 +253,6 @@ private Optional runInternal(final KernelHolders kernelHolders, throws Exception { context.registerWorker(this, workerCloser); - workerCloser.register(context.dataServerQueryHandlerFactory()); this.workerClient = workerCloser.register(new ExceptionWrappingWorkerClient(context.makeWorkerClient())); final FrameProcessorExecutor workerExec = new FrameProcessorExecutor(makeProcessingPool()); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java new file mode 100644 index 000000000000..b8b2afd51370 --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.indexing; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.client.ImmutableSegmentLoadInfo; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.discovery.DataServerClient; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.RetryUtils; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.exec.DataServerQueryHandler; +import org.apache.druid.msq.exec.DataServerQueryHandlerUtils; +import 
org.apache.druid.msq.exec.DataServerQueryResult; +import org.apache.druid.msq.exec.SegmentSource; +import org.apache.druid.msq.input.table.DataServerRequestDescriptor; +import org.apache.druid.msq.input.table.DataServerSelector; +import org.apache.druid.msq.input.table.RichSegmentDescriptor; +import org.apache.druid.query.Queries; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryInterruptedException; +import org.apache.druid.query.QueryToolChest; +import org.apache.druid.query.QueryToolChestWarehouse; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.aggregation.MetricManipulatorFns; +import org.apache.druid.query.context.DefaultResponseContext; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.rpc.RpcException; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocation; +import org.apache.druid.server.coordination.DruidServerMetadata; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Task implementation of {@link DataServerQueryHandler}. Implements retry logic as described in + * {@link #fetchRowsFromDataServer(Query, Function, Closer)}. 
+ */ +public class IndexerDataServerQueryHandler implements DataServerQueryHandler +{ + private static final Logger log = new Logger(IndexerDataServerQueryHandler.class); + private static final int DEFAULT_NUM_TRIES = 3; + private static final int PER_SERVER_QUERY_NUM_TRIES = 5; + private final String dataSource; + private final ChannelCounters channelCounters; + private final ServiceClientFactory serviceClientFactory; + private final CoordinatorClient coordinatorClient; + private final ObjectMapper objectMapper; + private final QueryToolChestWarehouse warehouse; + private final DataServerRequestDescriptor dataServerRequestDescriptor; + + public IndexerDataServerQueryHandler( + String dataSource, + ChannelCounters channelCounters, + ServiceClientFactory serviceClientFactory, + CoordinatorClient coordinatorClient, + ObjectMapper objectMapper, + QueryToolChestWarehouse warehouse, + DataServerRequestDescriptor dataServerRequestDescriptor + ) + { + this.dataSource = dataSource; + this.channelCounters = channelCounters; + this.serviceClientFactory = serviceClientFactory; + this.coordinatorClient = coordinatorClient; + this.objectMapper = objectMapper; + this.warehouse = warehouse; + this.dataServerRequestDescriptor = dataServerRequestDescriptor; + } + + @VisibleForTesting + DataServerClient makeDataServerClient(ServiceLocation serviceLocation) + { + return new DataServerClient(serviceClientFactory, serviceLocation, objectMapper); + } + + /** + * {@inheritDoc} + * + * This method blocks until servers start sending their results back. When the method returns, the future is + * immediately resolved. This means that on tasks, queries to realtime servers block the processing pool. + * In principle, however, it should be possible to make the logic in this class asynchronous. + * + * Queries are retried if needed. If a data server indicates that some segments were not found, this function checks + * with the coordinator to see if the segment was handed off. + *
    + *
  • If all the segments were handed off, returns a {@link DataServerQueryResult} with the yielder and list of + * handed-off segments.
  • + *
  • If some segments were not handed off, checks with the coordinator to fetch an updated list of servers. + * This step is repeated up to {@link #DEFAULT_NUM_TRIES} times.
  • + *
  • If the servers could not be found, checks if the segment was handed-off. If it was, returns a + * {@link DataServerQueryResult} with the yielder and list of handed off segments. Otherwise, + * throws an exception.
  • + *
+ * + * @param result return type for the query from the data server + * @param type of the result rows after parsing from QueryType object + */ + @Override + public ListenableFuture> fetchRowsFromDataServer( + Query query, + Function, Sequence> mappingFunction, + Closer closer + ) + { + final Query preparedQuery = DataServerQueryHandlerUtils.prepareQuery(query, dataSource); + final List> yielders = new ArrayList<>(); + final List handedOffSegments = new ArrayList<>(); + + List pendingRequests = ImmutableList.of(dataServerRequestDescriptor); + + final int maxRetries = preparedQuery.context().getNumRetriesOnMissingSegments(DEFAULT_NUM_TRIES); + int retryCount = 0; + + while (!pendingRequests.isEmpty()) { + final ResponseContext responseContext = new DefaultResponseContext(); + final Set processedSegments = new HashSet<>(); + for (DataServerRequestDescriptor descriptor : pendingRequests) { + log.info("Querying server [%s] for segments[%s]", descriptor.getServerMetadata(), descriptor.getSegments()); + processedSegments.addAll(descriptor.getSegments()); + Yielder yielder = fetchRowsFromDataServerInternal( + descriptor, + responseContext, + closer, + preparedQuery, + mappingFunction + ); + + // Add results + if (yielder != null && !yielder.isDone()) { + yielders.add(yielder); + } + } + + // Check for missing segments + List missingSegments = DataServerQueryHandlerUtils.getMissingSegments(responseContext); + if (missingSegments.isEmpty()) { + // No segments remaining. 
+ break; + } + + final List handedOffSegmentDescriptors = checkSegmentHandoff(missingSegments); + + Set missingRichSegmentDescriptors = new HashSet<>(); + for (RichSegmentDescriptor richSegmentDescriptor : processedSegments) { + SegmentDescriptor segmentDescriptor = toSegmentDescriptorWithFullInterval(richSegmentDescriptor); + if (missingSegments.contains(segmentDescriptor)) { + if (handedOffSegmentDescriptors.contains(segmentDescriptor)) { + handedOffSegments.add(richSegmentDescriptor); + } else { + missingRichSegmentDescriptors.add(richSegmentDescriptor); + } + } + } + + pendingRequests = createNextPendingRequests( + missingRichSegmentDescriptors + ); + + if (!pendingRequests.isEmpty()) { + retryCount++; + if (retryCount > maxRetries) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build("Unable to fetch results from dataservers in [%d] retries.", retryCount); + } + } + } + + // Not actually async. The retry logic above is written in synchronous fashion. Just return an immediate-future + // when we actually have all queries issued and all yielders set up. 
+ return Futures.immediateFuture(new DataServerQueryResult<>(yielders, handedOffSegments, dataSource)); + } + + private Yielder fetchRowsFromDataServerInternal( + final DataServerRequestDescriptor requestDescriptor, + final ResponseContext responseContext, + final Closer closer, + final Query query, + final Function, Sequence> mappingFunction + ) + { + final ServiceLocation serviceLocation = ServiceLocation.fromDruidServerMetadata(requestDescriptor.getServerMetadata()); + final DataServerClient dataServerClient = makeDataServerClient(serviceLocation); + final QueryToolChest> toolChest = warehouse.getToolChest(query); + final Function preComputeManipulatorFn = + toolChest.makePreComputeManipulatorFn(query, MetricManipulatorFns.deserializing()); + final JavaType queryResultType = toolChest.getBaseResultType(); + final List segmentDescriptors = + requestDescriptor.getSegments() + .stream() + .map(IndexerDataServerQueryHandler::toSegmentDescriptorWithFullInterval) + .collect(Collectors.toList()); + + try { + return RetryUtils.retry( + () -> { + final ListenableFuture> queryFuture = dataServerClient.run( + Queries.withSpecificSegments( + query, + requestDescriptor.getSegments() + .stream() + .map(RichSegmentDescriptor::toPlainDescriptor) + .collect(Collectors.toList()) + ), + responseContext, + queryResultType, + closer + ); + + return closer.register( + DataServerQueryHandlerUtils.createYielder( + queryFuture.get().map(preComputeManipulatorFn), + mappingFunction, + channelCounters + ) + ); + }, + throwable -> !(throwable instanceof ExecutionException + && throwable.getCause() instanceof QueryInterruptedException + && throwable.getCause().getCause() instanceof InterruptedException), + PER_SERVER_QUERY_NUM_TRIES + ); + } + catch (ExecutionException e) { + if (e.getCause() instanceof QueryInterruptedException && e.getCause().getCause() instanceof RpcException) { + // In the case that all the realtime servers for a segment are gone (for example, if they were scaled down), 
+ // we would also be unable to fetch the segment. + responseContext.addMissingSegments(segmentDescriptors); + return Yielders.each(Sequences.empty()); + } else { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build(e, "Exception while fetching rows for query from dataservers[%s]", serviceLocation); + } + } + catch (Exception e) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build(e, "Exception while fetching rows for query from dataservers[%s]", serviceLocation); + } + } + + private List createNextPendingRequests( + final Set richSegmentDescriptors + ) + { + final Map> serverVsSegmentsMap = new HashMap<>(); + + Iterable immutableSegmentLoadInfos = + coordinatorClient.fetchServerViewSegments( + dataSource, + richSegmentDescriptors.stream().map(RichSegmentDescriptor::getFullInterval).collect(Collectors.toList()) + ); + + Map segmentVsServerMap = new HashMap<>(); + immutableSegmentLoadInfos.forEach(immutableSegmentLoadInfo -> { + segmentVsServerMap.put(immutableSegmentLoadInfo.getSegment().toDescriptor(), immutableSegmentLoadInfo); + }); + + for (RichSegmentDescriptor richSegmentDescriptor : richSegmentDescriptors) { + SegmentDescriptor segmentDescriptorWithFullInterval = toSegmentDescriptorWithFullInterval(richSegmentDescriptor); + if (!segmentVsServerMap.containsKey(segmentDescriptorWithFullInterval)) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build("Could not find a server for segment[%s]", richSegmentDescriptor); + } + + ImmutableSegmentLoadInfo segmentLoadInfo = segmentVsServerMap.get(segmentDescriptorWithFullInterval); + if (segmentLoadInfo.getSegment().toDescriptor().equals(segmentDescriptorWithFullInterval)) { + Set servers = segmentLoadInfo.getServers() + .stream() + .filter(druidServerMetadata -> 
SegmentSource.REALTIME.getUsedServerTypes() + .contains( + druidServerMetadata.getType())) + .collect(Collectors.toSet()); + if (servers.isEmpty()) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build( + "Could not find a server matching includeSegmentSource[%s] for segment[%s]. Only found servers [%s]", + SegmentSource.REALTIME, richSegmentDescriptor, servers + ); + } + + DruidServerMetadata druidServerMetadata = DataServerSelector.RANDOM.getSelectServerFunction().apply(servers); + serverVsSegmentsMap.computeIfAbsent(druidServerMetadata, ignored -> new HashSet<>()); + SegmentDescriptor descriptor = segmentLoadInfo.getSegment().toDescriptor(); + serverVsSegmentsMap.get(druidServerMetadata) + .add(new RichSegmentDescriptor( + richSegmentDescriptor.getFullInterval(), + richSegmentDescriptor.getInterval(), + descriptor.getVersion(), + descriptor.getPartitionNumber() + )); + } + } + + final List requestDescriptors = new ArrayList<>(); + for (Map.Entry> druidServerMetadataSetEntry : serverVsSegmentsMap.entrySet()) { + DataServerRequestDescriptor dataServerRequest = new DataServerRequestDescriptor( + druidServerMetadataSetEntry.getKey(), + ImmutableList.copyOf(druidServerMetadataSetEntry.getValue()) + ); + requestDescriptors.add(dataServerRequest); + } + + return requestDescriptors; + } + + /** + * Queries the coordinator to check if a list of segments has been handed off. + * Returns a list of segments which have been handed off. + *
+ * See {@link org.apache.druid.server.http.DataSourcesResource#isHandOffComplete(String, String, int, String)} + */ + private List checkSegmentHandoff(List segmentDescriptors) + { + try { + List handedOffSegments = new ArrayList<>(); + + for (SegmentDescriptor segmentDescriptor : segmentDescriptors) { + Boolean wasHandedOff = FutureUtils.get( + coordinatorClient.isHandoffComplete(dataSource, segmentDescriptor), + true + ); + if (Boolean.TRUE.equals(wasHandedOff)) { + handedOffSegments.add(segmentDescriptor); + } + } + return handedOffSegments; + } + catch (Exception e) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build(e, "Could not contact coordinator"); + } + } + + static SegmentDescriptor toSegmentDescriptorWithFullInterval(RichSegmentDescriptor richSegmentDescriptor) + { + return new SegmentDescriptor( + richSegmentDescriptor.getFullInterval(), + richSegmentDescriptor.getVersion(), + richSegmentDescriptor.getPartitionNumber() + ); + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerFactory.java new file mode 100644 index 000000000000..46e06a5484b9 --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.indexing; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.exec.DataServerQueryHandlerFactory; +import org.apache.druid.msq.input.table.DataServerRequestDescriptor; +import org.apache.druid.query.QueryToolChestWarehouse; +import org.apache.druid.rpc.ServiceClientFactory; + +/** + * Creates new instances of {@link IndexerDataServerQueryHandler}. + */ +public class IndexerDataServerQueryHandlerFactory implements DataServerQueryHandlerFactory +{ + private final CoordinatorClient coordinatorClient; + private final ServiceClientFactory serviceClientFactory; + private final ObjectMapper objectMapper; + private final QueryToolChestWarehouse warehouse; + + public IndexerDataServerQueryHandlerFactory( + CoordinatorClient coordinatorClient, + ServiceClientFactory serviceClientFactory, + ObjectMapper objectMapper, + QueryToolChestWarehouse warehouse + ) + { + this.coordinatorClient = coordinatorClient; + this.serviceClientFactory = serviceClientFactory; + this.objectMapper = objectMapper; + this.warehouse = warehouse; + } + + @Override + public IndexerDataServerQueryHandler createDataServerQueryHandler( + String dataSource, + ChannelCounters channelCounters, + DataServerRequestDescriptor requestDescriptor + ) + { + return new IndexerDataServerQueryHandler( + dataSource, + channelCounters, + serviceClientFactory, + coordinatorClient, + objectMapper, + warehouse, + 
requestDescriptor + ); + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java index 9e2c9d410742..368709b7eff3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java @@ -50,7 +50,7 @@ public class IndexerFrameContext implements FrameContext private final ResourceHolder processingBuffers; private final WorkerMemoryParameters memoryParameters; private final WorkerStorageParameters storageParameters; - private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory; + private final IndexerDataServerQueryHandlerFactory dataServerQueryHandlerFactory; public IndexerFrameContext( StageId stageId, @@ -59,7 +59,7 @@ public IndexerFrameContext( IndexIO indexIO, DataSegmentProvider dataSegmentProvider, ResourceHolder processingBuffers, - DataServerQueryHandlerFactory dataServerQueryHandlerFactory, + IndexerDataServerQueryHandlerFactory dataServerQueryHandlerFactory, WorkerMemoryParameters memoryParameters, WorkerStorageParameters storageParameters ) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java index 035544401f14..6c55a6add951 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java @@ -77,7 +77,7 @@ public class IndexerWorkerContext implements WorkerContext private final ServiceLocator controllerLocator; private final IndexIO indexIO; private final TaskDataSegmentProvider 
dataSegmentProvider; - private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory; + private final IndexerDataServerQueryHandlerFactory dataServerQueryHandlerFactory; private final ServiceClientFactory clientFactory; private final MemoryIntrospector memoryIntrospector; private final ProcessingBuffersProvider processingBuffersProvider; @@ -98,7 +98,7 @@ public IndexerWorkerContext( final ServiceClientFactory clientFactory, final MemoryIntrospector memoryIntrospector, final ProcessingBuffersProvider processingBuffersProvider, - final DataServerQueryHandlerFactory dataServerQueryHandlerFactory + final IndexerDataServerQueryHandlerFactory dataServerQueryHandlerFactory ) { this.task = task; @@ -158,7 +158,7 @@ public static IndexerWorkerContext createProductionInstance( serviceClientFactory, memoryIntrospector, processingBuffersProvider, - new DataServerQueryHandlerFactory( + new IndexerDataServerQueryHandlerFactory( toolbox.getCoordinatorClient(), serviceClientFactory, smileMapper, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/RichSegmentDescriptor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/RichSegmentDescriptor.java index 27f5202b6ce2..869464df04da 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/RichSegmentDescriptor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/RichSegmentDescriptor.java @@ -93,6 +93,14 @@ public Interval getFullIntervalForJson() return fullInterval; } + /** + * Returns a plain descriptor, with the {@link #fullInterval} field dropped. 
+ */ + public SegmentDescriptor toPlainDescriptor() + { + return new SegmentDescriptor(getInterval(), getVersion(), getPartitionNumber()); + } + @Override public boolean equals(Object o) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java index db1259f21e50..879386e87071 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java @@ -20,8 +20,10 @@ package org.apache.druid.msq.querykit.groupby; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.frame.Frame; import org.apache.druid.frame.channel.FrameWithPartition; import org.apache.druid.frame.channel.ReadableFrameChannel; @@ -63,6 +65,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.function.Function; @@ -85,6 +88,7 @@ public class GroupByPreShuffleFrameProcessor extends BaseLeafFrameProcessor private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed private SegmentsInputSlice handedOffSegments = null; private Yielder> currentResultsYielder; + private ListenableFuture> dataServerQueryResultFuture; public GroupByPreShuffleFrameProcessor( final GroupByQuery query, @@ -117,12 +121,23 @@ protected ReturnOrAwait runWithDataServerQuery(DataServerQue { if (resultYielder == null || resultYielder.isDone()) { if (currentResultsYielder == 
null) { + if (dataServerQueryResultFuture == null) { + dataServerQueryResultFuture = + dataServerQueryHandler.fetchRowsFromDataServer( + groupingEngine.prepareGroupByQuery(query), + Function.identity(), + closer + ); + + // Give up the processing thread while we wait for the query to finish. This is only really asynchronous + // with Dart. On tasks, the IndexerDataServerQueryHandler does not return from fetchRowsFromDataServer until + // the response has started to come back. + return ReturnOrAwait.awaitAllFutures(Collections.singletonList(dataServerQueryResultFuture)); + } + final DataServerQueryResult dataServerQueryResult = - dataServerQueryHandler.fetchRowsFromDataServer( - groupingEngine.prepareGroupByQuery(query), - Function.identity(), - closer - ); + FutureUtils.getUncheckedImmediately(dataServerQueryResultFuture); + dataServerQueryResultFuture = null; handedOffSegments = dataServerQueryResult.getHandedOffSegments(); if (!handedOffSegments.getDescriptors().isEmpty()) { log.info( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java index c90aec81af10..73c7917ed620 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java @@ -23,8 +23,10 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.ListenableFuture; import it.unimi.dsi.fastutil.ints.IntSet; import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.error.DruidException; import org.apache.druid.frame.Frame; import 
org.apache.druid.frame.channel.FrameWithPartition; @@ -85,6 +87,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -104,6 +107,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor private final Closer closer = Closer.create(); private Cursor cursor; + private ListenableFuture> dataServerQueryResultFuture; private Closeable cursorCloser; private Segment segment; private final SimpleSettableOffset cursorOffset = new SimpleAscendingOffset(Integer.MAX_VALUE); @@ -196,12 +200,24 @@ protected ReturnOrAwait runWithDataServerQuery(final DataSer { if (cursor == null) { ScanQuery preparedQuery = prepareScanQueryForDataServer(query); + + if (dataServerQueryResultFuture == null) { + dataServerQueryResultFuture = + dataServerQueryHandler.fetchRowsFromDataServer( + preparedQuery, + ScanQueryFrameProcessor::mappingFunction, + closer + ); + + // Give up the processing thread while we wait for the query to finish. This is only really asynchronous + // with Dart. On tasks, the IndexerDataServerQueryHandler does not return from fetchRowsFromDataServer until + // the response has started to come back. 
+ return ReturnOrAwait.awaitAllFutures(Collections.singletonList(dataServerQueryResultFuture)); + } + final DataServerQueryResult dataServerQueryResult = - dataServerQueryHandler.fetchRowsFromDataServer( - preparedQuery, - ScanQueryFrameProcessor::mappingFunction, - closer - ); + FutureUtils.getUncheckedImmediately(dataServerQueryResultFuture); + dataServerQueryResultFuture = null; handedOffSegments = dataServerQueryResult.getHandedOffSegments(); if (!handedOffSegments.getDescriptors().isEmpty()) { log.info( diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java index 07e3854ca068..cb4a75d7b1c8 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Futures; import org.apache.druid.client.ImmutableSegmentLoadInfo; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -103,18 +104,20 @@ public void testSelectWithLoadedSegmentsOnFoo() .build(); doReturn( - new DataServerQueryResult<>( - ImmutableList.of( - Yielders.each( - Sequences.simple( - ImmutableList.of( - new Object[]{1L, "qwe"}, - new Object[]{1L, "tyu"} + Futures.immediateFuture( + new DataServerQueryResult<>( + ImmutableList.of( + Yielders.each( + Sequences.simple( + ImmutableList.of( + new Object[]{1L, "qwe"}, + new Object[]{1L, "tyu"} + ) ) - ) - )), - ImmutableList.of(), - "foo" + )), + ImmutableList.of(), + "foo" + ) )).when(dataServerQueryHandler) .fetchRowsFromDataServer(any(), any(), any()); @@ -164,18 +167,20 @@ public void 
testSelectWithLoadedSegmentsOnFooWithOrderBy() ScanQuery query = invocationOnMock.getArgument(0); ScanQuery.verifyOrderByForNativeExecution(query); Assert.assertEquals(Long.MAX_VALUE, query.getScanRowsLimit()); - return new DataServerQueryResult<>( - ImmutableList.of( - Yielders.each( - Sequences.simple( - ImmutableList.of( - new Object[]{1L, "qwe"}, - new Object[]{1L, "tyu"} + return Futures.immediateFuture( + new DataServerQueryResult<>( + ImmutableList.of( + Yielders.each( + Sequences.simple( + ImmutableList.of( + new Object[]{1L, "qwe"}, + new Object[]{1L, "tyu"} + ) ) - ) - )), - ImmutableList.of(), - "foo" + )), + ImmutableList.of(), + "foo" + ) ); } ) @@ -225,17 +230,19 @@ public void testGroupByWithLoadedSegmentsOnFoo() .build(); doReturn( - new DataServerQueryResult<>( - ImmutableList.of( - Yielders.each( - Sequences.simple( - ImmutableList.of( - ResultRow.of(1L, 2L) + Futures.immediateFuture( + new DataServerQueryResult<>( + ImmutableList.of( + Yielders.each( + Sequences.simple( + ImmutableList.of( + ResultRow.of(1L, 2L) + ) ) - ) - )), - ImmutableList.of(), - "foo" + )), + ImmutableList.of(), + "foo" + ) ) ) .when(dataServerQueryHandler) @@ -285,17 +292,19 @@ public void testGroupByWithOnlyLoadedSegmentsOnFoo() .build(); doReturn( - new DataServerQueryResult<>( - ImmutableList.of( - Yielders.each( - Sequences.simple( - ImmutableList.of( - ResultRow.of(1L, 2L) + Futures.immediateFuture( + new DataServerQueryResult<>( + ImmutableList.of( + Yielders.each( + Sequences.simple( + ImmutableList.of( + ResultRow.of(1L, 2L) + ) ) - ) - )), - ImmutableList.of(), - "foo" + )), + ImmutableList.of(), + "foo" + ) ) ) .when(dataServerQueryHandler) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/DataServerQueryHandlerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java similarity index 78% rename from 
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/DataServerQueryHandlerTest.java rename to extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java index f04a65f89a84..2285e49b7fce 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/DataServerQueryHandlerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.msq.exec; +package org.apache.druid.msq.indexing; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -29,11 +29,12 @@ import org.apache.druid.discovery.DruidServiceTestUtils; import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.exec.DataServerQueryResult; +import org.apache.druid.msq.exec.SegmentSource; import org.apache.druid.msq.input.table.DataServerRequestDescriptor; import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.querykit.InputNumberDataSource; @@ -65,20 +66,19 @@ import org.mockito.junit.MockitoJUnitRunner; import java.util.List; +import java.util.concurrent.ExecutionException; -import static org.apache.druid.msq.exec.DataServerQueryHandler.toSegmentDescriptorWithFullInterval; import static org.apache.druid.query.Druids.newScanQueryBuilder; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; 
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @RunWith(MockitoJUnitRunner.class) -public class DataServerQueryHandlerTest +public class IndexerDataServerQueryHandlerTest { private static final String DATASOURCE1 = "dataSource1"; private static final DruidServerMetadata DRUID_SERVER_1 = new DruidServerMetadata( @@ -115,7 +115,7 @@ public class DataServerQueryHandlerTest private DataServerClient dataServerClient2; private CoordinatorClient coordinatorClient; private ScanQuery query; - private DataServerQueryHandler target; + private IndexerDataServerQueryHandler target; @Before public void setUp() @@ -136,14 +136,13 @@ public void setUp() .build() ); target = spy( - new DataServerQueryHandler( + new IndexerDataServerQueryHandler( DATASOURCE1, new ChannelCounters(), mock(ServiceClientFactory.class), coordinatorClient, DruidServiceTestUtils.newJsonMapper(), queryToolChestWarehouse, - Execs.scheduledSingleThreaded("query-cancellation-executor"), new DataServerRequestDescriptor(DRUID_SERVER_1, ImmutableList.of(SEGMENT_1, SEGMENT_2)) ) ); @@ -160,7 +159,7 @@ public void setUp() } @Test - public void testFetchRowsFromServer() + public void testFetchRowsFromServer() throws ExecutionException, InterruptedException { ScanResultValue scanResultValue = new ScanResultValue( null, @@ -172,13 +171,14 @@ public void testFetchRowsFromServer() ) ); - doReturn(Sequences.simple(ImmutableList.of(scanResultValue))).when(dataServerClient1).run(any(), any(), any(), any()); + doReturn(Futures.immediateFuture(Sequences.simple(ImmutableList.of(scanResultValue)))) + .when(dataServerClient1).run(any(), any(), any(), any()); DataServerQueryResult dataServerQueryResult = target.fetchRowsFromDataServer( query, ScanQueryFrameProcessor::mappingFunction, Closer.create() - ); + ).get(); Assert.assertTrue(dataServerQueryResult.getHandedOffSegments().getDescriptors().isEmpty()); List> events = (List>) 
scanResultValue.getEvents(); @@ -192,7 +192,7 @@ public void testFetchRowsFromServer() } @Test - public void testOneSegmentRelocated() + public void testOneSegmentRelocated() throws ExecutionException, InterruptedException { ScanResultValue scanResultValue1 = new ScanResultValue( null, @@ -207,10 +207,10 @@ public void testOneSegmentRelocated() ResponseContext responseContext = invocation.getArgument(1); responseContext.addMissingSegments( ImmutableList.of( - toSegmentDescriptorWithFullInterval(SEGMENT_2) + IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_2) ) ); - return Sequences.simple(ImmutableList.of(scanResultValue1)); + return Futures.immediateFuture(Sequences.simple(ImmutableList.of(scanResultValue1))); }).when(dataServerClient1).run(any(), any(), any(), any()); ScanResultValue scanResultValue2 = new ScanResultValue( @@ -222,9 +222,12 @@ public void testOneSegmentRelocated() ) ); - doReturn(Sequences.simple(ImmutableList.of(scanResultValue2))).when(dataServerClient2).run(any(), any(), any(), any()); + doReturn(Futures.immediateFuture(Sequences.simple(ImmutableList.of(scanResultValue2)))) + .when(dataServerClient2).run(any(), any(), any(), any()); - doReturn(Futures.immediateFuture(Boolean.FALSE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, toSegmentDescriptorWithFullInterval(SEGMENT_2)); + doReturn(Futures.immediateFuture(Boolean.FALSE)) + .when(coordinatorClient) + .isHandoffComplete(DATASOURCE1, IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_2)); doReturn(ImmutableList.of( new ImmutableSegmentLoadInfo( DataSegment.builder() @@ -241,7 +244,7 @@ public void testOneSegmentRelocated() query, ScanQueryFrameProcessor::mappingFunction, Closer.create() - ); + ).get(); Assert.assertTrue(dataServerQueryResult.getHandedOffSegments().getDescriptors().isEmpty()); @@ -263,26 +266,30 @@ public void testOneSegmentRelocated() } @Test - public void testHandoff() + public void testHandoff() throws 
ExecutionException, InterruptedException { doAnswer(invocation -> { ResponseContext responseContext = invocation.getArgument(1); responseContext.addMissingSegments( ImmutableList.of( - toSegmentDescriptorWithFullInterval(SEGMENT_1), - toSegmentDescriptorWithFullInterval(SEGMENT_2) + IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_1), + IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_2) ) ); - return Sequences.empty(); + return Futures.immediateFuture(Sequences.empty()); }).when(dataServerClient1).run(any(), any(), any(), any()); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, toSegmentDescriptorWithFullInterval(SEGMENT_1)); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, toSegmentDescriptorWithFullInterval(SEGMENT_2)); + doReturn(Futures.immediateFuture(Boolean.TRUE)) + .when(coordinatorClient) + .isHandoffComplete(DATASOURCE1, IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_1)); + doReturn(Futures.immediateFuture(Boolean.TRUE)) + .when(coordinatorClient) + .isHandoffComplete(DATASOURCE1, IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_2)); DataServerQueryResult dataServerQueryResult = target.fetchRowsFromDataServer( query, ScanQueryFrameProcessor::mappingFunction, Closer.create() - ); + ).get(); Assert.assertEquals(ImmutableList.of(SEGMENT_1, SEGMENT_2), dataServerQueryResult.getHandedOffSegments().getDescriptors()); Assert.assertTrue(dataServerQueryResult.getResultsYielders().isEmpty()); @@ -291,13 +298,16 @@ public void testHandoff() @Test public void testServerNotFoundWithoutHandoffShouldThrowException() { - doThrow( - new QueryInterruptedException(new RpcException("Could not connect to server")) + doReturn( + Futures.immediateFailedFuture(new QueryInterruptedException(new RpcException("Could not connect to server"))) 
).when(dataServerClient1).run(any(), any(), any(), any()); - doReturn(Futures.immediateFuture(Boolean.FALSE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, toSegmentDescriptorWithFullInterval(SEGMENT_1)); + doReturn(Futures.immediateFuture(Boolean.FALSE)) + .when(coordinatorClient) + .isHandoffComplete(DATASOURCE1, IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_1)); - ScanQuery queryWithRetry = query.withOverriddenContext(ImmutableMap.of(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, 3)); + ScanQuery queryWithRetry = + query.withOverriddenContext(ImmutableMap.of(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, 3)); Assert.assertThrows(DruidException.class, () -> target.fetchRowsFromDataServer( @@ -311,20 +321,24 @@ public void testServerNotFoundWithoutHandoffShouldThrowException() } @Test - public void testServerNotFoundButHandoffShouldReturnWithStatus() + public void testServerNotFoundButHandoffShouldReturnWithStatus() throws ExecutionException, InterruptedException { - doThrow( - new QueryInterruptedException(new RpcException("Could not connect to server")) + doReturn( + Futures.immediateFailedFuture(new QueryInterruptedException(new RpcException("Could not connect to server"))) ).when(dataServerClient1).run(any(), any(), any(), any()); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, toSegmentDescriptorWithFullInterval(SEGMENT_1)); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, toSegmentDescriptorWithFullInterval(SEGMENT_2)); + doReturn(Futures.immediateFuture(Boolean.TRUE)) + .when(coordinatorClient) + .isHandoffComplete(DATASOURCE1, IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_1)); + doReturn(Futures.immediateFuture(Boolean.TRUE)) + .when(coordinatorClient) + .isHandoffComplete(DATASOURCE1, IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_2)); 
DataServerQueryResult dataServerQueryResult = target.fetchRowsFromDataServer( query, ScanQueryFrameProcessor::mappingFunction, Closer.create() - ); + ).get(); Assert.assertEquals(ImmutableList.of(SEGMENT_1, SEGMENT_2), dataServerQueryResult.getHandedOffSegments().getDescriptors()); Assert.assertTrue(dataServerQueryResult.getResultsYielders().isEmpty()); @@ -333,11 +347,12 @@ public void testServerNotFoundButHandoffShouldReturnWithStatus() @Test public void testQueryFail() { - SegmentDescriptor segmentDescriptorWithFullInterval = toSegmentDescriptorWithFullInterval(SEGMENT_1); + SegmentDescriptor segmentDescriptorWithFullInterval = + IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_1); doAnswer(invocation -> { ResponseContext responseContext = invocation.getArgument(1); responseContext.addMissingSegments(ImmutableList.of(segmentDescriptorWithFullInterval)); - return Sequences.empty(); + return Futures.immediateFuture(Sequences.empty()); }).when(dataServerClient1).run(any(), any(), any(), any()); doReturn(Futures.immediateFuture(Boolean.FALSE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, segmentDescriptorWithFullInterval); diff --git a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java index b76263b52e90..abcff1308d6c 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java @@ -44,6 +44,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -134,7 +135,7 @@ public void run() logProcessorStatusString(processor, finished, allWritabilityFutures); if (!writabilityFuturesToWaitFor.isEmpty()) { - 
runProcessorAfterFutureResolves(Futures.allAsList(writabilityFuturesToWaitFor)); + runProcessorAfterFutureResolves(Futures.allAsList(writabilityFuturesToWaitFor), false); return; } @@ -150,13 +151,17 @@ public void run() if (result.isReturn()) { succeed(result.value()); + } else if (result.hasAwaitableFutures()) { + runProcessorAfterFutureResolves(Futures.allAsList(result.awaitableFutures()), true); } else { + assert result.hasAwaitableChannels(); + // Don't retain a reference to this set: it may be mutated the next time the processor runs. - final IntSet await = result.awaitSet(); + final IntSet await = result.awaitableChannels(); if (await.isEmpty()) { exec.execute(ExecutorRunnable.this); - } else if (result.isAwaitAll() || await.size() == 1) { + } else if (result.isAwaitAllChannels() || await.size() == 1) { final List> readabilityFutures = new ArrayList<>(); for (final int channelNumber : await) { @@ -169,11 +174,11 @@ public void run() if (readabilityFutures.isEmpty()) { exec.execute(ExecutorRunnable.this); } else { - runProcessorAfterFutureResolves(Futures.allAsList(readabilityFutures)); + runProcessorAfterFutureResolves(Futures.allAsList(readabilityFutures), false); } } else { // Await any. - runProcessorAfterFutureResolves(awaitAnyWidget.awaitAny(await)); + runProcessorAfterFutureResolves(awaitAnyWidget.awaitAny(await), false); } } } @@ -272,7 +277,17 @@ private Optional> runProcessorNow() } } - private void runProcessorAfterFutureResolves(final ListenableFuture future) + /** + * Schedule this processor to run after the provided future resolves. + * + * @param future the future + * @param failOnCancel whether the processor should be {@link #fail(Throwable)} if the future is itself canceled. + * This is true for futures provided by {@link ReturnOrAwait#awaitAllFutures(Collection)}, + * because the processor has declared it wants to wait for them; if they are canceled + * the processor must fail. 
It is false for other futures, which the processor was not + * directly waiting for. + */ + private void runProcessorAfterFutureResolves(final ListenableFuture future, final boolean failOnCancel) { final ListenableFuture cancelableFuture = registerCancelableFuture(future, false, cancellationId); @@ -294,8 +309,7 @@ public void onSuccess(final V ignored) @Override public void onFailure(Throwable t) { - // Ignore cancellation. - if (!cancelableFuture.isCancelled()) { + if (failOnCancel || !cancelableFuture.isCancelled()) { fail(t); } } diff --git a/processing/src/main/java/org/apache/druid/frame/processor/ReturnOrAwait.java b/processing/src/main/java/org/apache/druid/frame/processor/ReturnOrAwait.java index 4ca69b6cc498..c154b4258511 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/ReturnOrAwait.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/ReturnOrAwait.java @@ -19,13 +19,14 @@ package org.apache.druid.frame.processor; +import com.google.common.util.concurrent.ListenableFuture; import it.unimi.dsi.fastutil.ints.IntSet; import it.unimi.dsi.fastutil.ints.IntSets; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import javax.annotation.Nullable; -import java.util.Objects; +import java.util.Collection; /** * Instances of this class are returned by {@link FrameProcessor#runIncrementally}, and are used by @@ -35,8 +36,8 @@ * In this case {@link #isReturn()} is true and {@link #value()} contains the result. * * An instance can also be an "await", which means that the {@link FrameProcessor} wants to be scheduled again - * in the future. In this case {@link #isAwait()} is true, {@link #awaitSet()} contains the set of input channels to - * wait for, and {@link #isAwaitAll()} is whether the processor wants to wait for all channels, or any channel. + * in the future. 
In this case {@link #isAwait()} is true, and *either* {@link #hasAwaitableChannels()} or + * {@link #hasAwaitableFutures()} is true. */ public class ReturnOrAwait { @@ -46,18 +47,31 @@ public class ReturnOrAwait private final T retVal; @Nullable - private final IntSet await; + private final IntSet awaitChannels; - private final boolean awaitAll; + private final boolean awaitAllChannels; - private ReturnOrAwait(@Nullable T retVal, @Nullable IntSet await, final boolean awaitAll) + @Nullable + private final Collection> awaitFutures; + + private ReturnOrAwait( + @Nullable T retVal, + @Nullable IntSet awaitChannels, + @Nullable Collection> awaitFutures, + final boolean awaitAllChannels + ) { this.retVal = retVal; - this.await = await; - this.awaitAll = awaitAll; + this.awaitChannels = awaitChannels; + this.awaitAllChannels = awaitAllChannels; + this.awaitFutures = awaitFutures; + + if (retVal != null && (awaitChannels != null || awaitFutures != null)) { + throw new IAE("Cannot have a value when await != null or futures != null"); + } - if (retVal != null && await != null) { - throw new IAE("Cannot have a value when await != null"); + if (awaitChannels != null && awaitFutures != null) { + throw new ISE("Cannot have both awaitChannels and awaitFutures"); } } @@ -66,7 +80,7 @@ private ReturnOrAwait(@Nullable T retVal, @Nullable IntSet await, final boolean */ public static ReturnOrAwait runAgain() { - return new ReturnOrAwait<>(null, IntSets.emptySet(), true); + return new ReturnOrAwait<>(null, IntSets.emptySet(), null, true); } /** @@ -78,7 +92,7 @@ public static ReturnOrAwait runAgain() */ public static ReturnOrAwait awaitAll(final IntSet await) { - return new ReturnOrAwait<>(null, await, true); + return new ReturnOrAwait<>(null, await, null, true); } /** @@ -86,7 +100,15 @@ public static ReturnOrAwait awaitAll(final IntSet await) */ public static ReturnOrAwait awaitAll(final int count) { - return new ReturnOrAwait<>(null, rangeSet(count), true); + return new 
ReturnOrAwait<>(null, rangeSet(count), null, true); + } + + /** + * Wait for all of the provided futures. + */ + public static ReturnOrAwait awaitAllFutures(final Collection> futures) + { + return new ReturnOrAwait<>(null, null, futures, true); } /** @@ -100,7 +122,7 @@ public static ReturnOrAwait awaitAll(final int count) */ public static ReturnOrAwait awaitAny(final IntSet await) { - return new ReturnOrAwait<>(null, await, false); + return new ReturnOrAwait<>(null, await, null, false); } /** @@ -108,7 +130,7 @@ public static ReturnOrAwait awaitAny(final IntSet await) */ public static ReturnOrAwait returnObject(final T o) { - return new ReturnOrAwait<>(o, null, false); + return new ReturnOrAwait<>(o, null, null, false); } /** @@ -129,13 +151,23 @@ public T value() * * Numbers in this set correspond to positions in the {@link FrameProcessor#inputChannels()} list. */ - public IntSet awaitSet() + public IntSet awaitableChannels() { - if (isReturn()) { + if (!hasAwaitableChannels()) { throw new ISE("No await set"); } - return await; + return awaitChannels; + } + + + public Collection> awaitableFutures() + { + if (!hasAwaitableFutures()) { + throw new ISE("No futures set"); + } + + return awaitFutures; } /** @@ -145,7 +177,7 @@ public IntSet awaitSet() */ public boolean isReturn() { - return await == null; + return awaitChannels == null && awaitFutures == null; } /** @@ -155,41 +187,46 @@ public boolean isReturn() */ public boolean isAwait() { - return await != null; + return !isReturn(); } /** - * Whether the processor wants to wait for all channels in {@link #awaitSet()} (true), or any channel (false) + * Whether the processor wants to wait for a set of futures. If true, {@link #awaitableFutures()} contains the + * set of futures to wait for. 
*/ - public boolean isAwaitAll() + public boolean hasAwaitableFutures() { - return awaitAll; + return awaitFutures != null; } - @Override - public boolean equals(Object o) + /** + * Whether the processor wants to wait for a set of channels. If true, {@link #awaitableChannels()} contains the + * set of channels to wait for, and {@link #isAwaitAllChannels()} is whether to wait for all of them, or any one. + */ + public boolean hasAwaitableChannels() { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ReturnOrAwait that = (ReturnOrAwait) o; - return awaitAll == that.awaitAll && Objects.equals(retVal, that.retVal) && Objects.equals(await, that.await); + return awaitChannels != null; } - @Override - public int hashCode() + /** + * Whether the processor wants to wait for all channels in {@link #awaitableChannels()} (true), or any channel (false) + */ + public boolean isAwaitAllChannels() { - return Objects.hash(retVal, await, awaitAll); + if (!hasAwaitableChannels()) { + throw new ISE("No channels set"); + } + + return awaitAllChannels; } @Override public String toString() { - if (isAwait()) { - return "await=" + (awaitAll ? "all" : "any") + await; + if (hasAwaitableChannels()) { + return "await channels=" + (awaitAllChannels ?
"all" : "any") + awaitChannels; + } else if (hasAwaitableFutures()) { + return "await futures=" + awaitFutures; } else { return "return=" + retVal; } diff --git a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java index cf53abd0e5f4..60c2e1135b82 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java @@ -36,6 +36,7 @@ import org.apache.druid.frame.file.FrameFileWriter; import org.apache.druid.frame.processor.test.ChompingFrameProcessor; import org.apache.druid.frame.processor.test.FailingFrameProcessor; +import org.apache.druid.frame.processor.test.FutureWaitingProcessor; import org.apache.druid.frame.processor.test.InfiniteFrameProcessor; import org.apache.druid.frame.processor.test.SleepyFrameProcessor; import org.apache.druid.frame.processor.test.SuperBlasterFrameProcessor; @@ -414,6 +415,70 @@ public void test_runFully_nonexistentCancellationId() Assert.assertFalse(processor.didGetInterrupt()); Assert.assertFalse(processor.didCleanup()); } + + @Test + public void test_awaitAll_withFutures() throws Exception + { + // Test a processor that waits for futures to complete using ReturnOrAwait.awaitAll(Collection) + final SettableFuture future1 = SettableFuture.create(); + final SettableFuture future2 = SettableFuture.create(); + + // Start the processor + final FutureWaitingProcessor futureWaitingProcessor = new FutureWaitingProcessor(future1, future2); + final ListenableFuture> processorFuture = exec.runFully(futureWaitingProcessor, null); + + // Processor should be waiting for futures + Assert.assertFalse("Processor should be waiting", processorFuture.isDone()); + + // Complete the futures + future1.set("result1"); + future2.set("result2"); + + // Processor should complete now + 
Assert.assertEquals(List.of("result1", "result2"), processorFuture.get()); + Assert.assertTrue(futureWaitingProcessor.isCleanedUp()); + } + + @Test + public void test_awaitAll_withFutures_canceled() + { + // Test cancellation of a future that a processor is waiting for + final SettableFuture future1 = SettableFuture.create(); + final SettableFuture future2 = SettableFuture.create(); + + // Start the processor + final FutureWaitingProcessor futureWaitingProcessor = new FutureWaitingProcessor(future1, future2); + final ListenableFuture> processorFuture = exec.runFully(futureWaitingProcessor, null); + + // Resolve one, cancel one + future1.set("result1"); + future2.cancel(true); + + // Verify exception + final ExecutionException e = Assert.assertThrows(ExecutionException.class, processorFuture::get); + MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(CancellationException.class)); + } + + @Test + public void test_awaitAll_withFutures_error() + { + // Test errors on a future that a processor is waiting for + final SettableFuture future1 = SettableFuture.create(); + final SettableFuture future2 = SettableFuture.create(); + + // Start the processor + final FutureWaitingProcessor futureWaitingProcessor = new FutureWaitingProcessor(future1, future2); + final ListenableFuture> processorFuture = exec.runFully(futureWaitingProcessor, null); + + // Resolve one, fail out one + future1.set("result1"); + future2.setException(new RuntimeException("oops")); + + // Verify exception + final ExecutionException e = Assert.assertThrows(ExecutionException.class, processorFuture::get); + MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(RuntimeException.class)); + MatcherAssert.assertThat(e.getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("oops"))); + } } public abstract static class BaseFrameProcessorExecutorTestSuite extends InitializedNullHandlingTest diff --git 
a/processing/src/test/java/org/apache/druid/frame/processor/ReturnOrAwaitTest.java b/processing/src/test/java/org/apache/druid/frame/processor/ReturnOrAwaitTest.java index 8377cf0639c6..0215b43f2c41 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/ReturnOrAwaitTest.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/ReturnOrAwaitTest.java @@ -19,24 +19,27 @@ package org.apache.druid.frame.processor; +import com.google.common.util.concurrent.Futures; import it.unimi.dsi.fastutil.ints.IntSet; -import nl.jqno.equalsverifier.EqualsVerifier; +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; import org.junit.Assert; import org.junit.Test; +import java.util.Collections; + public class ReturnOrAwaitTest { @Test public void testToString() { - Assert.assertEquals("await=any{0, 1}", ReturnOrAwait.awaitAny(IntSet.of(0, 1)).toString()); - Assert.assertEquals("await=all{0, 1}", ReturnOrAwait.awaitAll(2).toString()); + Assert.assertEquals("await channels=any{0, 1}", ReturnOrAwait.awaitAny(IntSet.of(0, 1)).toString()); + Assert.assertEquals("await channels=all{0, 1}", ReturnOrAwait.awaitAll(2).toString()); Assert.assertEquals("return=xyzzy", ReturnOrAwait.returnObject("xyzzy").toString()); - } - @Test - public void testEquals() - { - EqualsVerifier.forClass(ReturnOrAwait.class).usingGetClass().verify(); + MatcherAssert.assertThat( + ReturnOrAwait.awaitAllFutures(Collections.singletonList(Futures.immediateFuture(1))).toString(), + CoreMatchers.startsWith("await futures=[com.google.") + ); } } diff --git a/processing/src/test/java/org/apache/druid/frame/processor/test/FutureWaitingProcessor.java b/processing/src/test/java/org/apache/druid/frame/processor/test/FutureWaitingProcessor.java new file mode 100644 index 000000000000..a55e257675fb --- /dev/null +++ b/processing/src/test/java/org/apache/druid/frame/processor/test/FutureWaitingProcessor.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor.test; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.frame.processor.ReturnOrAwait; +import org.apache.druid.java.util.common.ISE; +import org.junit.Assert; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * Processor that waits for two futures using {@link ReturnOrAwait#awaitAllFutures(Collection)}. 
+ */ +public class FutureWaitingProcessor implements FrameProcessor> +{ + private final ListenableFuture future1; + private final ListenableFuture future2; + + public FutureWaitingProcessor(ListenableFuture future1, ListenableFuture future2) + { + this.future1 = future1; + this.future2 = future2; + } + + private int runCount = 0; + private boolean cleanedUp; + private final List results = new ArrayList<>(); + + @Override + public List inputChannels() + { + return Collections.emptyList(); + } + + @Override + public List outputChannels() + { + return Collections.emptyList(); + } + + @Override + public ReturnOrAwait> runIncrementally(IntSet readableInputs) + { + runCount++; + + if (runCount == 1) { + // First run: wait for both futures + return ReturnOrAwait.awaitAllFutures(ImmutableList.of(future1, future2)); + } else if (runCount == 2) { + // Second run: futures should be complete, collect results + Assert.assertTrue("future1 should be done", future1.isDone()); + Assert.assertTrue("future2 should be done", future2.isDone()); + + try { + results.add(future1.get()); + results.add(future2.get()); + } + catch (Exception e) { + throw new RuntimeException(e); + } + + return ReturnOrAwait.returnObject(results); + } else { + throw new ISE("Should not run more than twice"); + } + } + + @Override + public void cleanup() + { + cleanedUp = true; + } + + public boolean isCleanedUp() + { + return cleanedUp; + } +} diff --git a/server/src/main/java/org/apache/druid/discovery/DataServerClient.java b/server/src/main/java/org/apache/druid/discovery/DataServerClient.java index c75b646bc8f9..6a2021f8043e 100644 --- a/server/src/main/java/org/apache/druid/discovery/DataServerClient.java +++ b/server/src/main/java/org/apache/druid/discovery/DataServerClient.java @@ -26,16 +26,17 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.client.JsonParserIterator; +import 
org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.http.client.response.StatusResponseHandler; -import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.query.Query; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.rpc.FixedServiceLocator; +import org.apache.druid.rpc.IgnoreHttpResponseHandler; import org.apache.druid.rpc.RequestBuilder; import org.apache.druid.rpc.ServiceClient; import org.apache.druid.rpc.ServiceClientFactory; @@ -43,30 +44,27 @@ import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.utils.CloseableUtils; import org.jboss.netty.handler.codec.http.HttpMethod; +import org.joda.time.Duration; import java.io.InputStream; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; /** * Client to query data servers given a query. 
*/ public class DataServerClient { - private static final String BASE_PATH = "/druid/v2/"; private static final Logger log = new Logger(DataServerClient.class); + private static final String BASE_PATH = "/druid/v2/"; + private static final Duration CANCELLATION_TIMEOUT = Duration.standardSeconds(5); + private final ServiceClient serviceClient; private final ObjectMapper objectMapper; private final ServiceLocation serviceLocation; - private final ScheduledExecutorService queryCancellationExecutor; public DataServerClient( ServiceClientFactory serviceClientFactory, ServiceLocation serviceLocation, - ObjectMapper objectMapper, - ScheduledExecutorService queryCancellationExecutor + ObjectMapper objectMapper ) { this.serviceClient = serviceClientFactory.makeClient( @@ -76,13 +74,23 @@ public DataServerClient( ); this.serviceLocation = serviceLocation; this.objectMapper = objectMapper; - this.queryCancellationExecutor = queryCancellationExecutor; } - public Sequence run(Query query, ResponseContext responseContext, JavaType queryResultType, Closer closer) + /** + * Issue a query. Returns a future that resolves when the server starts sending its response. 
+ * + * @param query query to run + * @param responseContext response context to populate + * @param queryResultType type of result object + * @param closer closer; this call will register a query canceler with this closer + */ + public ListenableFuture> run( + final Query query, + final ResponseContext responseContext, + final JavaType queryResultType, + final Closer closer + ) { - final String cancelPath = BASE_PATH + query.getId(); - RequestBuilder requestBuilder = new RequestBuilder(HttpMethod.POST, BASE_PATH); final boolean isSmile = objectMapper.getFactory() instanceof SmileFactory; if (isSmile) { @@ -114,64 +122,72 @@ public void onSuccess(InputStream result) public void onFailure(Throwable t) { if (resultStreamFuture.isCancelled()) { - cancelQuery(query, cancelPath); + cancelQuery(query.getId()); } } }, Execs.directExecutor() ); - return new BaseSequence<>( - new BaseSequence.IteratorMaker>() + return FutureUtils.transform( + resultStreamFuture, + resultStream -> new BaseSequence<>( + new BaseSequence.IteratorMaker>() + { + @Override + public JsonParserIterator make() + { + return new JsonParserIterator<>( + queryResultType, + Futures.immediateFuture(resultStream), + BASE_PATH, + query, + serviceLocation.getHost(), + objectMapper + ); + } + + @Override + public void cleanup(JsonParserIterator iterFromMake) + { + CloseableUtils.closeAndWrapExceptions(iterFromMake); + } + } + ) + ); + } + + private void cancelQuery(final String queryId) + { + if (queryId == null) { + throw DruidException.defensive("Null queryId"); + } + + final String cancelPath = BASE_PATH + queryId; + + final ListenableFuture cancelFuture = serviceClient.asyncRequest( + new RequestBuilder(HttpMethod.DELETE, cancelPath).timeout(CANCELLATION_TIMEOUT), + IgnoreHttpResponseHandler.INSTANCE + ); + + Futures.addCallback( + cancelFuture, + new FutureCallback<>() { @Override - public JsonParserIterator make() + public void onSuccess(final Void result) { - return new JsonParserIterator<>( - 
queryResultType, - resultStreamFuture, - BASE_PATH, - query, - serviceLocation.getHost(), - objectMapper - ); + // Do nothing on successful cancellation. } @Override - public void cleanup(JsonParserIterator iterFromMake) + public void onFailure(final Throwable t) { - CloseableUtils.closeAndWrapExceptions(iterFromMake); + log.noStackTrace() + .warn(t, "Failed to cancel query[%s] on server[%s]", queryId, serviceLocation.getHostAndPort()); } - } + }, + Execs.directExecutor() ); } - - private void cancelQuery(Query query, String cancelPath) - { - Runnable cancelRunnable = () -> { - Future cancelFuture = serviceClient.asyncRequest( - new RequestBuilder(HttpMethod.DELETE, cancelPath), - StatusResponseHandler.getInstance()); - - Runnable checkRunnable = () -> { - try { - if (!cancelFuture.isDone()) { - log.error("Error cancelling query[%s]", query); - } - StatusResponseHolder response = cancelFuture.get(); - if (response.getStatus().getCode() >= 500) { - log.error("Error cancelling query[%s]: queryable node returned status[%d] [%s].", - query, - response.getStatus().getCode(), - response.getStatus().getReasonPhrase()); - } - } - catch (ExecutionException | InterruptedException e) { - log.error(e, "Error cancelling query[%s]", query); - } - }; - queryCancellationExecutor.schedule(checkRunnable, 5, TimeUnit.SECONDS); - }; - queryCancellationExecutor.submit(cancelRunnable); - } } diff --git a/server/src/main/java/org/apache/druid/rpc/ServiceLocation.java b/server/src/main/java/org/apache/druid/rpc/ServiceLocation.java index 974f09fe89bb..552e0d6e9ba3 100644 --- a/server/src/main/java/org/apache/druid/rpc/ServiceLocation.java +++ b/server/src/main/java/org/apache/druid/rpc/ServiceLocation.java @@ -166,6 +166,18 @@ public String getHost() return host; } + /** + * Returns a host:port string for the preferred port (TLS if available; plaintext otherwise). 
+ */ + public String getHostAndPort() + { + if (tlsPort > 0) { + return host + ":" + tlsPort; + } else { + return host + ":" + plaintextPort; + } + } + public int getPlaintextPort() { return plaintextPort; diff --git a/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java b/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java index 7afdc948a7a2..95ad46a580cc 100644 --- a/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java +++ b/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java @@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.QueryTimeoutException; @@ -51,6 +50,7 @@ import javax.ws.rs.core.MediaType; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutionException; import static org.apache.druid.query.Druids.newScanQueryBuilder; import static org.mockito.Mockito.mock; @@ -81,13 +81,12 @@ public void setUp() target = new DataServerClient( serviceClientFactory, mock(ServiceLocation.class), - jsonMapper, - Execs.scheduledSingleThreaded("query-cancellation-executor") + jsonMapper ); } @Test - public void testFetchSegmentFromDataServer() throws JsonProcessingException + public void testFetchSegmentFromDataServer() throws JsonProcessingException, ExecutionException, InterruptedException { ScanResultValue scanResultValue = new ScanResultValue( null, @@ -112,7 +111,7 @@ public void testFetchSegmentFromDataServer() throws JsonProcessingException responseContext, jsonMapper.getTypeFactory().constructType(ScanResultValue.class), Closer.create() - ); + ).get(); Assert.assertEquals(ImmutableList.of(scanResultValue), result.toList()); } @@ -170,7 
+169,7 @@ public void testQueryFailure() throws JsonProcessingException responseContext, jsonMapper.getTypeFactory().constructType(ScanResultValue.class), Closer.create() - ).toList() + ).get().toList() ); } diff --git a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java index 6c3cfccbccb8..cbfd8ed86dc3 100644 --- a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java +++ b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java @@ -251,6 +251,7 @@ public void start() throws Exception resource.onStarted(this); } catch (Exception e) { + log.warn(e, "Failed to start resource[%s]. Stopping cluster.", resource); // Clean up the resources that have already been started stop(); throw e;