diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java index a1f4e06e948e..5e0a3898e7f5 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java @@ -19,6 +19,7 @@ package org.apache.druid.msq.dart.guice; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.util.concurrent.MoreExecutors; @@ -35,8 +36,10 @@ import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.ManageLifecycleAnnouncements; +import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.LoadScope; import org.apache.druid.guice.annotations.Self; +import org.apache.druid.guice.annotations.Smile; import org.apache.druid.initialization.DruidModule; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; @@ -50,6 +53,7 @@ import org.apache.druid.msq.dart.controller.messages.ControllerMessage; import org.apache.druid.msq.dart.controller.sql.DartSqlEngine; import org.apache.druid.msq.dart.worker.DartDataSegmentProvider; +import org.apache.druid.msq.dart.worker.DartDataServerQueryHandlerFactory; import org.apache.druid.msq.dart.worker.DartWorkerContextFactory; import org.apache.druid.msq.dart.worker.DartWorkerContextFactoryImpl; import org.apache.druid.msq.dart.worker.DartWorkerRunner; @@ -58,6 +62,8 @@ import org.apache.druid.msq.querykit.DataSegmentProvider; import org.apache.druid.msq.rpc.ResourcePermissionMapper; import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.query.QueryToolChestWarehouse; +import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.server.DruidNode; import org.apache.druid.server.security.AuthorizerMapper; @@ -156,6 +162,20 @@ public Outbox createOutbox() { return new OutboxImpl<>(); } + + @Provides + public DartDataServerQueryHandlerFactory createDataServerQueryHandlerFactory( + @EscalatedGlobal ServiceClientFactory serviceClientFactory, + @Smile ObjectMapper smileMapper, + QueryToolChestWarehouse queryToolChestWarehouse + ) + { + return new DartDataServerQueryHandlerFactory( + serviceClientFactory, + smileMapper, + queryToolChestWarehouse + ); + } } @Override diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java new file mode 100644 index 000000000000..4a6b67da6698 --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.dart.worker; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.discovery.DataServerClient; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.exec.DataServerQueryHandler; +import org.apache.druid.msq.exec.DataServerQueryHandlerUtils; +import org.apache.druid.msq.exec.DataServerQueryResult; +import org.apache.druid.msq.input.table.DataServerRequestDescriptor; +import org.apache.druid.msq.input.table.RichSegmentDescriptor; +import org.apache.druid.query.Queries; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryToolChest; +import org.apache.druid.query.QueryToolChestWarehouse; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.aggregation.MetricManipulatorFns; +import org.apache.druid.query.context.DefaultResponseContext; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocation; + +import java.util.Collections; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Dart implementation of {@link DataServerQueryHandler}. Issues queries asynchronously, with no retries. + */ +public class DartDataServerQueryHandler implements DataServerQueryHandler +{ + private final String dataSource; + private final ChannelCounters channelCounters; + private final ServiceClientFactory serviceClientFactory; + private final ObjectMapper objectMapper; + private final QueryToolChestWarehouse warehouse; + private final DataServerRequestDescriptor requestDescriptor; + + public DartDataServerQueryHandler( + String dataSource, + ChannelCounters channelCounters, + ServiceClientFactory serviceClientFactory, + ObjectMapper objectMapper, + QueryToolChestWarehouse warehouse, + DataServerRequestDescriptor requestDescriptor + ) + { + this.dataSource = dataSource; + this.channelCounters = channelCounters; + this.serviceClientFactory = serviceClientFactory; + this.objectMapper = objectMapper; + this.warehouse = warehouse; + this.requestDescriptor = requestDescriptor; + } + + /** + * {@inheritDoc} + * + * This method returns immediately. The returned future resolves when the server has started sending back + * its response. + * + * Queries are issued once, without retries. 
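As a rough, caller-side sketch (not part of the patch itself; handler, scanQuery, mappingFunction, and closer are hypothetical placeholders), the returned future is intended to be chained rather than blocked on:

    // Issue the query; the future resolves once the data server begins responding.
    final ListenableFuture<DataServerQueryResult<Object[]>> resultFuture =
        handler.fetchRowsFromDataServer(scanQuery, mappingFunction, closer);
    // Chain further work instead of blocking the calling thread.
    final ListenableFuture<SegmentsInputSlice> handedOffFuture =
        FutureUtils.transform(resultFuture, DataServerQueryResult::getHandedOffSegments);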
+ */ + @Override + public ListenableFuture> fetchRowsFromDataServer( + Query query, + Function, Sequence> mappingFunction, + Closer closer + ) + { + final Query preparedQuery = + Queries.withSpecificSegments( + DataServerQueryHandlerUtils.prepareQuery(query, dataSource), + requestDescriptor.getSegments() + .stream() + .map(RichSegmentDescriptor::toPlainDescriptor) + .collect(Collectors.toList()) + ); + + final ServiceLocation serviceLocation = + ServiceLocation.fromDruidServerMetadata(requestDescriptor.getServerMetadata()); + final DataServerClient dataServerClient = makeDataServerClient(serviceLocation); + final QueryToolChest> toolChest = warehouse.getToolChest(query); + final Function preComputeManipulatorFn = + toolChest.makePreComputeManipulatorFn(query, MetricManipulatorFns.deserializing()); + final JavaType queryResultType = toolChest.getBaseResultType(); + final ResponseContext responseContext = new DefaultResponseContext(); + + return FutureUtils.transform( + dataServerClient.run(preparedQuery, responseContext, queryResultType, closer), + resultSequence -> { + final Yielder yielder = DataServerQueryHandlerUtils.createYielder( + resultSequence.map(preComputeManipulatorFn), + mappingFunction, + channelCounters + ); + + final List missingSegments = + DataServerQueryHandlerUtils.getMissingSegments(responseContext); + + if (!missingSegments.isEmpty()) { + throw DruidException + .forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build( + "Segment[%s]%s not found on server[%s]. Please retry your query.", + missingSegments.get(0), + missingSegments.size() > 1 ? StringUtils.format(" and[%d] others", missingSegments.size() - 1) : "", + serviceLocation.getHostAndPort() + ); + } + + return new DataServerQueryResult<>( + Collections.singletonList(yielder), + Collections.emptyList(), + dataSource + ); + } + ); + } + + private DataServerClient makeDataServerClient(ServiceLocation serviceLocation) + { + return new DataServerClient(serviceClientFactory, serviceLocation, objectMapper); + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandlerFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandlerFactory.java new file mode 100644 index 000000000000..a7fc2004f7bd --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandlerFactory.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.dart.worker; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.exec.DataServerQueryHandlerFactory; +import org.apache.druid.msq.input.table.DataServerRequestDescriptor; +import org.apache.druid.query.QueryToolChestWarehouse; +import org.apache.druid.rpc.ServiceClientFactory; + +/** + * Factory for {@link DartDataServerQueryHandler}. + */ +public class DartDataServerQueryHandlerFactory implements DataServerQueryHandlerFactory +{ + private final ServiceClientFactory serviceClientFactory; + private final ObjectMapper objectMapper; + private final QueryToolChestWarehouse warehouse; + + public DartDataServerQueryHandlerFactory( + ServiceClientFactory serviceClientFactory, + ObjectMapper objectMapper, + QueryToolChestWarehouse warehouse + ) + { + this.serviceClientFactory = serviceClientFactory; + this.objectMapper = objectMapper; + this.warehouse = warehouse; + } + + @Override + public DartDataServerQueryHandler createDataServerQueryHandler( + String dataSource, + ChannelCounters channelCounters, + DataServerRequestDescriptor requestDescriptor + ) + { + return new DartDataServerQueryHandler( + dataSource, + channelCounters, + serviceClientFactory, + objectMapper, + warehouse, + requestDescriptor + ); + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java index b1e2fb925941..03d17b1bbd31 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.google.inject.Injector; -import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Self; @@ -31,14 +30,12 @@ import org.apache.druid.messages.server.Outbox; import org.apache.druid.msq.dart.Dart; import org.apache.druid.msq.dart.controller.messages.ControllerMessage; -import org.apache.druid.msq.exec.DataServerQueryHandlerFactory; import org.apache.druid.msq.exec.MemoryIntrospector; import org.apache.druid.msq.exec.ProcessingBuffersProvider; import org.apache.druid.msq.exec.WorkerContext; import org.apache.druid.msq.querykit.DataSegmentProvider; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.QueryContext; -import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.policy.PolicyEnforcer; import org.apache.druid.rpc.ServiceClientFactory; @@ -65,8 +62,7 @@ public class DartWorkerContextFactoryImpl implements DartWorkerContextFactory private final MemoryIntrospector memoryIntrospector; private final ProcessingBuffersProvider processingBuffersProvider; private final Outbox outbox; - private final CoordinatorClient coordinatorClient; - private final QueryToolChestWarehouse warehouse; + private final DartDataServerQueryHandlerFactory dataServerQueryHandlerFactory; private final ServiceEmitter emitter; @Inject @@ -84,8 +80,7 @@ public DartWorkerContextFactoryImpl( MemoryIntrospector 
memoryIntrospector, @Dart ProcessingBuffersProvider processingBuffersProvider, Outbox outbox, - CoordinatorClient coordinatorClient, - QueryToolChestWarehouse warehouse, + DartDataServerQueryHandlerFactory dataServerQueryHandlerFactory, ServiceEmitter emitter ) { @@ -102,8 +97,7 @@ public DartWorkerContextFactoryImpl( this.memoryIntrospector = memoryIntrospector; this.processingBuffersProvider = processingBuffersProvider; this.outbox = outbox; - this.coordinatorClient = coordinatorClient; - this.warehouse = warehouse; + this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory; this.emitter = emitter; } @@ -132,12 +126,7 @@ public WorkerContext build( outbox, tempDir, queryContext, - new DataServerQueryHandlerFactory( - coordinatorClient, - serviceClientFactory, - jsonMapper, - warehouse - ), + dataServerQueryHandlerFactory, emitter ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java index 04458d701705..f9c8c14ad141 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java @@ -19,359 +19,43 @@ package org.apache.druid.msq.exec; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; -import org.apache.druid.client.ImmutableSegmentLoadInfo; -import org.apache.druid.client.coordinator.CoordinatorClient; -import org.apache.druid.common.guava.FutureUtils; -import org.apache.druid.discovery.DataServerClient; -import org.apache.druid.error.DruidException; -import org.apache.druid.java.util.common.RetryUtils; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.java.util.common.guava.Sequence; -import org.apache.druid.java.util.common.guava.Sequences; -import org.apache.druid.java.util.common.guava.Yielder; -import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.java.util.common.io.Closer; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.msq.input.table.DataServerRequestDescriptor; -import org.apache.druid.msq.input.table.DataServerSelector; -import org.apache.druid.msq.input.table.RichSegmentDescriptor; -import org.apache.druid.query.Queries; import org.apache.druid.query.Query; -import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryToolChest; -import org.apache.druid.query.QueryToolChestWarehouse; -import org.apache.druid.query.SegmentDescriptor; -import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.MetricManipulationFn; -import org.apache.druid.query.aggregation.MetricManipulatorFns; -import org.apache.druid.query.context.DefaultResponseContext; -import org.apache.druid.query.context.ResponseContext; -import org.apache.druid.rpc.RpcException; -import org.apache.druid.rpc.ServiceClientFactory; -import org.apache.druid.rpc.ServiceLocation; -import org.apache.druid.server.coordination.DruidServerMetadata; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import 
java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; -import java.util.stream.Collectors; /** - * Class responsible for querying dataservers and retriving results for a given query. Also queries the coordinator - * to check if a segment has been handed off. + * Object for issuing native queries to data servers. This is created by a {@link DataServerQueryHandlerFactory}, + * and is used when MSQ is querying realtime data. It is required because realtime tasks are not currently able to + * execute MSQ logic themselves. */ -public class DataServerQueryHandler +public interface DataServerQueryHandler { - private static final Logger log = new Logger(DataServerQueryHandler.class); - private static final int DEFAULT_NUM_TRIES = 3; - private static final int PER_SERVER_QUERY_NUM_TRIES = 5; - private final String dataSource; - private final ChannelCounters channelCounters; - private final ServiceClientFactory serviceClientFactory; - private final CoordinatorClient coordinatorClient; - private final ObjectMapper objectMapper; - private final QueryToolChestWarehouse warehouse; - private final ScheduledExecutorService queryCancellationExecutor; - private final DataServerRequestDescriptor dataServerRequestDescriptor; - - public DataServerQueryHandler( - String dataSource, - ChannelCounters channelCounters, - ServiceClientFactory serviceClientFactory, - CoordinatorClient coordinatorClient, - ObjectMapper objectMapper, - QueryToolChestWarehouse warehouse, - ScheduledExecutorService queryCancellationExecutor, - DataServerRequestDescriptor dataServerRequestDescriptor - ) - { - this.dataSource = dataSource; - this.channelCounters = channelCounters; - this.serviceClientFactory = serviceClientFactory; - this.coordinatorClient = coordinatorClient; - this.objectMapper = objectMapper; - this.warehouse = warehouse; - this.queryCancellationExecutor = queryCancellationExecutor; - this.dataServerRequestDescriptor = dataServerRequestDescriptor; - } - - @VisibleForTesting - DataServerClient makeDataServerClient(ServiceLocation serviceLocation) - { - return new DataServerClient(serviceClientFactory, serviceLocation, objectMapper, queryCancellationExecutor); - } - /** - * Performs some necessary transforms to the query, so that the dataserver is able to understand it first. - * - Changing the datasource to a {@link TableDataSource} - * - Limiting the query to the required segments with {@link Queries#withSpecificSegments(Query, List)} - *
- * Then queries a data server and returns a {@link Yielder} for the results, retrying if needed. If a dataserver - * indicates that some segments were not found, checks with the coordinator to see if the segment was handed off. - * - If all the segments were handed off, returns a {@link DataServerQueryResult} with the yielder and list of handed - * off segments. - * - If some segments were not handed off, checks with the coordinator fetch an updated list of servers. This step is - * repeated up to {@link #DEFAULT_NUM_TRIES} times. - * - If the servers could not be found, checks if the segment was handed-off. If it was, returns a - * {@link DataServerQueryResult} with the yielder and list of handed off segments. Otherwise, throws an exception. - *
+ * Issues a query to the server and segments that were specified by the {@link DataServerRequestDescriptor} + * originally passed to {@link DataServerQueryHandlerFactory#createDataServerQueryHandler} when this instance + * was created. + * + * The query datasource is updated to refer to the specific segments from + * {@link DataServerRequestDescriptor#getSegments()}. + * * Also applies {@link QueryToolChest#makePreComputeManipulatorFn(Query, MetricManipulationFn)} and reports channel * metrics on the returned results. * - * @param result return type for the query from the data server - * @param type of the result rows after parsing from QueryType object + * @param query query to run + * @param mappingFunction function to apply to results + * @param closer will register query canceler with this closer + * @param result return type for the query from the data server + * @param type of the result rows after parsing from QueryType object */ - public DataServerQueryResult fetchRowsFromDataServer( + ListenableFuture> fetchRowsFromDataServer( Query query, Function, Sequence> mappingFunction, Closer closer - ) - { - // MSQ changes the datasource to a number datasource. This needs to be changed back for data servers to understand. - final Query preparedQuery = query.withDataSource(new TableDataSource(dataSource)); - final List> yielders = new ArrayList<>(); - final List handedOffSegments = new ArrayList<>(); - - List pendingRequests = ImmutableList.of(dataServerRequestDescriptor); - - final int maxRetries = preparedQuery.context().getNumRetriesOnMissingSegments(DEFAULT_NUM_TRIES); - int retryCount = 0; - - while (!pendingRequests.isEmpty()) { - final ResponseContext responseContext = new DefaultResponseContext(); - final Set processedSegments = new HashSet<>(); - for (DataServerRequestDescriptor descriptor : pendingRequests) { - log.info("Querying server [%s] for segments[%s]", descriptor.getServerMetadata(), descriptor.getSegments()); - processedSegments.addAll(descriptor.getSegments()); - Yielder yielder = fetchRowsFromDataServerInternal(descriptor, responseContext, closer, preparedQuery, mappingFunction); - - // Add results - if (yielder != null && !yielder.isDone()) { - yielders.add(yielder); - } - } - - // Check for missing segments - List missingSegments = getMissingSegments(responseContext); - if (missingSegments.isEmpty()) { - // No segments remaining. 
- break; - } - - final List handedOffSegmentDescriptors = checkSegmentHandoff(missingSegments); - - Set missingRichSegmentDescriptors = new HashSet<>(); - for (RichSegmentDescriptor richSegmentDescriptor : processedSegments) { - SegmentDescriptor segmentDescriptor = toSegmentDescriptorWithFullInterval(richSegmentDescriptor); - if (missingSegments.contains(segmentDescriptor)) { - if (handedOffSegmentDescriptors.contains(segmentDescriptor)) { - handedOffSegments.add(richSegmentDescriptor); - } else { - missingRichSegmentDescriptors.add(richSegmentDescriptor); - } - } - } - - pendingRequests = createNextPendingRequests( - missingRichSegmentDescriptors - ); - - if (!pendingRequests.isEmpty()) { - retryCount++; - if (retryCount > maxRetries) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build("Unable to fetch results from dataservers in [%d] retries.", retryCount); - } - } - } - - return new DataServerQueryResult<>(yielders, handedOffSegments, dataSource); - } - - private Yielder fetchRowsFromDataServerInternal( - final DataServerRequestDescriptor requestDescriptor, - final ResponseContext responseContext, - final Closer closer, - final Query query, - final Function, Sequence> mappingFunction - ) - { - final ServiceLocation serviceLocation = ServiceLocation.fromDruidServerMetadata(requestDescriptor.getServerMetadata()); - final DataServerClient dataServerClient = makeDataServerClient(serviceLocation); - final QueryToolChest> toolChest = warehouse.getToolChest(query); - final Function preComputeManipulatorFn = - toolChest.makePreComputeManipulatorFn(query, MetricManipulatorFns.deserializing()); - final JavaType queryResultType = toolChest.getBaseResultType(); - final List segmentDescriptors = requestDescriptor.getSegments() - .stream() - .map(DataServerQueryHandler::toSegmentDescriptorWithFullInterval) - .collect(Collectors.toList()); - - try { - return RetryUtils.retry( - () -> closer.register(createYielder( - dataServerClient.run( - Queries.withSpecificSegments( - query, - requestDescriptor.getSegments() - .stream() - .map(DataServerQueryHandler::toSegmentDescriptorWithFullInterval) - .collect(Collectors.toList()) - ), responseContext, queryResultType, closer).map(preComputeManipulatorFn), mappingFunction)), - throwable -> !(throwable instanceof QueryInterruptedException - && throwable.getCause() instanceof InterruptedException), - PER_SERVER_QUERY_NUM_TRIES - ); - } - catch (QueryInterruptedException e) { - if (e.getCause() instanceof RpcException) { - // In the case that all the realtime servers for a segment are gone (for example, if they were scaled down), - // we would also be unable to fetch the segment. 
- responseContext.addMissingSegments(segmentDescriptors); - return Yielders.each(Sequences.empty()); - } else { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build(e, "Exception while fetching rows for query from dataservers[%s]", serviceLocation); - } - } - catch (Exception e) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build(e, "Exception while fetching rows for query from dataservers[%s]", serviceLocation); - } - } - - private Yielder createYielder( - final Sequence sequence, - final Function, Sequence> mappingFunction - ) - { - return Yielders.each( - mappingFunction.apply(sequence) - .map(row -> { - channelCounters.incrementRowCount(); - return row; - }) - ); - } - - private List createNextPendingRequests( - final Set richSegmentDescriptors - ) - { - final Map> serverVsSegmentsMap = new HashMap<>(); - - Iterable immutableSegmentLoadInfos = - coordinatorClient.fetchServerViewSegments( - dataSource, - richSegmentDescriptors.stream().map(RichSegmentDescriptor::getFullInterval).collect(Collectors.toList()) - ); - - Map segmentVsServerMap = new HashMap<>(); - immutableSegmentLoadInfos.forEach(immutableSegmentLoadInfo -> { - segmentVsServerMap.put(immutableSegmentLoadInfo.getSegment().toDescriptor(), immutableSegmentLoadInfo); - }); - - for (RichSegmentDescriptor richSegmentDescriptor : richSegmentDescriptors) { - SegmentDescriptor segmentDescriptorWithFullInterval = toSegmentDescriptorWithFullInterval(richSegmentDescriptor); - if (!segmentVsServerMap.containsKey(segmentDescriptorWithFullInterval)) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build("Could not find a server for segment[%s]", richSegmentDescriptor); - } - - ImmutableSegmentLoadInfo segmentLoadInfo = segmentVsServerMap.get(segmentDescriptorWithFullInterval); - if (segmentLoadInfo.getSegment().toDescriptor().equals(segmentDescriptorWithFullInterval)) { - Set servers = segmentLoadInfo.getServers() - .stream() - .filter(druidServerMetadata -> SegmentSource.REALTIME.getUsedServerTypes() - .contains(druidServerMetadata.getType())) - .collect(Collectors.toSet()); - if (servers.isEmpty()) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build("Could not find a server matching includeSegmentSource[%s] for segment[%s]. 
Only found servers [%s]", - SegmentSource.REALTIME, richSegmentDescriptor, servers); - } - - DruidServerMetadata druidServerMetadata = DataServerSelector.RANDOM.getSelectServerFunction().apply(servers); - serverVsSegmentsMap.computeIfAbsent(druidServerMetadata, ignored -> new HashSet<>()); - SegmentDescriptor descriptor = segmentLoadInfo.getSegment().toDescriptor(); - serverVsSegmentsMap.get(druidServerMetadata) - .add(new RichSegmentDescriptor(richSegmentDescriptor.getFullInterval(), richSegmentDescriptor.getInterval(), descriptor.getVersion(), descriptor.getPartitionNumber())); - } - } - - final List requestDescriptors = new ArrayList<>(); - for (Map.Entry> druidServerMetadataSetEntry : serverVsSegmentsMap.entrySet()) { - DataServerRequestDescriptor dataServerRequest = new DataServerRequestDescriptor( - druidServerMetadataSetEntry.getKey(), - ImmutableList.copyOf(druidServerMetadataSetEntry.getValue()) - ); - requestDescriptors.add(dataServerRequest); - } - - return requestDescriptors; - } - - /** - * Retreives the list of missing segments from the response context. - */ - private static List getMissingSegments(final ResponseContext responseContext) - { - List missingSegments = responseContext.getMissingSegments(); - if (missingSegments == null) { - return ImmutableList.of(); - } - return missingSegments; - } - - /** - * Queries the coordinator to check if a list of segments has been handed off. - * Returns a list of segments which have been handed off. - *
- * See {@link org.apache.druid.server.http.DataSourcesResource#isHandOffComplete(String, String, int, String)} - */ - private List checkSegmentHandoff(List segmentDescriptors) - { - try { - List handedOffSegments = new ArrayList<>(); - - for (SegmentDescriptor segmentDescriptor : segmentDescriptors) { - Boolean wasHandedOff = FutureUtils.get( - coordinatorClient.isHandoffComplete(dataSource, segmentDescriptor), - true - ); - if (Boolean.TRUE.equals(wasHandedOff)) { - handedOffSegments.add(segmentDescriptor); - } - } - return handedOffSegments; - } - catch (Exception e) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build(e, "Could not contact coordinator"); - } - } - - static SegmentDescriptor toSegmentDescriptorWithFullInterval(RichSegmentDescriptor richSegmentDescriptor) - { - return new SegmentDescriptor( - richSegmentDescriptor.getFullInterval(), - richSegmentDescriptor.getVersion(), - richSegmentDescriptor.getPartitionNumber() - ); - } + ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java index 1caed919ef04..245c078a1c83 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java @@ -19,79 +19,17 @@ package org.apache.druid.msq.exec; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.client.coordinator.CoordinatorClient; -import org.apache.druid.java.util.common.RE; -import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; -import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.msq.input.table.DataServerRequestDescriptor; -import org.apache.druid.query.QueryToolChestWarehouse; -import org.apache.druid.rpc.ServiceClientFactory; - -import java.io.Closeable; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; /** * Creates new instances of {@link DataServerQueryHandler} and manages the cancellation threadpool. 
*/ -public class DataServerQueryHandlerFactory implements Closeable +public interface DataServerQueryHandlerFactory { - private static final Logger log = new Logger(DataServerQueryHandlerFactory.class); - private static final int DEFAULT_THREAD_COUNT = 4; - private final CoordinatorClient coordinatorClient; - private final ServiceClientFactory serviceClientFactory; - private final ObjectMapper objectMapper; - private final QueryToolChestWarehouse warehouse; - private final ScheduledExecutorService queryCancellationExecutor; - - public DataServerQueryHandlerFactory( - CoordinatorClient coordinatorClient, - ServiceClientFactory serviceClientFactory, - ObjectMapper objectMapper, - QueryToolChestWarehouse warehouse - ) - { - this.coordinatorClient = coordinatorClient; - this.serviceClientFactory = serviceClientFactory; - this.objectMapper = objectMapper; - this.warehouse = warehouse; - this.queryCancellationExecutor = ScheduledExecutors.fixed(DEFAULT_THREAD_COUNT, "query-cancellation-executor"); - } - - public DataServerQueryHandler createDataServerQueryHandler( + DataServerQueryHandler createDataServerQueryHandler( String dataSource, ChannelCounters channelCounters, DataServerRequestDescriptor dataServerRequestDescriptor - ) - { - return new DataServerQueryHandler( - dataSource, - channelCounters, - serviceClientFactory, - coordinatorClient, - objectMapper, - warehouse, - queryCancellationExecutor, - dataServerRequestDescriptor - ); - } - - @Override - public void close() - { - // Wait for all query cancellations to be complete. - log.info("Waiting for any data server queries to be canceled."); - queryCancellationExecutor.shutdown(); - try { - if (!queryCancellationExecutor.awaitTermination(1, TimeUnit.MINUTES)) { - log.error("Unable to cancel all ongoing queries."); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RE(e); - } - } + ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerUtils.java new file mode 100644 index 000000000000..ce63f8100e1f --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerUtils.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.exec; + +import org.apache.druid.discovery.DataServerClient; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.query.Queries; +import org.apache.druid.query.Query; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.context.ResponseContext; + +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** + * Static utility functions for {@link DataServerQueryHandler} implementations. + */ +public class DataServerQueryHandlerUtils +{ + private DataServerQueryHandlerUtils() + { + // No instantiation. + } + + /** + * Performs necessary transforms to a query destined for data servers. Does not update the list of segments; callers + * should do this themselves using {@link Queries#withSpecificSegments(Query, List)}. + * + * @param query the query + * @param dataSource datasource name + */ + public static > Query prepareQuery(final T query, final String dataSource) + { + // MSQ changes the datasource to an inputNumber datasource. This needs to be changed back for data servers + // to understand. + + // BUG: This transformation is incorrect; see https://github.com/apache/druid/issues/18198. It loses decorations + // such as join, unnest, etc. + return query.withDataSource(new TableDataSource(dataSource)); + } + + /** + * Given results from {@link DataServerClient#run}, returns a {@link Yielder} that applies the provided + * mapping function and increments the row count on the provided {@link ChannelCounters}. + */ + public static Yielder createYielder( + final Sequence sequence, + final Function, Sequence> mappingFunction, + final ChannelCounters channelCounters + ) + { + return Yielders.each( + mappingFunction.apply(sequence) + .map(row -> { + channelCounters.incrementRowCount(); + return row; + }) + ); + } + + /** + * Retreives the list of missing segments from the response context. 
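Taken together, the helpers above are meant to be composed roughly as follows (a sketch only, not part of the patch; query, dataSource, segmentDescriptors, dataServerClient, queryResultType, mappingFunction, channelCounters, and closer are placeholders standing in for handler state, and both concrete handlers in this patch follow this general shape):

    // Prepare and scope the query, run it, wrap the results, then check for missing segments.
    final ResponseContext responseContext = new DefaultResponseContext();
    final Query<Object> prepared = Queries.withSpecificSegments(
        DataServerQueryHandlerUtils.prepareQuery(query, dataSource),
        segmentDescriptors
    );
    final Sequence<Object> results =
        FutureUtils.getUnchecked(dataServerClient.run(prepared, responseContext, queryResultType, closer), true);
    final Yielder<Object[]> yielder =
        DataServerQueryHandlerUtils.createYielder(results, mappingFunction, channelCounters);
    final List<SegmentDescriptor> missing = DataServerQueryHandlerUtils.getMissingSegments(responseContext);
    // A non-empty "missing" list means some segments were unavailable; the Dart handler fails fast,
    // while the indexer-side handler re-checks handoff with the coordinator and retries.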
+ */ + public static List getMissingSegments(final ResponseContext responseContext) + { + List missingSegments = responseContext.getMissingSegments(); + if (missingSegments == null) { + return Collections.emptyList(); + } + return missingSegments; + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index e7131411556b..66d7fb1cc232 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -253,7 +253,6 @@ private Optional runInternal(final KernelHolders kernelHolders, throws Exception { context.registerWorker(this, workerCloser); - workerCloser.register(context.dataServerQueryHandlerFactory()); this.workerClient = workerCloser.register(new ExceptionWrappingWorkerClient(context.makeWorkerClient())); final FrameProcessorExecutor workerExec = new FrameProcessorExecutor(makeProcessingPool()); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java new file mode 100644 index 000000000000..b8b2afd51370 --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.indexing; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.client.ImmutableSegmentLoadInfo; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.discovery.DataServerClient; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.RetryUtils; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.exec.DataServerQueryHandler; +import org.apache.druid.msq.exec.DataServerQueryHandlerUtils; +import org.apache.druid.msq.exec.DataServerQueryResult; +import org.apache.druid.msq.exec.SegmentSource; +import org.apache.druid.msq.input.table.DataServerRequestDescriptor; +import org.apache.druid.msq.input.table.DataServerSelector; +import org.apache.druid.msq.input.table.RichSegmentDescriptor; +import org.apache.druid.query.Queries; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryInterruptedException; +import org.apache.druid.query.QueryToolChest; +import org.apache.druid.query.QueryToolChestWarehouse; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.aggregation.MetricManipulatorFns; +import org.apache.druid.query.context.DefaultResponseContext; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.rpc.RpcException; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocation; +import org.apache.druid.server.coordination.DruidServerMetadata; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Task implementation of {@link DataServerQueryHandler}. Implements retry logic as described in + * {@link #fetchRowsFromDataServer(Query, Function, Closer)}. 
+ */ +public class IndexerDataServerQueryHandler implements DataServerQueryHandler +{ + private static final Logger log = new Logger(IndexerDataServerQueryHandler.class); + private static final int DEFAULT_NUM_TRIES = 3; + private static final int PER_SERVER_QUERY_NUM_TRIES = 5; + private final String dataSource; + private final ChannelCounters channelCounters; + private final ServiceClientFactory serviceClientFactory; + private final CoordinatorClient coordinatorClient; + private final ObjectMapper objectMapper; + private final QueryToolChestWarehouse warehouse; + private final DataServerRequestDescriptor dataServerRequestDescriptor; + + public IndexerDataServerQueryHandler( + String dataSource, + ChannelCounters channelCounters, + ServiceClientFactory serviceClientFactory, + CoordinatorClient coordinatorClient, + ObjectMapper objectMapper, + QueryToolChestWarehouse warehouse, + DataServerRequestDescriptor dataServerRequestDescriptor + ) + { + this.dataSource = dataSource; + this.channelCounters = channelCounters; + this.serviceClientFactory = serviceClientFactory; + this.coordinatorClient = coordinatorClient; + this.objectMapper = objectMapper; + this.warehouse = warehouse; + this.dataServerRequestDescriptor = dataServerRequestDescriptor; + } + + @VisibleForTesting + DataServerClient makeDataServerClient(ServiceLocation serviceLocation) + { + return new DataServerClient(serviceClientFactory, serviceLocation, objectMapper); + } + + /** + * {@inheritDoc} + * + * This method blocks until servers start sending their results back. When the method returns, the future is + * immediately resolved. This means that on tasks, queries to realtime servers block the processing pool. + * In principle, however, it should be possible to make the logic in this class asynchronous. + * + * Queries are retried if needed. If a data server indicates that some segments were not found, this function checks + * with the coordinator to see if the segment was handed off. + *
    + *
+ * - If all the segments were handed off, returns a {@link DataServerQueryResult} with the yielder and list of
+ *   handed-off segments.
+ * - If some segments were not handed off, checks with the coordinator to fetch an updated list of servers.
+ *   This step is repeated up to {@link #DEFAULT_NUM_TRIES} times.
+ * - If the servers could not be found, checks if the segment was handed-off. If it was, returns a
+ *   {@link DataServerQueryResult} with the yielder and list of handed off segments. Otherwise, throws an exception.
+ * + * @param result return type for the query from the data server + * @param type of the result rows after parsing from QueryType object + */ + @Override + public ListenableFuture> fetchRowsFromDataServer( + Query query, + Function, Sequence> mappingFunction, + Closer closer + ) + { + final Query preparedQuery = DataServerQueryHandlerUtils.prepareQuery(query, dataSource); + final List> yielders = new ArrayList<>(); + final List handedOffSegments = new ArrayList<>(); + + List pendingRequests = ImmutableList.of(dataServerRequestDescriptor); + + final int maxRetries = preparedQuery.context().getNumRetriesOnMissingSegments(DEFAULT_NUM_TRIES); + int retryCount = 0; + + while (!pendingRequests.isEmpty()) { + final ResponseContext responseContext = new DefaultResponseContext(); + final Set processedSegments = new HashSet<>(); + for (DataServerRequestDescriptor descriptor : pendingRequests) { + log.info("Querying server [%s] for segments[%s]", descriptor.getServerMetadata(), descriptor.getSegments()); + processedSegments.addAll(descriptor.getSegments()); + Yielder yielder = fetchRowsFromDataServerInternal( + descriptor, + responseContext, + closer, + preparedQuery, + mappingFunction + ); + + // Add results + if (yielder != null && !yielder.isDone()) { + yielders.add(yielder); + } + } + + // Check for missing segments + List missingSegments = DataServerQueryHandlerUtils.getMissingSegments(responseContext); + if (missingSegments.isEmpty()) { + // No segments remaining. + break; + } + + final List handedOffSegmentDescriptors = checkSegmentHandoff(missingSegments); + + Set missingRichSegmentDescriptors = new HashSet<>(); + for (RichSegmentDescriptor richSegmentDescriptor : processedSegments) { + SegmentDescriptor segmentDescriptor = toSegmentDescriptorWithFullInterval(richSegmentDescriptor); + if (missingSegments.contains(segmentDescriptor)) { + if (handedOffSegmentDescriptors.contains(segmentDescriptor)) { + handedOffSegments.add(richSegmentDescriptor); + } else { + missingRichSegmentDescriptors.add(richSegmentDescriptor); + } + } + } + + pendingRequests = createNextPendingRequests( + missingRichSegmentDescriptors + ); + + if (!pendingRequests.isEmpty()) { + retryCount++; + if (retryCount > maxRetries) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build("Unable to fetch results from dataservers in [%d] retries.", retryCount); + } + } + } + + // Not actually async. The retry logic above is written in synchronous fashion. Just return an immediate-future + // when we actually have all queries issued and all yielders set up. 
+ return Futures.immediateFuture(new DataServerQueryResult<>(yielders, handedOffSegments, dataSource)); + } + + private Yielder fetchRowsFromDataServerInternal( + final DataServerRequestDescriptor requestDescriptor, + final ResponseContext responseContext, + final Closer closer, + final Query query, + final Function, Sequence> mappingFunction + ) + { + final ServiceLocation serviceLocation = ServiceLocation.fromDruidServerMetadata(requestDescriptor.getServerMetadata()); + final DataServerClient dataServerClient = makeDataServerClient(serviceLocation); + final QueryToolChest> toolChest = warehouse.getToolChest(query); + final Function preComputeManipulatorFn = + toolChest.makePreComputeManipulatorFn(query, MetricManipulatorFns.deserializing()); + final JavaType queryResultType = toolChest.getBaseResultType(); + final List segmentDescriptors = + requestDescriptor.getSegments() + .stream() + .map(IndexerDataServerQueryHandler::toSegmentDescriptorWithFullInterval) + .collect(Collectors.toList()); + + try { + return RetryUtils.retry( + () -> { + final ListenableFuture> queryFuture = dataServerClient.run( + Queries.withSpecificSegments( + query, + requestDescriptor.getSegments() + .stream() + .map(RichSegmentDescriptor::toPlainDescriptor) + .collect(Collectors.toList()) + ), + responseContext, + queryResultType, + closer + ); + + return closer.register( + DataServerQueryHandlerUtils.createYielder( + queryFuture.get().map(preComputeManipulatorFn), + mappingFunction, + channelCounters + ) + ); + }, + throwable -> !(throwable instanceof ExecutionException + && throwable.getCause() instanceof QueryInterruptedException + && throwable.getCause().getCause() instanceof InterruptedException), + PER_SERVER_QUERY_NUM_TRIES + ); + } + catch (ExecutionException e) { + if (e.getCause() instanceof QueryInterruptedException && e.getCause().getCause() instanceof RpcException) { + // In the case that all the realtime servers for a segment are gone (for example, if they were scaled down), + // we would also be unable to fetch the segment. 
+ responseContext.addMissingSegments(segmentDescriptors); + return Yielders.each(Sequences.empty()); + } else { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build(e, "Exception while fetching rows for query from dataservers[%s]", serviceLocation); + } + } + catch (Exception e) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build(e, "Exception while fetching rows for query from dataservers[%s]", serviceLocation); + } + } + + private List createNextPendingRequests( + final Set richSegmentDescriptors + ) + { + final Map> serverVsSegmentsMap = new HashMap<>(); + + Iterable immutableSegmentLoadInfos = + coordinatorClient.fetchServerViewSegments( + dataSource, + richSegmentDescriptors.stream().map(RichSegmentDescriptor::getFullInterval).collect(Collectors.toList()) + ); + + Map segmentVsServerMap = new HashMap<>(); + immutableSegmentLoadInfos.forEach(immutableSegmentLoadInfo -> { + segmentVsServerMap.put(immutableSegmentLoadInfo.getSegment().toDescriptor(), immutableSegmentLoadInfo); + }); + + for (RichSegmentDescriptor richSegmentDescriptor : richSegmentDescriptors) { + SegmentDescriptor segmentDescriptorWithFullInterval = toSegmentDescriptorWithFullInterval(richSegmentDescriptor); + if (!segmentVsServerMap.containsKey(segmentDescriptorWithFullInterval)) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build("Could not find a server for segment[%s]", richSegmentDescriptor); + } + + ImmutableSegmentLoadInfo segmentLoadInfo = segmentVsServerMap.get(segmentDescriptorWithFullInterval); + if (segmentLoadInfo.getSegment().toDescriptor().equals(segmentDescriptorWithFullInterval)) { + Set servers = segmentLoadInfo.getServers() + .stream() + .filter(druidServerMetadata -> SegmentSource.REALTIME.getUsedServerTypes() + .contains( + druidServerMetadata.getType())) + .collect(Collectors.toSet()); + if (servers.isEmpty()) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build( + "Could not find a server matching includeSegmentSource[%s] for segment[%s]. Only found servers [%s]", + SegmentSource.REALTIME, richSegmentDescriptor, servers + ); + } + + DruidServerMetadata druidServerMetadata = DataServerSelector.RANDOM.getSelectServerFunction().apply(servers); + serverVsSegmentsMap.computeIfAbsent(druidServerMetadata, ignored -> new HashSet<>()); + SegmentDescriptor descriptor = segmentLoadInfo.getSegment().toDescriptor(); + serverVsSegmentsMap.get(druidServerMetadata) + .add(new RichSegmentDescriptor( + richSegmentDescriptor.getFullInterval(), + richSegmentDescriptor.getInterval(), + descriptor.getVersion(), + descriptor.getPartitionNumber() + )); + } + } + + final List requestDescriptors = new ArrayList<>(); + for (Map.Entry> druidServerMetadataSetEntry : serverVsSegmentsMap.entrySet()) { + DataServerRequestDescriptor dataServerRequest = new DataServerRequestDescriptor( + druidServerMetadataSetEntry.getKey(), + ImmutableList.copyOf(druidServerMetadataSetEntry.getValue()) + ); + requestDescriptors.add(dataServerRequest); + } + + return requestDescriptors; + } + + /** + * Queries the coordinator to check if a list of segments has been handed off. + * Returns a list of segments which have been handed off. + *
+ * See {@link org.apache.druid.server.http.DataSourcesResource#isHandOffComplete(String, String, int, String)} + */ + private List checkSegmentHandoff(List segmentDescriptors) + { + try { + List handedOffSegments = new ArrayList<>(); + + for (SegmentDescriptor segmentDescriptor : segmentDescriptors) { + Boolean wasHandedOff = FutureUtils.get( + coordinatorClient.isHandoffComplete(dataSource, segmentDescriptor), + true + ); + if (Boolean.TRUE.equals(wasHandedOff)) { + handedOffSegments.add(segmentDescriptor); + } + } + return handedOffSegments; + } + catch (Exception e) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build(e, "Could not contact coordinator"); + } + } + + static SegmentDescriptor toSegmentDescriptorWithFullInterval(RichSegmentDescriptor richSegmentDescriptor) + { + return new SegmentDescriptor( + richSegmentDescriptor.getFullInterval(), + richSegmentDescriptor.getVersion(), + richSegmentDescriptor.getPartitionNumber() + ); + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerFactory.java new file mode 100644 index 000000000000..46e06a5484b9 --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.indexing; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.exec.DataServerQueryHandlerFactory; +import org.apache.druid.msq.input.table.DataServerRequestDescriptor; +import org.apache.druid.query.QueryToolChestWarehouse; +import org.apache.druid.rpc.ServiceClientFactory; + +/** + * Creates new instances of {@link IndexerDataServerQueryHandler}. 
+ */ +public class IndexerDataServerQueryHandlerFactory implements DataServerQueryHandlerFactory +{ + private final CoordinatorClient coordinatorClient; + private final ServiceClientFactory serviceClientFactory; + private final ObjectMapper objectMapper; + private final QueryToolChestWarehouse warehouse; + + public IndexerDataServerQueryHandlerFactory( + CoordinatorClient coordinatorClient, + ServiceClientFactory serviceClientFactory, + ObjectMapper objectMapper, + QueryToolChestWarehouse warehouse + ) + { + this.coordinatorClient = coordinatorClient; + this.serviceClientFactory = serviceClientFactory; + this.objectMapper = objectMapper; + this.warehouse = warehouse; + } + + @Override + public IndexerDataServerQueryHandler createDataServerQueryHandler( + String dataSource, + ChannelCounters channelCounters, + DataServerRequestDescriptor requestDescriptor + ) + { + return new IndexerDataServerQueryHandler( + dataSource, + channelCounters, + serviceClientFactory, + coordinatorClient, + objectMapper, + warehouse, + requestDescriptor + ); + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java index 9e2c9d410742..368709b7eff3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java @@ -50,7 +50,7 @@ public class IndexerFrameContext implements FrameContext private final ResourceHolder processingBuffers; private final WorkerMemoryParameters memoryParameters; private final WorkerStorageParameters storageParameters; - private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory; + private final IndexerDataServerQueryHandlerFactory dataServerQueryHandlerFactory; public IndexerFrameContext( StageId stageId, @@ -59,7 +59,7 @@ public IndexerFrameContext( IndexIO indexIO, DataSegmentProvider dataSegmentProvider, ResourceHolder processingBuffers, - DataServerQueryHandlerFactory dataServerQueryHandlerFactory, + IndexerDataServerQueryHandlerFactory dataServerQueryHandlerFactory, WorkerMemoryParameters memoryParameters, WorkerStorageParameters storageParameters ) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java index 035544401f14..6c55a6add951 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java @@ -77,7 +77,7 @@ public class IndexerWorkerContext implements WorkerContext private final ServiceLocator controllerLocator; private final IndexIO indexIO; private final TaskDataSegmentProvider dataSegmentProvider; - private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory; + private final IndexerDataServerQueryHandlerFactory dataServerQueryHandlerFactory; private final ServiceClientFactory clientFactory; private final MemoryIntrospector memoryIntrospector; private final ProcessingBuffersProvider processingBuffersProvider; @@ -98,7 +98,7 @@ public IndexerWorkerContext( final ServiceClientFactory clientFactory, final MemoryIntrospector memoryIntrospector, final ProcessingBuffersProvider 
processingBuffersProvider, - final DataServerQueryHandlerFactory dataServerQueryHandlerFactory + final IndexerDataServerQueryHandlerFactory dataServerQueryHandlerFactory ) { this.task = task; @@ -158,7 +158,7 @@ public static IndexerWorkerContext createProductionInstance( serviceClientFactory, memoryIntrospector, processingBuffersProvider, - new DataServerQueryHandlerFactory( + new IndexerDataServerQueryHandlerFactory( toolbox.getCoordinatorClient(), serviceClientFactory, smileMapper, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/RichSegmentDescriptor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/RichSegmentDescriptor.java index 27f5202b6ce2..869464df04da 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/RichSegmentDescriptor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/RichSegmentDescriptor.java @@ -93,6 +93,14 @@ public Interval getFullIntervalForJson() return fullInterval; } + /** + * Returns a plain descriptor, with the {@link #fullInterval} field dropped. + */ + public SegmentDescriptor toPlainDescriptor() + { + return new SegmentDescriptor(getInterval(), getVersion(), getPartitionNumber()); + } + @Override public boolean equals(Object o) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java index db1259f21e50..879386e87071 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java @@ -20,8 +20,10 @@ package org.apache.druid.msq.querykit.groupby; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.frame.Frame; import org.apache.druid.frame.channel.FrameWithPartition; import org.apache.druid.frame.channel.ReadableFrameChannel; @@ -63,6 +65,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.function.Function; @@ -85,6 +88,7 @@ public class GroupByPreShuffleFrameProcessor extends BaseLeafFrameProcessor private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed private SegmentsInputSlice handedOffSegments = null; private Yielder> currentResultsYielder; + private ListenableFuture> dataServerQueryResultFuture; public GroupByPreShuffleFrameProcessor( final GroupByQuery query, @@ -117,12 +121,23 @@ protected ReturnOrAwait runWithDataServerQuery(DataServerQue { if (resultYielder == null || resultYielder.isDone()) { if (currentResultsYielder == null) { + if (dataServerQueryResultFuture == null) { + dataServerQueryResultFuture = + dataServerQueryHandler.fetchRowsFromDataServer( + groupingEngine.prepareGroupByQuery(query), + Function.identity(), + closer + ); + + // Give up the processing thread while we wait for the query to finish. This is only really asynchronous + // with Dart. 
On tasks, the IndexerDataServerQueryHandler does not return from fetchRowsFromDataServer until + // the response has started to come back. + return ReturnOrAwait.awaitAllFutures(Collections.singletonList(dataServerQueryResultFuture)); + } + final DataServerQueryResult dataServerQueryResult = - dataServerQueryHandler.fetchRowsFromDataServer( - groupingEngine.prepareGroupByQuery(query), - Function.identity(), - closer - ); + FutureUtils.getUncheckedImmediately(dataServerQueryResultFuture); + dataServerQueryResultFuture = null; handedOffSegments = dataServerQueryResult.getHandedOffSegments(); if (!handedOffSegments.getDescriptors().isEmpty()) { log.info( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java index c90aec81af10..73c7917ed620 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java @@ -23,8 +23,10 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.ListenableFuture; import it.unimi.dsi.fastutil.ints.IntSet; import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.error.DruidException; import org.apache.druid.frame.Frame; import org.apache.druid.frame.channel.FrameWithPartition; @@ -85,6 +87,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -104,6 +107,7 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor private final Closer closer = Closer.create(); private Cursor cursor; + private ListenableFuture> dataServerQueryResultFuture; private Closeable cursorCloser; private Segment segment; private final SimpleSettableOffset cursorOffset = new SimpleAscendingOffset(Integer.MAX_VALUE); @@ -196,12 +200,24 @@ protected ReturnOrAwait runWithDataServerQuery(final DataSer { if (cursor == null) { ScanQuery preparedQuery = prepareScanQueryForDataServer(query); + + if (dataServerQueryResultFuture == null) { + dataServerQueryResultFuture = + dataServerQueryHandler.fetchRowsFromDataServer( + preparedQuery, + ScanQueryFrameProcessor::mappingFunction, + closer + ); + + // Give up the processing thread while we wait for the query to finish. This is only really asynchronous + // with Dart. On tasks, the IndexerDataServerQueryHandler does not return from fetchRowsFromDataServer until + // the response has started to come back. 
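+ // Once the awaited future resolves, the executor reschedules this processor; the next call to this method
+ // then picks the result up via FutureUtils.getUncheckedImmediately below.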
+ return ReturnOrAwait.awaitAllFutures(Collections.singletonList(dataServerQueryResultFuture)); + } + final DataServerQueryResult dataServerQueryResult = - dataServerQueryHandler.fetchRowsFromDataServer( - preparedQuery, - ScanQueryFrameProcessor::mappingFunction, - closer - ); + FutureUtils.getUncheckedImmediately(dataServerQueryResultFuture); + dataServerQueryResultFuture = null; handedOffSegments = dataServerQueryResult.getHandedOffSegments(); if (!handedOffSegments.getDescriptors().isEmpty()) { log.info( diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java index 07e3854ca068..cb4a75d7b1c8 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Futures; import org.apache.druid.client.ImmutableSegmentLoadInfo; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -103,18 +104,20 @@ public void testSelectWithLoadedSegmentsOnFoo() .build(); doReturn( - new DataServerQueryResult<>( - ImmutableList.of( - Yielders.each( - Sequences.simple( - ImmutableList.of( - new Object[]{1L, "qwe"}, - new Object[]{1L, "tyu"} + Futures.immediateFuture( + new DataServerQueryResult<>( + ImmutableList.of( + Yielders.each( + Sequences.simple( + ImmutableList.of( + new Object[]{1L, "qwe"}, + new Object[]{1L, "tyu"} + ) ) - ) - )), - ImmutableList.of(), - "foo" + )), + ImmutableList.of(), + "foo" + ) )).when(dataServerQueryHandler) .fetchRowsFromDataServer(any(), any(), any()); @@ -164,18 +167,20 @@ public void testSelectWithLoadedSegmentsOnFooWithOrderBy() ScanQuery query = invocationOnMock.getArgument(0); ScanQuery.verifyOrderByForNativeExecution(query); Assert.assertEquals(Long.MAX_VALUE, query.getScanRowsLimit()); - return new DataServerQueryResult<>( - ImmutableList.of( - Yielders.each( - Sequences.simple( - ImmutableList.of( - new Object[]{1L, "qwe"}, - new Object[]{1L, "tyu"} + return Futures.immediateFuture( + new DataServerQueryResult<>( + ImmutableList.of( + Yielders.each( + Sequences.simple( + ImmutableList.of( + new Object[]{1L, "qwe"}, + new Object[]{1L, "tyu"} + ) ) - ) - )), - ImmutableList.of(), - "foo" + )), + ImmutableList.of(), + "foo" + ) ); } ) @@ -225,17 +230,19 @@ public void testGroupByWithLoadedSegmentsOnFoo() .build(); doReturn( - new DataServerQueryResult<>( - ImmutableList.of( - Yielders.each( - Sequences.simple( - ImmutableList.of( - ResultRow.of(1L, 2L) + Futures.immediateFuture( + new DataServerQueryResult<>( + ImmutableList.of( + Yielders.each( + Sequences.simple( + ImmutableList.of( + ResultRow.of(1L, 2L) + ) ) - ) - )), - ImmutableList.of(), - "foo" + )), + ImmutableList.of(), + "foo" + ) ) ) .when(dataServerQueryHandler) @@ -285,17 +292,19 @@ public void testGroupByWithOnlyLoadedSegmentsOnFoo() .build(); doReturn( - new DataServerQueryResult<>( - ImmutableList.of( - Yielders.each( - Sequences.simple( - ImmutableList.of( - ResultRow.of(1L, 2L) + Futures.immediateFuture( + new DataServerQueryResult<>( + ImmutableList.of( + Yielders.each( + Sequences.simple( + ImmutableList.of( + ResultRow.of(1L, 2L) + ) ) - ) - 
)), - ImmutableList.of(), - "foo" + )), + ImmutableList.of(), + "foo" + ) ) ) .when(dataServerQueryHandler) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/DataServerQueryHandlerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java similarity index 78% rename from extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/DataServerQueryHandlerTest.java rename to extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java index f04a65f89a84..2285e49b7fce 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/DataServerQueryHandlerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.msq.exec; +package org.apache.druid.msq.indexing; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -29,11 +29,12 @@ import org.apache.druid.discovery.DruidServiceTestUtils; import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.exec.DataServerQueryResult; +import org.apache.druid.msq.exec.SegmentSource; import org.apache.druid.msq.input.table.DataServerRequestDescriptor; import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.querykit.InputNumberDataSource; @@ -65,20 +66,19 @@ import org.mockito.junit.MockitoJUnitRunner; import java.util.List; +import java.util.concurrent.ExecutionException; -import static org.apache.druid.msq.exec.DataServerQueryHandler.toSegmentDescriptorWithFullInterval; import static org.apache.druid.query.Druids.newScanQueryBuilder; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @RunWith(MockitoJUnitRunner.class) -public class DataServerQueryHandlerTest +public class IndexerDataServerQueryHandlerTest { private static final String DATASOURCE1 = "dataSource1"; private static final DruidServerMetadata DRUID_SERVER_1 = new DruidServerMetadata( @@ -115,7 +115,7 @@ public class DataServerQueryHandlerTest private DataServerClient dataServerClient2; private CoordinatorClient coordinatorClient; private ScanQuery query; - private DataServerQueryHandler target; + private IndexerDataServerQueryHandler target; @Before public void setUp() @@ -136,14 +136,13 @@ public void setUp() .build() ); target = spy( - new DataServerQueryHandler( + new IndexerDataServerQueryHandler( DATASOURCE1, new ChannelCounters(), mock(ServiceClientFactory.class), coordinatorClient, DruidServiceTestUtils.newJsonMapper(), queryToolChestWarehouse, - Execs.scheduledSingleThreaded("query-cancellation-executor"), new DataServerRequestDescriptor(DRUID_SERVER_1, ImmutableList.of(SEGMENT_1, SEGMENT_2)) ) ); @@ -160,7 +159,7 @@ public void setUp() } @Test - 
public void testFetchRowsFromServer() + public void testFetchRowsFromServer() throws ExecutionException, InterruptedException { ScanResultValue scanResultValue = new ScanResultValue( null, @@ -172,13 +171,14 @@ public void testFetchRowsFromServer() ) ); - doReturn(Sequences.simple(ImmutableList.of(scanResultValue))).when(dataServerClient1).run(any(), any(), any(), any()); + doReturn(Futures.immediateFuture(Sequences.simple(ImmutableList.of(scanResultValue)))) + .when(dataServerClient1).run(any(), any(), any(), any()); DataServerQueryResult dataServerQueryResult = target.fetchRowsFromDataServer( query, ScanQueryFrameProcessor::mappingFunction, Closer.create() - ); + ).get(); Assert.assertTrue(dataServerQueryResult.getHandedOffSegments().getDescriptors().isEmpty()); List> events = (List>) scanResultValue.getEvents(); @@ -192,7 +192,7 @@ public void testFetchRowsFromServer() } @Test - public void testOneSegmentRelocated() + public void testOneSegmentRelocated() throws ExecutionException, InterruptedException { ScanResultValue scanResultValue1 = new ScanResultValue( null, @@ -207,10 +207,10 @@ public void testOneSegmentRelocated() ResponseContext responseContext = invocation.getArgument(1); responseContext.addMissingSegments( ImmutableList.of( - toSegmentDescriptorWithFullInterval(SEGMENT_2) + IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_2) ) ); - return Sequences.simple(ImmutableList.of(scanResultValue1)); + return Futures.immediateFuture(Sequences.simple(ImmutableList.of(scanResultValue1))); }).when(dataServerClient1).run(any(), any(), any(), any()); ScanResultValue scanResultValue2 = new ScanResultValue( @@ -222,9 +222,12 @@ public void testOneSegmentRelocated() ) ); - doReturn(Sequences.simple(ImmutableList.of(scanResultValue2))).when(dataServerClient2).run(any(), any(), any(), any()); + doReturn(Futures.immediateFuture(Sequences.simple(ImmutableList.of(scanResultValue2)))) + .when(dataServerClient2).run(any(), any(), any(), any()); - doReturn(Futures.immediateFuture(Boolean.FALSE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, toSegmentDescriptorWithFullInterval(SEGMENT_2)); + doReturn(Futures.immediateFuture(Boolean.FALSE)) + .when(coordinatorClient) + .isHandoffComplete(DATASOURCE1, IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_2)); doReturn(ImmutableList.of( new ImmutableSegmentLoadInfo( DataSegment.builder() @@ -241,7 +244,7 @@ public void testOneSegmentRelocated() query, ScanQueryFrameProcessor::mappingFunction, Closer.create() - ); + ).get(); Assert.assertTrue(dataServerQueryResult.getHandedOffSegments().getDescriptors().isEmpty()); @@ -263,26 +266,30 @@ public void testOneSegmentRelocated() } @Test - public void testHandoff() + public void testHandoff() throws ExecutionException, InterruptedException { doAnswer(invocation -> { ResponseContext responseContext = invocation.getArgument(1); responseContext.addMissingSegments( ImmutableList.of( - toSegmentDescriptorWithFullInterval(SEGMENT_1), - toSegmentDescriptorWithFullInterval(SEGMENT_2) + IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_1), + IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_2) ) ); - return Sequences.empty(); + return Futures.immediateFuture(Sequences.empty()); }).when(dataServerClient1).run(any(), any(), any(), any()); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, toSegmentDescriptorWithFullInterval(SEGMENT_1)); - 
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, toSegmentDescriptorWithFullInterval(SEGMENT_2)); + doReturn(Futures.immediateFuture(Boolean.TRUE)) + .when(coordinatorClient) + .isHandoffComplete(DATASOURCE1, IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_1)); + doReturn(Futures.immediateFuture(Boolean.TRUE)) + .when(coordinatorClient) + .isHandoffComplete(DATASOURCE1, IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_2)); DataServerQueryResult dataServerQueryResult = target.fetchRowsFromDataServer( query, ScanQueryFrameProcessor::mappingFunction, Closer.create() - ); + ).get(); Assert.assertEquals(ImmutableList.of(SEGMENT_1, SEGMENT_2), dataServerQueryResult.getHandedOffSegments().getDescriptors()); Assert.assertTrue(dataServerQueryResult.getResultsYielders().isEmpty()); @@ -291,13 +298,16 @@ public void testHandoff() @Test public void testServerNotFoundWithoutHandoffShouldThrowException() { - doThrow( - new QueryInterruptedException(new RpcException("Could not connect to server")) + doReturn( + Futures.immediateFailedFuture(new QueryInterruptedException(new RpcException("Could not connect to server"))) ).when(dataServerClient1).run(any(), any(), any(), any()); - doReturn(Futures.immediateFuture(Boolean.FALSE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, toSegmentDescriptorWithFullInterval(SEGMENT_1)); + doReturn(Futures.immediateFuture(Boolean.FALSE)) + .when(coordinatorClient) + .isHandoffComplete(DATASOURCE1, IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_1)); - ScanQuery queryWithRetry = query.withOverriddenContext(ImmutableMap.of(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, 3)); + ScanQuery queryWithRetry = + query.withOverriddenContext(ImmutableMap.of(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, 3)); Assert.assertThrows(DruidException.class, () -> target.fetchRowsFromDataServer( @@ -311,20 +321,24 @@ public void testServerNotFoundWithoutHandoffShouldThrowException() } @Test - public void testServerNotFoundButHandoffShouldReturnWithStatus() + public void testServerNotFoundButHandoffShouldReturnWithStatus() throws ExecutionException, InterruptedException { - doThrow( - new QueryInterruptedException(new RpcException("Could not connect to server")) + doReturn( + Futures.immediateFailedFuture(new QueryInterruptedException(new RpcException("Could not connect to server"))) ).when(dataServerClient1).run(any(), any(), any(), any()); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, toSegmentDescriptorWithFullInterval(SEGMENT_1)); - doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, toSegmentDescriptorWithFullInterval(SEGMENT_2)); + doReturn(Futures.immediateFuture(Boolean.TRUE)) + .when(coordinatorClient) + .isHandoffComplete(DATASOURCE1, IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_1)); + doReturn(Futures.immediateFuture(Boolean.TRUE)) + .when(coordinatorClient) + .isHandoffComplete(DATASOURCE1, IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_2)); DataServerQueryResult dataServerQueryResult = target.fetchRowsFromDataServer( query, ScanQueryFrameProcessor::mappingFunction, Closer.create() - ); + ).get(); Assert.assertEquals(ImmutableList.of(SEGMENT_1, SEGMENT_2), dataServerQueryResult.getHandedOffSegments().getDescriptors()); 
Assert.assertTrue(dataServerQueryResult.getResultsYielders().isEmpty()); @@ -333,11 +347,12 @@ public void testServerNotFoundButHandoffShouldReturnWithStatus() @Test public void testQueryFail() { - SegmentDescriptor segmentDescriptorWithFullInterval = toSegmentDescriptorWithFullInterval(SEGMENT_1); + SegmentDescriptor segmentDescriptorWithFullInterval = + IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_1); doAnswer(invocation -> { ResponseContext responseContext = invocation.getArgument(1); responseContext.addMissingSegments(ImmutableList.of(segmentDescriptorWithFullInterval)); - return Sequences.empty(); + return Futures.immediateFuture(Sequences.empty()); }).when(dataServerClient1).run(any(), any(), any(), any()); doReturn(Futures.immediateFuture(Boolean.FALSE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1, segmentDescriptorWithFullInterval); diff --git a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java index b76263b52e90..abcff1308d6c 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java @@ -44,6 +44,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -134,7 +135,7 @@ public void run() logProcessorStatusString(processor, finished, allWritabilityFutures); if (!writabilityFuturesToWaitFor.isEmpty()) { - runProcessorAfterFutureResolves(Futures.allAsList(writabilityFuturesToWaitFor)); + runProcessorAfterFutureResolves(Futures.allAsList(writabilityFuturesToWaitFor), false); return; } @@ -150,13 +151,17 @@ public void run() if (result.isReturn()) { succeed(result.value()); + } else if (result.hasAwaitableFutures()) { + runProcessorAfterFutureResolves(Futures.allAsList(result.awaitableFutures()), true); } else { + assert result.hasAwaitableChannels(); + // Don't retain a reference to this set: it may be mutated the next time the processor runs. - final IntSet await = result.awaitSet(); + final IntSet await = result.awaitableChannels(); if (await.isEmpty()) { exec.execute(ExecutorRunnable.this); - } else if (result.isAwaitAll() || await.size() == 1) { + } else if (result.isAwaitAllChannels() || await.size() == 1) { final List> readabilityFutures = new ArrayList<>(); for (final int channelNumber : await) { @@ -169,11 +174,11 @@ public void run() if (readabilityFutures.isEmpty()) { exec.execute(ExecutorRunnable.this); } else { - runProcessorAfterFutureResolves(Futures.allAsList(readabilityFutures)); + runProcessorAfterFutureResolves(Futures.allAsList(readabilityFutures), false); } } else { // Await any. - runProcessorAfterFutureResolves(awaitAnyWidget.awaitAny(await)); + runProcessorAfterFutureResolves(awaitAnyWidget.awaitAny(await), false); } } } @@ -272,7 +277,17 @@ private Optional> runProcessorNow() } } - private void runProcessorAfterFutureResolves(final ListenableFuture future) + /** + * Schedule this processor to run after the provided future resolves. + * + * @param future the future + * @param failOnCancel whether the processor should be {@link #fail(Throwable)} if the future is itself canceled. 
+ * This is true for futures provided by {@link ReturnOrAwait#awaitAllFutures(Collection)}, + * because the processor has declared it wants to wait for them; if they are canceled + * the processor must fail. It is false for other futures, which the processor was not + * directly waiting for. + */ + private void runProcessorAfterFutureResolves(final ListenableFuture future, final boolean failOnCancel) { final ListenableFuture cancelableFuture = registerCancelableFuture(future, false, cancellationId); @@ -294,8 +309,7 @@ public void onSuccess(final V ignored) @Override public void onFailure(Throwable t) { - // Ignore cancellation. - if (!cancelableFuture.isCancelled()) { + if (failOnCancel || !cancelableFuture.isCancelled()) { fail(t); } } diff --git a/processing/src/main/java/org/apache/druid/frame/processor/ReturnOrAwait.java b/processing/src/main/java/org/apache/druid/frame/processor/ReturnOrAwait.java index 4ca69b6cc498..c154b4258511 100644 --- a/processing/src/main/java/org/apache/druid/frame/processor/ReturnOrAwait.java +++ b/processing/src/main/java/org/apache/druid/frame/processor/ReturnOrAwait.java @@ -19,13 +19,14 @@ package org.apache.druid.frame.processor; +import com.google.common.util.concurrent.ListenableFuture; import it.unimi.dsi.fastutil.ints.IntSet; import it.unimi.dsi.fastutil.ints.IntSets; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import javax.annotation.Nullable; -import java.util.Objects; +import java.util.Collection; /** * Instances of this class are returned by {@link FrameProcessor#runIncrementally}, and are used by @@ -35,8 +36,8 @@ * In this case {@link #isReturn()} is true and {@link #value()} contains the result. * * An instance can also be an "await", which means that the {@link FrameProcessor} wants to be scheduled again - * in the future. In this case {@link #isAwait()} is true, {@link #awaitSet()} contains the set of input channels to - * wait for, and {@link #isAwaitAll()} is whether the processor wants to wait for all channels, or any channel. + * in the future. In this case {@link #isAwait()} is true, and *either* {@link #hasAwaitableChannels()} or + * {@link #hasAwaitableFutures()} is true. 
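+ *
+ * <p>A sketch of the futures-based await from a processor's {@code runIncrementally} (the {@code pendingFuture}
+ * field and {@code startAsyncWork()} helper are illustrative, not part of this patch):
+ *
+ * <pre>{@code
+ * if (pendingFuture == null) {
+ *   // First run: kick off the asynchronous work and give up the processing thread until it resolves.
+ *   pendingFuture = startAsyncWork();
+ *   return ReturnOrAwait.awaitAllFutures(Collections.singletonList(pendingFuture));
+ * }
+ *
+ * // Later run: the executor only reschedules this processor once the awaited futures have resolved.
+ * return ReturnOrAwait.returnObject(FutureUtils.getUncheckedImmediately(pendingFuture));
+ * }</pre>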
*/ public class ReturnOrAwait { @@ -46,18 +47,31 @@ public class ReturnOrAwait private final T retVal; @Nullable - private final IntSet await; + private final IntSet awaitChannels; - private final boolean awaitAll; + private final boolean awaitAllChannels; - private ReturnOrAwait(@Nullable T retVal, @Nullable IntSet await, final boolean awaitAll) + @Nullable + private final Collection> awaitFutures; + + private ReturnOrAwait( + @Nullable T retVal, + @Nullable IntSet awaitChannels, + @Nullable Collection> awaitFutures, + final boolean awaitAllChannels + ) { this.retVal = retVal; - this.await = await; - this.awaitAll = awaitAll; + this.awaitChannels = awaitChannels; + this.awaitAllChannels = awaitAllChannels; + this.awaitFutures = awaitFutures; + + if (retVal != null && (awaitChannels != null || awaitFutures != null)) { + throw new IAE("Cannot have a value when await != null or futures != null"); + } - if (retVal != null && await != null) { - throw new IAE("Cannot have a value when await != null"); + if (awaitChannels != null && awaitFutures != null) { + throw new ISE("Cannot have both awaitChannels and awaitFutures"); } } @@ -66,7 +80,7 @@ private ReturnOrAwait(@Nullable T retVal, @Nullable IntSet await, final boolean */ public static ReturnOrAwait runAgain() { - return new ReturnOrAwait<>(null, IntSets.emptySet(), true); + return new ReturnOrAwait<>(null, IntSets.emptySet(), null, true); } /** @@ -78,7 +92,7 @@ public static ReturnOrAwait runAgain() */ public static ReturnOrAwait awaitAll(final IntSet await) { - return new ReturnOrAwait<>(null, await, true); + return new ReturnOrAwait<>(null, await, null, true); } /** @@ -86,7 +100,15 @@ public static ReturnOrAwait awaitAll(final IntSet await) */ public static ReturnOrAwait awaitAll(final int count) { - return new ReturnOrAwait<>(null, rangeSet(count), true); + return new ReturnOrAwait<>(null, rangeSet(count), null, true); + } + + /** + * Wait for all of the provided futures. + */ + public static ReturnOrAwait awaitAllFutures(final Collection> futures) + { + return new ReturnOrAwait<>(null, null, futures, true); } /** @@ -100,7 +122,7 @@ public static ReturnOrAwait awaitAll(final int count) */ public static ReturnOrAwait awaitAny(final IntSet await) { - return new ReturnOrAwait<>(null, await, false); + return new ReturnOrAwait<>(null, await, null, false); } /** @@ -108,7 +130,7 @@ public static ReturnOrAwait awaitAny(final IntSet await) */ public static ReturnOrAwait returnObject(final T o) { - return new ReturnOrAwait<>(o, null, false); + return new ReturnOrAwait<>(o, null, null, false); } /** @@ -129,13 +151,23 @@ public T value() * * Numbers in this set correspond to positions in the {@link FrameProcessor#inputChannels()} list. */ - public IntSet awaitSet() + public IntSet awaitableChannels() { - if (isReturn()) { + if (!hasAwaitableChannels()) { throw new ISE("No await set"); } - return await; + return awaitChannels; + } + + + public Collection> awaitableFutures() + { + if (!hasAwaitableFutures()) { + throw new ISE("No futures set"); + } + + return awaitFutures; } /** @@ -145,7 +177,7 @@ public IntSet awaitSet() */ public boolean isReturn() { - return await == null; + return awaitChannels == null && awaitFutures == null; } /** @@ -155,41 +187,46 @@ public boolean isReturn() */ public boolean isAwait() { - return await != null; + return !isReturn(); } /** - * Whether the processor wants to wait for all channels in {@link #awaitSet()} (true), or any channel (false) + * Whether the processor wants to wait for a set of futures. 
If true, {@link #awaitableFutures()} contains the + * set of futures to wait for. */ - public boolean isAwaitAll() + public boolean hasAwaitableFutures() { - return awaitAll; + return awaitFutures != null; } - @Override - public boolean equals(Object o) + /** + * Whether the processor wants to wait for a set of channels. If true, {@link #awaitableChannels()} contains the + * set of channels to wait for, and {@link #isAwaitAllChannels()}. + */ + public boolean hasAwaitableChannels() { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ReturnOrAwait that = (ReturnOrAwait) o; - return awaitAll == that.awaitAll && Objects.equals(retVal, that.retVal) && Objects.equals(await, that.await); + return awaitChannels != null; } - @Override - public int hashCode() + /** + * Whether the processor wants to wait for all channels in {@link #awaitableChannels()} (true), or any channel (false) + */ + public boolean isAwaitAllChannels() { - return Objects.hash(retVal, await, awaitAll); + if (!hasAwaitableChannels()) { + throw new ISE("No channels set"); + } + + return awaitAllChannels; } @Override public String toString() { - if (isAwait()) { - return "await=" + (awaitAll ? "all" : "any") + await; + if (hasAwaitableChannels()) { + return "await channels=" + (awaitAllChannels ? "all" : "any") + awaitChannels; + } else if (hasAwaitableFutures()) { + return "await futures=" + awaitFutures; } else { return "return=" + retVal; } diff --git a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java index cf53abd0e5f4..60c2e1135b82 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java @@ -36,6 +36,7 @@ import org.apache.druid.frame.file.FrameFileWriter; import org.apache.druid.frame.processor.test.ChompingFrameProcessor; import org.apache.druid.frame.processor.test.FailingFrameProcessor; +import org.apache.druid.frame.processor.test.FutureWaitingProcessor; import org.apache.druid.frame.processor.test.InfiniteFrameProcessor; import org.apache.druid.frame.processor.test.SleepyFrameProcessor; import org.apache.druid.frame.processor.test.SuperBlasterFrameProcessor; @@ -414,6 +415,70 @@ public void test_runFully_nonexistentCancellationId() Assert.assertFalse(processor.didGetInterrupt()); Assert.assertFalse(processor.didCleanup()); } + + @Test + public void test_awaitAll_withFutures() throws Exception + { + // Test a processor that waits for futures to complete using ReturnOrAwait.awaitAll(Collection) + final SettableFuture future1 = SettableFuture.create(); + final SettableFuture future2 = SettableFuture.create(); + + // Start the processor + final FutureWaitingProcessor futureWaitingProcessor = new FutureWaitingProcessor(future1, future2); + final ListenableFuture> processorFuture = exec.runFully(futureWaitingProcessor, null); + + // Processor should be waiting for futures + Assert.assertFalse("Processor should be waiting", processorFuture.isDone()); + + // Complete the futures + future1.set("result1"); + future2.set("result2"); + + // Processor should complete now + Assert.assertEquals(List.of("result1", "result2"), processorFuture.get()); + Assert.assertTrue(futureWaitingProcessor.isCleanedUp()); + } + + @Test + public void test_awaitAll_withFutures_canceled() + { + // Test cancellation of a 
future that a processor is waiting for + final SettableFuture future1 = SettableFuture.create(); + final SettableFuture future2 = SettableFuture.create(); + + // Start the processor + final FutureWaitingProcessor futureWaitingProcessor = new FutureWaitingProcessor(future1, future2); + final ListenableFuture> processorFuture = exec.runFully(futureWaitingProcessor, null); + + // Resolve one, cancel one + future1.set("result1"); + future2.cancel(true); + + // Verify exception + final ExecutionException e = Assert.assertThrows(ExecutionException.class, processorFuture::get); + MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(CancellationException.class)); + } + + @Test + public void test_awaitAll_withFutures_error() + { + // Test errors on a future that a processor is waiting for + final SettableFuture future1 = SettableFuture.create(); + final SettableFuture future2 = SettableFuture.create(); + + // Start the processor + final FutureWaitingProcessor futureWaitingProcessor = new FutureWaitingProcessor(future1, future2); + final ListenableFuture> processorFuture = exec.runFully(futureWaitingProcessor, null); + + // Resolve one, fail out one + future1.set("result1"); + future2.setException(new RuntimeException("oops")); + + // Verify exception + final ExecutionException e = Assert.assertThrows(ExecutionException.class, processorFuture::get); + MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(RuntimeException.class)); + MatcherAssert.assertThat(e.getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("oops"))); + } } public abstract static class BaseFrameProcessorExecutorTestSuite extends InitializedNullHandlingTest diff --git a/processing/src/test/java/org/apache/druid/frame/processor/ReturnOrAwaitTest.java b/processing/src/test/java/org/apache/druid/frame/processor/ReturnOrAwaitTest.java index 8377cf0639c6..0215b43f2c41 100644 --- a/processing/src/test/java/org/apache/druid/frame/processor/ReturnOrAwaitTest.java +++ b/processing/src/test/java/org/apache/druid/frame/processor/ReturnOrAwaitTest.java @@ -19,24 +19,27 @@ package org.apache.druid.frame.processor; +import com.google.common.util.concurrent.Futures; import it.unimi.dsi.fastutil.ints.IntSet; -import nl.jqno.equalsverifier.EqualsVerifier; +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; import org.junit.Assert; import org.junit.Test; +import java.util.Collections; + public class ReturnOrAwaitTest { @Test public void testToString() { - Assert.assertEquals("await=any{0, 1}", ReturnOrAwait.awaitAny(IntSet.of(0, 1)).toString()); - Assert.assertEquals("await=all{0, 1}", ReturnOrAwait.awaitAll(2).toString()); + Assert.assertEquals("await channels=any{0, 1}", ReturnOrAwait.awaitAny(IntSet.of(0, 1)).toString()); + Assert.assertEquals("await channels=all{0, 1}", ReturnOrAwait.awaitAll(2).toString()); Assert.assertEquals("return=xyzzy", ReturnOrAwait.returnObject("xyzzy").toString()); - } - @Test - public void testEquals() - { - EqualsVerifier.forClass(ReturnOrAwait.class).usingGetClass().verify(); + MatcherAssert.assertThat( + ReturnOrAwait.awaitAllFutures(Collections.singletonList(Futures.immediateFuture(1))).toString(), + CoreMatchers.startsWith("await futures=[com.google.") + ); } } diff --git a/processing/src/test/java/org/apache/druid/frame/processor/test/FutureWaitingProcessor.java b/processing/src/test/java/org/apache/druid/frame/processor/test/FutureWaitingProcessor.java new file mode 100644 index 000000000000..a55e257675fb --- /dev/null +++ 
b/processing/src/test/java/org/apache/druid/frame/processor/test/FutureWaitingProcessor.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor.test; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.frame.processor.ReturnOrAwait; +import org.apache.druid.java.util.common.ISE; +import org.junit.Assert; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * Processor that waits for two futures using {@link ReturnOrAwait#awaitAllFutures(Collection)}. + */ +public class FutureWaitingProcessor implements FrameProcessor> +{ + private final ListenableFuture future1; + private final ListenableFuture future2; + + public FutureWaitingProcessor(ListenableFuture future1, ListenableFuture future2) + { + this.future1 = future1; + this.future2 = future2; + } + + private int runCount = 0; + private boolean cleanedUp; + private final List results = new ArrayList<>(); + + @Override + public List inputChannels() + { + return Collections.emptyList(); + } + + @Override + public List outputChannels() + { + return Collections.emptyList(); + } + + @Override + public ReturnOrAwait> runIncrementally(IntSet readableInputs) + { + runCount++; + + if (runCount == 1) { + // First run: wait for both futures + return ReturnOrAwait.awaitAllFutures(ImmutableList.of(future1, future2)); + } else if (runCount == 2) { + // Second run: futures should be complete, collect results + Assert.assertTrue("future1 should be done", future1.isDone()); + Assert.assertTrue("future2 should be done", future2.isDone()); + + try { + results.add(future1.get()); + results.add(future2.get()); + } + catch (Exception e) { + throw new RuntimeException(e); + } + + return ReturnOrAwait.returnObject(results); + } else { + throw new ISE("Should not run more than twice"); + } + } + + @Override + public void cleanup() + { + cleanedUp = true; + } + + public boolean isCleanedUp() + { + return cleanedUp; + } +} diff --git a/server/src/main/java/org/apache/druid/discovery/DataServerClient.java b/server/src/main/java/org/apache/druid/discovery/DataServerClient.java index c75b646bc8f9..6a2021f8043e 100644 --- a/server/src/main/java/org/apache/druid/discovery/DataServerClient.java +++ b/server/src/main/java/org/apache/druid/discovery/DataServerClient.java @@ -26,16 +26,17 @@ import com.google.common.util.concurrent.Futures; import 
com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.client.JsonParserIterator; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.http.client.response.StatusResponseHandler; -import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.query.Query; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.rpc.FixedServiceLocator; +import org.apache.druid.rpc.IgnoreHttpResponseHandler; import org.apache.druid.rpc.RequestBuilder; import org.apache.druid.rpc.ServiceClient; import org.apache.druid.rpc.ServiceClientFactory; @@ -43,30 +44,27 @@ import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.utils.CloseableUtils; import org.jboss.netty.handler.codec.http.HttpMethod; +import org.joda.time.Duration; import java.io.InputStream; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; /** * Client to query data servers given a query. */ public class DataServerClient { - private static final String BASE_PATH = "/druid/v2/"; private static final Logger log = new Logger(DataServerClient.class); + private static final String BASE_PATH = "/druid/v2/"; + private static final Duration CANCELLATION_TIMEOUT = Duration.standardSeconds(5); + private final ServiceClient serviceClient; private final ObjectMapper objectMapper; private final ServiceLocation serviceLocation; - private final ScheduledExecutorService queryCancellationExecutor; public DataServerClient( ServiceClientFactory serviceClientFactory, ServiceLocation serviceLocation, - ObjectMapper objectMapper, - ScheduledExecutorService queryCancellationExecutor + ObjectMapper objectMapper ) { this.serviceClient = serviceClientFactory.makeClient( @@ -76,13 +74,23 @@ public DataServerClient( ); this.serviceLocation = serviceLocation; this.objectMapper = objectMapper; - this.queryCancellationExecutor = queryCancellationExecutor; } - public Sequence run(Query query, ResponseContext responseContext, JavaType queryResultType, Closer closer) + /** + * Issue a query. Returns a future that resolves when the server starts sending its response. 
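+ *
+ * <p>If the returned future is cancelled before the response starts streaming, a best-effort cancellation
+ * request (HTTP DELETE) is issued to the data server.
+ *
+ * <p>Illustrative call (the surrounding variables are assumed to exist and are not part of this patch):
+ *
+ * <pre>{@code
+ * ListenableFuture<Sequence<ScanResultValue>> resultFuture =
+ *     dataServerClient.run(scanQuery, responseContext, queryResultType, closer);
+ * }</pre>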
+ * + * @param query query to run + * @param responseContext response context to populate + * @param queryResultType type of result object + * @param closer closer; this call will register a query canceler with this closer + */ + public ListenableFuture> run( + final Query query, + final ResponseContext responseContext, + final JavaType queryResultType, + final Closer closer + ) { - final String cancelPath = BASE_PATH + query.getId(); - RequestBuilder requestBuilder = new RequestBuilder(HttpMethod.POST, BASE_PATH); final boolean isSmile = objectMapper.getFactory() instanceof SmileFactory; if (isSmile) { @@ -114,64 +122,72 @@ public void onSuccess(InputStream result) public void onFailure(Throwable t) { if (resultStreamFuture.isCancelled()) { - cancelQuery(query, cancelPath); + cancelQuery(query.getId()); } } }, Execs.directExecutor() ); - return new BaseSequence<>( - new BaseSequence.IteratorMaker>() + return FutureUtils.transform( + resultStreamFuture, + resultStream -> new BaseSequence<>( + new BaseSequence.IteratorMaker>() + { + @Override + public JsonParserIterator make() + { + return new JsonParserIterator<>( + queryResultType, + Futures.immediateFuture(resultStream), + BASE_PATH, + query, + serviceLocation.getHost(), + objectMapper + ); + } + + @Override + public void cleanup(JsonParserIterator iterFromMake) + { + CloseableUtils.closeAndWrapExceptions(iterFromMake); + } + } + ) + ); + } + + private void cancelQuery(final String queryId) + { + if (queryId == null) { + throw DruidException.defensive("Null queryId"); + } + + final String cancelPath = BASE_PATH + queryId; + + final ListenableFuture cancelFuture = serviceClient.asyncRequest( + new RequestBuilder(HttpMethod.DELETE, cancelPath).timeout(CANCELLATION_TIMEOUT), + IgnoreHttpResponseHandler.INSTANCE + ); + + Futures.addCallback( + cancelFuture, + new FutureCallback<>() { @Override - public JsonParserIterator make() + public void onSuccess(final Void result) { - return new JsonParserIterator<>( - queryResultType, - resultStreamFuture, - BASE_PATH, - query, - serviceLocation.getHost(), - objectMapper - ); + // Do nothing on successful cancellation. 
} @Override - public void cleanup(JsonParserIterator iterFromMake) + public void onFailure(final Throwable t) { - CloseableUtils.closeAndWrapExceptions(iterFromMake); + log.noStackTrace() + .warn(t, "Failed to cancel query[%s] on server[%s]", queryId, serviceLocation.getHostAndPort()); } - } + }, + Execs.directExecutor() ); } - - private void cancelQuery(Query query, String cancelPath) - { - Runnable cancelRunnable = () -> { - Future cancelFuture = serviceClient.asyncRequest( - new RequestBuilder(HttpMethod.DELETE, cancelPath), - StatusResponseHandler.getInstance()); - - Runnable checkRunnable = () -> { - try { - if (!cancelFuture.isDone()) { - log.error("Error cancelling query[%s]", query); - } - StatusResponseHolder response = cancelFuture.get(); - if (response.getStatus().getCode() >= 500) { - log.error("Error cancelling query[%s]: queryable node returned status[%d] [%s].", - query, - response.getStatus().getCode(), - response.getStatus().getReasonPhrase()); - } - } - catch (ExecutionException | InterruptedException e) { - log.error(e, "Error cancelling query[%s]", query); - } - }; - queryCancellationExecutor.schedule(checkRunnable, 5, TimeUnit.SECONDS); - }; - queryCancellationExecutor.submit(cancelRunnable); - } } diff --git a/server/src/main/java/org/apache/druid/rpc/ServiceLocation.java b/server/src/main/java/org/apache/druid/rpc/ServiceLocation.java index 974f09fe89bb..552e0d6e9ba3 100644 --- a/server/src/main/java/org/apache/druid/rpc/ServiceLocation.java +++ b/server/src/main/java/org/apache/druid/rpc/ServiceLocation.java @@ -166,6 +166,18 @@ public String getHost() return host; } + /** + * Returns a host:port string for the preferred port (TLS if available; plaintext otherwise). + */ + public String getHostAndPort() + { + if (tlsPort > 0) { + return host + ":" + tlsPort; + } else { + return host + ":" + plaintextPort; + } + } + public int getPlaintextPort() { return plaintextPort; diff --git a/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java b/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java index 7afdc948a7a2..95ad46a580cc 100644 --- a/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java +++ b/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java @@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.QueryTimeoutException; @@ -51,6 +50,7 @@ import javax.ws.rs.core.MediaType; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutionException; import static org.apache.druid.query.Druids.newScanQueryBuilder; import static org.mockito.Mockito.mock; @@ -81,13 +81,12 @@ public void setUp() target = new DataServerClient( serviceClientFactory, mock(ServiceLocation.class), - jsonMapper, - Execs.scheduledSingleThreaded("query-cancellation-executor") + jsonMapper ); } @Test - public void testFetchSegmentFromDataServer() throws JsonProcessingException + public void testFetchSegmentFromDataServer() throws JsonProcessingException, ExecutionException, InterruptedException { ScanResultValue scanResultValue = new ScanResultValue( null, @@ -112,7 +111,7 @@ public void testFetchSegmentFromDataServer() throws JsonProcessingException responseContext, 
jsonMapper.getTypeFactory().constructType(ScanResultValue.class), Closer.create() - ); + ).get(); Assert.assertEquals(ImmutableList.of(scanResultValue), result.toList()); } @@ -170,7 +169,7 @@ public void testQueryFailure() throws JsonProcessingException responseContext, jsonMapper.getTypeFactory().constructType(ScanResultValue.class), Closer.create() - ).toList() + ).get().toList() ); } diff --git a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java index eb5b16118757..1dddc61c1e11 100644 --- a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java +++ b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java @@ -268,6 +268,7 @@ public void start() throws Exception resource.onStarted(this); } catch (Exception e) { + log.warn(e, "Failed to start resource[%s]. Stopping cluster.", resource); // Clean up the resources that have already been started stop(); throw e;