diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md index 0a277949fdda..35fb7fe3c4e0 100644 --- a/docs/querying/query-context.md +++ b/docs/querying/query-context.md @@ -66,7 +66,7 @@ See [SQL query context](sql-query-context.md) for other query context parameters |`debug`| `false` | Flag indicating whether to enable debugging outputs for the query. When set to false, no additional logs will be produced (logs produced will be entirely dependent on your logging level). When set to true, the following addition logs will be produced:
- Log the stack trace of the exception (if any) produced by the query | |`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. | |`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge two Project operators when inlining expressions causes complexity to increase. Implemented as a workaround to exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]` thrown after rejecting the merge of two projects.| -|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should be queried by brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `excludeClones`, `includeClones` and `preferClones`. `excludeClones` means that clone Historicals are not queried by the broker. `preferClones` indicates that when given a choice between the clone Historical and the original Historical which is being cloned, the broker chooses the clones. Historicals which are not involved in the cloning process will still be queried. `includeClones` means that broker queries any Historical without regarding clone status. This parameter only affects native queries. MSQ does not query Historicals directly, and Dart will not respect this context parameter.| +|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should be queried by brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `excludeClones`, `includeClones` and `preferClones`. `excludeClones` means that clone Historicals are not queried by the broker. 
`preferClones` indicates that when given a choice between the clone Historical and the original Historical which is being cloned, the broker chooses the clones. Historicals which are not involved in the cloning process will still be queried. `includeClones` means that broker queries any Historical without regarding clone status. This parameter only affects native queries. MSQ does not query Historicals directly.| ## Parameters by query type diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java index a76f15d6b60c..39464406cefb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java @@ -34,6 +34,7 @@ import org.apache.druid.msq.exec.ControllerContext; import org.apache.druid.msq.exec.ControllerMemoryParameters; import org.apache.druid.msq.exec.MemoryIntrospector; +import org.apache.druid.msq.exec.SegmentSource; import org.apache.druid.msq.exec.WorkerFailureListener; import org.apache.druid.msq.exec.WorkerManager; import org.apache.druid.msq.indexing.IndexerControllerContext; @@ -77,12 +78,15 @@ public class DartControllerContext implements ControllerContext */ public static final int DEFAULT_MAX_NON_LEAF_WORKER_COUNT = 1; + public static final SegmentSource DEFAULT_SEGMENT_SOURCE = SegmentSource.REALTIME; + private final Injector injector; private final ObjectMapper jsonMapper; private final DruidNode selfNode; private final DartWorkerClient workerClient; private final TimelineServerView serverView; private final MemoryIntrospector memoryIntrospector; + private final QueryContext context; private final ServiceMetricEvent.Builder metricBuilder; private final ServiceEmitter emitter; @@ -93,7 +97,8 @@ 
public DartControllerContext( final DartWorkerClient workerClient, final MemoryIntrospector memoryIntrospector, final TimelineServerView serverView, - final ServiceEmitter emitter + final ServiceEmitter emitter, + final QueryContext context ) { this.injector = injector; @@ -102,6 +107,7 @@ public DartControllerContext( this.workerClient = workerClient; this.serverView = serverView; this.memoryIntrospector = memoryIntrospector; + this.context = context; this.metricBuilder = new ServiceMetricEvent.Builder(); this.emitter = emitter; } @@ -180,7 +186,7 @@ public DruidNode selfNode() @Override public InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager) { - return DartTableInputSpecSlicer.createFromWorkerIds(workerManager.getWorkerIds(), serverView); + return DartTableInputSpecSlicer.createFromWorkerIds(workerManager.getWorkerIds(), serverView, context); } @Override diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactory.java index f58eb4bfa68d..796c52bb54ba 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactory.java @@ -21,11 +21,12 @@ import org.apache.druid.msq.dart.controller.sql.DartQueryMaker; import org.apache.druid.msq.exec.ControllerContext; +import org.apache.druid.query.QueryContext; /** * Class for creating {@link ControllerContext} in {@link DartQueryMaker}. 
*/ public interface DartControllerContextFactory { - ControllerContext newContext(String queryId); + ControllerContext newContext(QueryContext queryContext); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java index 113714aa9b92..87582d977aa2 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java @@ -31,6 +31,8 @@ import org.apache.druid.msq.dart.worker.DartWorkerClientImpl; import org.apache.druid.msq.exec.ControllerContext; import org.apache.druid.msq.exec.MemoryIntrospector; +import org.apache.druid.query.QueryContext; +import org.apache.druid.query.QueryContexts; import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.server.DruidNode; @@ -68,8 +70,9 @@ public DartControllerContextFactoryImpl( } @Override - public ControllerContext newContext(final String queryId) + public ControllerContext newContext(final QueryContext context) { + final String queryId = context.getString(QueryContexts.CTX_DART_QUERY_ID); return new DartControllerContext( injector, jsonMapper, @@ -77,7 +80,8 @@ public ControllerContext newContext(final String queryId) new DartWorkerClientImpl(queryId, serviceClientFactory, smileMapper, selfNode.getHostAndPortToUse()), memoryIntrospector, serverView, - emitter + emitter, + context ); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartSegmentAssignment.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartSegmentAssignment.java new file mode 100644 index 000000000000..a9a2f7b3a96e --- /dev/null +++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartSegmentAssignment.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.dart.controller; + +import org.apache.druid.msq.dart.worker.DartQueryableSegment; +import org.apache.druid.msq.input.table.DataServerRequestDescriptor; + +import java.util.ArrayList; +import java.util.List; + +/** + * Represents the set of segments assigned to a particular dart worker, used by {@link DartTableInputSpecSlicer}. 
+ */ +public class DartSegmentAssignment +{ + private final List dartQueryableSegments; + private final List dataServerRequestDescriptor; + + public DartSegmentAssignment( + List dartQueryableSegments, + List dataServerRequestDescriptor + ) + { + this.dartQueryableSegments = dartQueryableSegments; + this.dataServerRequestDescriptor = dataServerRequestDescriptor; + } + + public static DartSegmentAssignment empty() + { + return new DartSegmentAssignment(new ArrayList<>(), new ArrayList<>()); + } + + public void addSegments(DartQueryableSegment segment) + { + dartQueryableSegments.add(segment); + } + + public void addRequest(DataServerRequestDescriptor requestDescriptor) + { + dataServerRequestDescriptor.add(requestDescriptor); + } + + public List getDartQueryableSegments() + { + return dartQueryableSegments; + } + + public List getDataServerRequestDescriptor() + { + return dataServerRequestDescriptor; + } + + public boolean isEmpty() + { + return dataServerRequestDescriptor.isEmpty() && dartQueryableSegments.isEmpty(); + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java index 00813d0f2fb7..4b7ea2ed8eeb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java @@ -20,7 +20,6 @@ package org.apache.druid.msq.dart.controller; import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; import it.unimi.dsi.fastutil.objects.Object2IntMap; import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import org.apache.druid.client.QueryableDruidServer; @@ -36,22 +35,29 @@ import org.apache.druid.msq.input.InputSpec; import 
org.apache.druid.msq.input.InputSpecSlicer; import org.apache.druid.msq.input.NilInputSlice; +import org.apache.druid.msq.input.table.DataServerRequestDescriptor; +import org.apache.druid.msq.input.table.DataServerSelector; import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.input.table.SegmentsInputSlice; import org.apache.druid.msq.input.table.TableInputSpec; +import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.CloneQueryMode; +import org.apache.druid.query.QueryContext; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.filter.DimFilterUtils; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.TimelineLookup; +import org.joda.time.Interval; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.function.ToIntFunction; +import java.util.stream.Collectors; /** * Slices {@link TableInputSpec} into {@link SegmentsInputSlice} for persistent servers using @@ -71,15 +77,32 @@ public class DartTableInputSpecSlicer implements InputSpecSlicer */ private final TimelineServerView serverView; - DartTableInputSpecSlicer(final Object2IntMap workerIdToNumber, final TimelineServerView serverView) + /** + * Determines the kind of tasks that should be queried. + */ + private final SegmentSource segmentSource; + + /** + * Determines if cloning historicals should be queried. 
+ */ + private final CloneQueryMode cloneQueryMode; + + DartTableInputSpecSlicer( + final Object2IntMap workerIdToNumber, + final TimelineServerView serverView, + final QueryContext queryContext + ) { this.workerIdToNumber = workerIdToNumber; this.serverView = serverView; + this.segmentSource = MultiStageQueryContext.getSegmentSources(queryContext, DartControllerContext.DEFAULT_SEGMENT_SOURCE); + this.cloneQueryMode = queryContext.getCloneQueryMode(); } public static DartTableInputSpecSlicer createFromWorkerIds( final List workerIds, - final TimelineServerView serverView + final TimelineServerView serverView, + final QueryContext queryContext ) { final Object2IntMap reverseWorkers = new Object2IntOpenHashMap<>(); @@ -89,7 +112,7 @@ public static DartTableInputSpecSlicer createFromWorkerIds( reverseWorkers.put(WorkerId.fromString(workerIds.get(i)).getHostAndPort(), i); } - return new DartTableInputSpecSlicer(reverseWorkers, serverView); + return new DartTableInputSpecSlicer(reverseWorkers, serverView, queryContext); } @Override @@ -116,14 +139,20 @@ public List sliceStatic(final InputSpec inputSpec, final int maxNumS serverSelector -> findWorkerForServerSelector(serverSelector, maxNumSlices) ); - final List> assignments = new ArrayList<>(maxNumSlices); + final List assignments = new ArrayList<>(maxNumSlices); while (assignments.size() < maxNumSlices) { - assignments.add(null); + assignments.add(DartSegmentAssignment.empty()); } int nextRoundRobinWorker = 0; + final Map> serverRequestMap = new HashMap<>(); for (final DartQueryableSegment segment : prunedSegments) { final int worker; + DruidServerMetadata realtimeServer = segment.getRealtimeServer(); + if (realtimeServer != null) { + serverRequestMap.computeIfAbsent(realtimeServer, s -> new ArrayList<>()).add(segment); + continue; + } if (segment.getWorkerNumber() == UNKNOWN) { // Segment is not available on any worker. Assign to some worker, round-robin. 
Today, that server will throw // an error about the segment not being findable, but perhaps one day, it will be able to load the segment @@ -134,11 +163,18 @@ public List sliceStatic(final InputSpec inputSpec, final int maxNumS worker = segment.getWorkerNumber(); } - if (assignments.get(worker) == null) { - assignments.set(worker, new ArrayList<>()); - } + assignments.get(worker).addSegments(segment); + } - assignments.get(worker).add(segment); + for (Map.Entry> entry : serverRequestMap.entrySet()) { + final int worker; + DruidServerMetadata server = entry.getKey(); + worker = nextRoundRobinWorker; + nextRoundRobinWorker = (nextRoundRobinWorker + 1) % maxNumSlices; + List descriptors = serverRequestMap.get(server).stream() + .map(DartTableInputSpecSlicer::toRichSegmentDescriptor) + .collect(Collectors.toList()); + assignments.get(worker).addRequest(new DataServerRequestDescriptor(server, descriptors)); } return makeSegmentSlices(tableInputSpec.getDataSource(), assignments); @@ -164,7 +200,7 @@ public List sliceDynamic( int findWorkerForServerSelector(final ServerSelector serverSelector, final int maxNumSlices) { // Currently, Dart does not support clone query modes, all servers can be queried. - final QueryableDruidServer server = serverSelector.pick(null, CloneQueryMode.INCLUDECLONES); + final QueryableDruidServer server = serverSelector.pick(null, cloneQueryMode); if (server == null) { return UNKNOWN; @@ -186,7 +222,7 @@ int findWorkerForServerSelector(final ServerSelector serverSelector, final int m * Pull the list of {@link DataSegment} that we should query, along with a clipping interval for each one, and * a worker to get it from. 
*/ - static Set findQueryableDataSegments( + private Set findQueryableDataSegments( final TableInputSpec tableInputSpec, final TimelineLookup timeline, final ToIntFunction toWorkersFunction @@ -202,9 +238,7 @@ static Set findQueryableDataSegments( .filter(chunk -> shouldIncludeSegment(chunk.getObject())) .transform(chunk -> { final ServerSelector serverSelector = chunk.getObject(); - final DataSegment dataSegment = serverSelector.getSegment(); - final int worker = toWorkersFunction.applyAsInt(serverSelector); - return new DartQueryableSegment(dataSegment, holder.getInterval(), worker); + return toDartQueryableSegment(serverSelector, holder.getInterval(), toWorkersFunction); }) .filter(segment -> !segment.getSegment().isTombstone()) ); @@ -218,6 +252,27 @@ static Set findQueryableDataSegments( ); } + private DartQueryableSegment toDartQueryableSegment( + ServerSelector serverSelector, + Interval interval, + ToIntFunction toWorkersFunction + ) + { + final DataSegment dataSegment = serverSelector.getSegment(); + if (serverSelector.isRealtimeSegment()) { + final Set servers = + serverSelector.getAllServers(cloneQueryMode) + .stream() + .filter(druidServerMetadata -> segmentSource.getUsedServerTypes() + .contains(druidServerMetadata.getType())) + .collect(Collectors.toSet()); + return new DartQueryableSegment(dataSegment, interval, -1, DataServerSelector.RANDOM.getSelectServerFunction().apply(servers)); + } else { + final int worker = toWorkersFunction.applyAsInt(serverSelector); + return new DartQueryableSegment(dataSegment, interval, worker, null); + } + } + /** * Create a list of {@link SegmentsInputSlice} and {@link NilInputSlice} assignments. 
* @@ -228,27 +283,26 @@ static Set findQueryableDataSegments( * * @throws IllegalStateException if any provided segments do not match the provided datasource */ - static List makeSegmentSlices( + private List makeSegmentSlices( final String dataSource, - final List> assignments + final List assignments ) { final List retVal = new ArrayList<>(assignments.size()); - for (final List assignment : assignments) { + for (final DartSegmentAssignment assignment : assignments) { if (assignment == null || assignment.isEmpty()) { retVal.add(NilInputSlice.INSTANCE); } else { final List descriptors = new ArrayList<>(); - for (final DartQueryableSegment segment : assignment) { + for (DartQueryableSegment segment : assignment.getDartQueryableSegments()) { if (!dataSource.equals(segment.getSegment().getDataSource())) { throw new ISE("Expected dataSource[%s] but got[%s]", dataSource, segment.getSegment().getDataSource()); } - descriptors.add(toRichSegmentDescriptor(segment)); } - - retVal.add(new SegmentsInputSlice(dataSource, descriptors, ImmutableList.of())); + final List queryableDruidServers = assignment.getDataServerRequestDescriptor(); + retVal.add(new SegmentsInputSlice(dataSource, descriptors, queryableDruidServers)); } } @@ -269,27 +323,17 @@ static RichSegmentDescriptor toRichSegmentDescriptor(final DartQueryableSegment } /** - * Whether to include a segment from the timeline. Segments are included if they are not tombstones, and are also not - * purely realtime segments. + * Whether to include a segment from the timeline. Segments are included if they are not tombstones, and for realtime + * segments, are only included based on the segmentSource. 
*/ - static boolean shouldIncludeSegment(final ServerSelector serverSelector) + private boolean shouldIncludeSegment(final ServerSelector serverSelector) { if (serverSelector.getSegment().isTombstone()) { return false; } - - int numRealtimeServers = 0; - int numOtherServers = 0; - - // Currently, Dart does not support clone query modes, all servers can be queried. - for (final DruidServerMetadata server : serverSelector.getAllServers(CloneQueryMode.INCLUDECLONES)) { - if (SegmentSource.REALTIME.getUsedServerTypes().contains(server.getType())) { - numRealtimeServers++; - } else { - numOtherServers++; - } + if (serverSelector.isRealtimeSegment()) { + return SegmentSource.shouldQueryRealtimeServers(segmentSource); } - - return numOtherServers > 0 || (numOtherServers + numRealtimeServers == 0); + return true; } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java index a40c46f52ff5..080b93034fbb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java @@ -172,33 +172,29 @@ public static ResultsContext makeResultsContext(DruidQuery druidQuery, List runLegacyMSQSpec(LegacyMSQSpec querySpec, QueryContext context, ResultsContext resultsContext) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java index 130a639396e5..7f2f928e6f08 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java @@ 
-57,6 +57,7 @@ public class DartFrameContext implements FrameContext private final ResourceHolder processingBuffers; private final WorkerMemoryParameters memoryParameters; private final WorkerStorageParameters storageParameters; + private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory; public DartFrameContext( final StageId stageId, @@ -66,7 +67,8 @@ public DartFrameContext( final DataSegmentProvider dataSegmentProvider, @Nullable ResourceHolder processingBuffers, final WorkerMemoryParameters memoryParameters, - final WorkerStorageParameters storageParameters + final WorkerStorageParameters storageParameters, + final DataServerQueryHandlerFactory dataServerQueryHandlerFactory ) { this.stageId = stageId; @@ -77,6 +79,7 @@ public DartFrameContext( this.processingBuffers = processingBuffers; this.memoryParameters = memoryParameters; this.storageParameters = storageParameters; + this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory; } @Override @@ -170,9 +173,7 @@ public WorkerStorageParameters storageParameters() @Override public DataServerQueryHandlerFactory dataServerQueryHandlerFactory() { - // We don't query data servers. This factory won't actually be used, because Dart doesn't allow segmentSource to be - // overridden; it always uses SegmentSource.NONE. (If it is called, some wires got crossed somewhere.) 
- return null; + return dataServerQueryHandlerFactory; } @Override diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartQueryableSegment.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartQueryableSegment.java index 574601517b44..20cccc55b98e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartQueryableSegment.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartQueryableSegment.java @@ -20,9 +20,11 @@ package org.apache.druid.msq.dart.worker; import com.google.common.base.Preconditions; +import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.util.Objects; /** @@ -33,12 +35,20 @@ public class DartQueryableSegment private final DataSegment segment; private final Interval interval; private final int workerNumber; + @Nullable + private final DruidServerMetadata realtimeServer; - public DartQueryableSegment(final DataSegment segment, final Interval interval, final int workerNumber) + public DartQueryableSegment( + final DataSegment segment, + final Interval interval, + final int workerNumber, + @Nullable DruidServerMetadata realtimeServer + ) { this.segment = Preconditions.checkNotNull(segment, "segment"); this.interval = Preconditions.checkNotNull(interval, "interval"); this.workerNumber = workerNumber; + this.realtimeServer = realtimeServer; } public DataSegment getSegment() @@ -56,6 +66,12 @@ public int getWorkerNumber() return workerNumber; } + @Nullable + public DruidServerMetadata getRealtimeServer() + { + return realtimeServer; + } + @Override public boolean equals(Object o) { @@ -68,22 +84,15 @@ public boolean equals(Object o) DartQueryableSegment that = (DartQueryableSegment) o; return workerNumber == that.workerNumber && Objects.equals(segment, 
that.segment) - && Objects.equals(interval, that.interval); + && Objects.equals(interval, that.interval) + && Objects.equals(realtimeServer, that.realtimeServer); } @Override public int hashCode() { - return Objects.hash(segment, interval, workerNumber); + return Objects.hash(segment, interval, workerNumber, realtimeServer); } - @Override - public String toString() - { - return "QueryableDataSegment{" + - "segment=" + segment + - ", interval=" + interval + - ", workerNumber=" + workerNumber + - '}'; - } + } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java index 530a4ec3e2e6..947b85c9efbd 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java @@ -80,6 +80,7 @@ public class DartWorkerContext implements WorkerContext */ @MonotonicNonNull private volatile ResourceHolder processingBuffersSet; + private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory; DartWorkerContext( final String queryId, @@ -97,11 +98,13 @@ public class DartWorkerContext implements WorkerContext final ProcessingBuffersProvider processingBuffersProvider, final Outbox outbox, final File tempDir, - final QueryContext queryContext + final QueryContext queryContext, + final DataServerQueryHandlerFactory dataServerQueryHandlerFactory ) { this.queryId = queryId; this.controllerHost = controllerHost; + this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory; this.workerId = WorkerId.fromDruidNode(selfNode, queryId); this.selfNode = selfNode; this.jsonMapper = jsonMapper; @@ -223,7 +226,8 @@ public FrameContext frameContext(WorkOrder workOrder) dataSegmentProvider, processingBuffersSet.get().acquireForStage(workOrder.getStageDefinition()), 
memoryParameters, - storageParameters + storageParameters, + dataServerQueryHandlerFactory ); } @@ -236,9 +240,7 @@ public int threadCount() @Override public DataServerQueryHandlerFactory dataServerQueryHandlerFactory() { - // We don't query data servers. Return null so this factory is ignored when the main worker code tries - // to close it. - return null; + return dataServerQueryHandlerFactory; } @Override diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerFactoryImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerFactoryImpl.java index 06b9226bc37e..ad6328234e1e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerFactoryImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerFactoryImpl.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.google.inject.Injector; +import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Self; @@ -29,6 +30,7 @@ import org.apache.druid.messages.server.Outbox; import org.apache.druid.msq.dart.Dart; import org.apache.druid.msq.dart.controller.messages.ControllerMessage; +import org.apache.druid.msq.exec.DataServerQueryHandlerFactory; import org.apache.druid.msq.exec.MemoryIntrospector; import org.apache.druid.msq.exec.ProcessingBuffersProvider; import org.apache.druid.msq.exec.Worker; @@ -37,6 +39,7 @@ import org.apache.druid.msq.querykit.DataSegmentProvider; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.QueryContext; +import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.policy.PolicyEnforcer; 
import org.apache.druid.rpc.ServiceClientFactory; @@ -63,6 +66,8 @@ public class DartWorkerFactoryImpl implements DartWorkerFactory private final MemoryIntrospector memoryIntrospector; private final ProcessingBuffersProvider processingBuffersProvider; private final Outbox outbox; + private final CoordinatorClient coordinatorClient; + private final QueryToolChestWarehouse warehouse; @Inject public DartWorkerFactoryImpl( @@ -78,7 +83,9 @@ public DartWorkerFactoryImpl( @Dart DataSegmentProvider dataSegmentProvider, MemoryIntrospector memoryIntrospector, @Dart ProcessingBuffersProvider processingBuffersProvider, - Outbox outbox + Outbox outbox, + CoordinatorClient coordinatorClient, + QueryToolChestWarehouse warehouse ) { this.selfNode = selfNode; @@ -94,6 +101,8 @@ public DartWorkerFactoryImpl( this.memoryIntrospector = memoryIntrospector; this.processingBuffersProvider = processingBuffersProvider; this.outbox = outbox; + this.coordinatorClient = coordinatorClient; + this.warehouse = warehouse; } @Override @@ -115,7 +124,13 @@ public Worker build(String queryId, String controllerHost, File tempDir, QueryCo processingBuffersProvider, outbox, tempDir, - queryContext + queryContext, + new DataServerQueryHandlerFactory( + coordinatorClient, + serviceClientFactory, + jsonMapper, + warehouse + ) ); return new WorkerImpl(null, workerContext); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index edf4419b0963..3732dbcc0104 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -381,7 +381,7 @@ private MSQTaskReportPayload runInternal(final QueryListener queryListener, fina // Planning-related: convert the native query from MSQSpec into a multi-stage QueryDefinition. 
this.queryStartTime = DateTimes.nowUtc(); context.registerController(this, closer); - queryDef = initializeQueryDefAndState(closer); + queryDef = initializeQueryDefAndState(); this.netClient = closer.register(new ExceptionWrappingWorkerClient(context.newWorkerClient())); this.workerSketchFetcher = new WorkerSketchFetcher( @@ -657,7 +657,7 @@ public static void ensureExportLocationEmpty(final ControllerContext context, fi } } - private QueryDefinition initializeQueryDefAndState(final Closer closer) + private QueryDefinition initializeQueryDefAndState() { this.selfDruidNode = context.selfNode(); this.queryKernelConfig = context.queryKernelConfig(queryId, querySpec); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java index 920bceb952b3..04458d701705 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java @@ -39,7 +39,6 @@ import org.apache.druid.msq.input.table.DataServerRequestDescriptor; import org.apache.druid.msq.input.table.DataServerSelector; import org.apache.druid.msq.input.table.RichSegmentDescriptor; -import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.Queries; import org.apache.druid.query.Query; import org.apache.druid.query.QueryInterruptedException; @@ -183,9 +182,7 @@ public DataServerQueryResult fetchRowsFromDataServ } pendingRequests = createNextPendingRequests( - missingRichSegmentDescriptors, - MultiStageQueryContext.getSegmentSources(query.context()), - DataServerSelector.RANDOM + missingRichSegmentDescriptors ); if (!pendingRequests.isEmpty()) { @@ -270,9 +267,7 @@ private Yielder createYielder( } private List createNextPendingRequests( - final Set richSegmentDescriptors, - final 
SegmentSource includeSegmentSource, - final DataServerSelector dataServerSelector + final Set richSegmentDescriptors ) { final Map> serverVsSegmentsMap = new HashMap<>(); @@ -300,16 +295,17 @@ private List createNextPendingRequests( if (segmentLoadInfo.getSegment().toDescriptor().equals(segmentDescriptorWithFullInterval)) { Set servers = segmentLoadInfo.getServers() .stream() - .filter(druidServerMetadata -> includeSegmentSource.getUsedServerTypes() - .contains(druidServerMetadata.getType())) + .filter(druidServerMetadata -> SegmentSource.REALTIME.getUsedServerTypes() + .contains(druidServerMetadata.getType())) .collect(Collectors.toSet()); if (servers.isEmpty()) { throw DruidException.forPersona(DruidException.Persona.OPERATOR) .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build("Could not find a server matching includeSegmentSource[%s] for segment[%s]. Only found servers [%s]", includeSegmentSource, richSegmentDescriptor, servers); + .build("Could not find a server matching includeSegmentSource[%s] for segment[%s]. 
Only found servers [%s]", + SegmentSource.REALTIME, richSegmentDescriptor, servers); } - DruidServerMetadata druidServerMetadata = dataServerSelector.getSelectServerFunction().apply(servers); + DruidServerMetadata druidServerMetadata = DataServerSelector.RANDOM.getSelectServerFunction().apply(servers); serverVsSegmentsMap.computeIfAbsent(druidServerMetadata, ignored -> new HashSet<>()); SegmentDescriptor descriptor = segmentLoadInfo.getSegment().toDescriptor(); serverVsSegmentsMap.get(druidServerMetadata) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentSource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentSource.java index 22f3a5df973c..54ac532cc248 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentSource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentSource.java @@ -25,22 +25,23 @@ import java.util.Set; /** - * Decides the types of data servers contacted by MSQ tasks to fetch results. + * Decides the types of data servers contacted by MSQ queries to fetch results. */ public enum SegmentSource { /** - * Include only segments from deep storage. + * Do not include any other segments. */ NONE(ImmutableSet.of()), + /** - * Include segments from realtime tasks as well as segments from deep storage. + * Include segments from realtime tasks as well. */ REALTIME(ImmutableSet.of(ServerType.REALTIME, ServerType.INDEXER_EXECUTOR)); /** - * The type of dataservers (if any) to include. This does not include segments queried from deep storage, which are - * always included in queries. + * The type of dataservers (if any) to include. This does not cover the segments that are always included: segments + * queried from deep storage for MSQ tasks, or segments loaded on Historicals for MSQ Dart.
*/ private final Set usedServerTypes; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java index 50cbe781dc7b..c7aa6c1c1a06 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java @@ -101,6 +101,9 @@ public interface WorkerContext */ DruidNode selfNode(); + /** + * Returns the factory for {@link DataServerQueryHandler} from the context. Used to query realtime tasks. + */ DataServerQueryHandlerFactory dataServerQueryHandlerFactory(); /** diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index 1e55bf15f202..667335ad8679 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -72,6 +72,7 @@ public class IndexerControllerContext implements ControllerContext { public static final int DEFAULT_MAX_CONCURRENT_STAGES = 1; + public static final SegmentSource DEFAULT_SEGMENT_SOURCE = SegmentSource.NONE; private static final Logger log = new Logger(IndexerControllerContext.class); @@ -172,7 +173,7 @@ public DruidNode selfNode() public InputSpecSlicer newTableInputSpecSlicer(final WorkerManager workerManager) { final SegmentSource includeSegmentSource = - MultiStageQueryContext.getSegmentSources(taskQuerySpecContext); + MultiStageQueryContext.getSegmentSources(taskQuerySpecContext, DEFAULT_SEGMENT_SOURCE); return new IndexerTableInputSpecSlicer( toolbox.getCoordinatorClient(), toolbox.getTaskActionClient(), diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java index 2e3823b3af48..d76264621d51 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java @@ -52,7 +52,7 @@ public QueryKitSpec makeQueryKitSpec( return new QueryKitSpec( queryKit, queryId, - getNumHistoricals(), + getNumWorkers(), queryContext.getInt( DartControllerContext.CTX_MAX_NON_LEAF_WORKER_COUNT, DartControllerContext.DEFAULT_MAX_NON_LEAF_WORKER_COUNT @@ -64,7 +64,7 @@ public QueryKitSpec makeQueryKitSpec( ); } - private int getNumHistoricals() + private int getNumWorkers() { int cnt = 0; for (DruidServerMetadata s : serverView.getDruidServerMetadatas()) { @@ -72,6 +72,8 @@ private int getNumHistoricals() cnt++; } } - return cnt; + + // Even if all segments are realtime, launch at least one worker. 
+ return Math.max(1, cnt); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java index 838c438cbf47..2e9e67a7534e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java @@ -24,6 +24,7 @@ import org.apache.druid.error.InvalidSqlInput; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.msq.exec.SegmentSource; +import org.apache.druid.msq.indexing.IndexerControllerContext; import org.apache.druid.msq.indexing.MSQControllerTask; import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; import org.apache.druid.msq.indexing.destination.MSQDestination; @@ -100,7 +101,7 @@ public static void validateContextSortOrderColumnsExist( */ public static void validateRealtimeReindex(QueryContext context, MSQDestination destination, Query query) { - final SegmentSource segmentSources = MultiStageQueryContext.getSegmentSources(context); + final SegmentSource segmentSources = MultiStageQueryContext.getSegmentSources(context, IndexerControllerContext.DEFAULT_SEGMENT_SOURCE); if (MSQControllerTask.isReplaceInputDataSourceTask(query, destination) && SegmentSource.REALTIME.equals(segmentSources)) { throw DruidException.forPersona(DruidException.Persona.USER) .ofCategory(DruidException.Category.INVALID_INPUT) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index 93d79a7b3f14..3ced82b1756d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -122,7 +122,6 @@ public class MultiStageQueryContext private static final boolean DEFAULT_FINALIZE_AGGREGATIONS = true; public static final String CTX_INCLUDE_SEGMENT_SOURCE = "includeSegmentSource"; - public static final SegmentSource DEFAULT_INCLUDE_SEGMENT_SOURCE = SegmentSource.NONE; public static final String CTX_MAX_CONCURRENT_STAGES = "maxConcurrentStages"; public static final String CTX_DURABLE_SHUFFLE_STORAGE = "durableShuffleStorage"; @@ -320,12 +319,12 @@ public static boolean isFinalizeAggregations(final QueryContext queryContext) ); } - public static SegmentSource getSegmentSources(final QueryContext queryContext) + public static SegmentSource getSegmentSources(final QueryContext queryContext, final SegmentSource defaultSource) { return queryContext.getEnum( CTX_INCLUDE_SEGMENT_SOURCE, SegmentSource.class, - DEFAULT_INCLUDE_SEGMENT_SOURCE + defaultSource ); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java index da23ead30f82..be90b20bae9e 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java @@ -101,7 +101,7 @@ public void tearDown() throws Exception public void test_queryKernelConfig() { final DartControllerContext controllerContext = - new DartControllerContext(null, null, SELF_NODE, null, memoryIntrospector, serverView, null); + new DartControllerContext(null, null, SELF_NODE, null, memoryIntrospector, serverView, null, null); final ControllerQueryKernelConfig queryKernelConfig = controllerContext.queryKernelConfig(QUERY_ID, querySpec); 
Assertions.assertFalse(queryKernelConfig.isFaultTolerant()); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java index a65c1252c6eb..528b2facfa25 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java @@ -34,11 +34,15 @@ import org.apache.druid.data.input.StringTuple; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.msq.dart.worker.WorkerId; +import org.apache.druid.msq.exec.SegmentSource; import org.apache.druid.msq.input.InputSlice; import org.apache.druid.msq.input.NilInputSlice; +import org.apache.druid.msq.input.table.DataServerRequestDescriptor; import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.input.table.SegmentsInputSlice; import org.apache.druid.msq.input.table.TableInputSpec; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.QueryContext; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.filter.EqualityFilter; import org.apache.druid.segment.column.ColumnType; @@ -157,7 +161,7 @@ public class DartTableInputSpecSlicerTest extends InitializedNullHandlingTest ); /** - * Segment that should be ignored (for now) because it's realtime-only. + * Segment that's realtime-only. 
*/ private static final DataSegment SEGMENT5 = new DataSegment( DATASOURCE, @@ -172,6 +176,22 @@ public class DartTableInputSpecSlicerTest extends InitializedNullHandlingTest BYTES_PER_SEGMENT ); + /** + * Segment that's realtime and located at the same host as segment 5 + */ + private static final DataSegment SEGMENT6 = new DataSegment( + DATASOURCE, + Intervals.of("2004/2005"), + "1", + Collections.emptyMap(), + Collections.emptyList(), + Collections.emptyList(), + new NumberedShardSpec(0, 1), + null, + null, + BYTES_PER_SEGMENT + ); + /** * Mapping of segment to servers (indexes in {@link #SERVERS}). */ @@ -182,6 +202,7 @@ public class DartTableInputSpecSlicerTest extends InitializedNullHandlingTest .put(SEGMENT3, IntLists.emptyList()) .put(SEGMENT4, IntList.of(1)) .put(SEGMENT5, IntList.of(2)) + .put(SEGMENT6, IntList.of(2)) .build(); private AutoCloseable mockCloser; @@ -206,7 +227,7 @@ public class DartTableInputSpecSlicerTest extends InitializedNullHandlingTest void setUp() { mockCloser = MockitoAnnotations.openMocks(this); - slicer = DartTableInputSpecSlicer.createFromWorkerIds(WORKER_IDS, serverView); + slicer = DartTableInputSpecSlicer.createFromWorkerIds(WORKER_IDS, serverView, QueryContext.empty()); // Add all segments to the timeline, round-robin across the two servers. 
timeline = new VersionedIntervalTimeline<>(Ordering.natural()); @@ -296,7 +317,25 @@ public void test_sliceStatic_wholeTable_oneSlice() SEGMENT3.getShardSpec().getPartitionNum() ) ), - ImmutableList.of() + ImmutableList.of( + new DataServerRequestDescriptor( + SERVERS.get(2), + ImmutableList.of( + new RichSegmentDescriptor( + SEGMENT5.getInterval(), + SEGMENT5.getInterval(), + SEGMENT5.getVersion(), + SEGMENT5.getShardSpec().getPartitionNum() + ), + new RichSegmentDescriptor( + SEGMENT6.getInterval(), + SEGMENT6.getInterval(), + SEGMENT6.getVersion(), + SEGMENT6.getShardSpec().getPartitionNum() + ) + ) + ) + ) ) ), inputSlices @@ -340,7 +379,25 @@ public void test_sliceStatic_wholeTable_twoSlices() SEGMENT2.getShardSpec().getPartitionNum() ) ), - ImmutableList.of() + ImmutableList.of( + new DataServerRequestDescriptor( + SERVERS.get(2), + ImmutableList.of( + new RichSegmentDescriptor( + SEGMENT5.getInterval(), + SEGMENT5.getInterval(), + SEGMENT5.getVersion(), + SEGMENT5.getShardSpec().getPartitionNum() + ), + new RichSegmentDescriptor( + SEGMENT6.getInterval(), + SEGMENT6.getInterval(), + SEGMENT6.getVersion(), + SEGMENT6.getShardSpec().getPartitionNum() + ) + ) + ) + ) ) ), inputSlices @@ -384,7 +441,25 @@ public void test_sliceStatic_wholeTable_threeSlices() SEGMENT2.getShardSpec().getPartitionNum() ) ), - ImmutableList.of() + ImmutableList.of( + new DataServerRequestDescriptor( + SERVERS.get(2), + ImmutableList.of( + new RichSegmentDescriptor( + SEGMENT5.getInterval(), + SEGMENT5.getInterval(), + SEGMENT5.getVersion(), + SEGMENT5.getShardSpec().getPartitionNum() + ), + new RichSegmentDescriptor( + SEGMENT6.getInterval(), + SEGMENT6.getInterval(), + SEGMENT6.getVersion(), + SEGMENT6.getShardSpec().getPartitionNum() + ) + ) + ) + ) ), NilInputSlice.INSTANCE ), @@ -437,7 +512,29 @@ public void test_sliceStatic_dimensionFilter_twoSlices() ), ImmutableList.of() ), - NilInputSlice.INSTANCE + new SegmentsInputSlice( + DATASOURCE, + ImmutableList.of(), + 
ImmutableList.of( + new DataServerRequestDescriptor( + SERVERS.get(2), + ImmutableList.of( + new RichSegmentDescriptor( + SEGMENT5.getInterval(), + SEGMENT5.getInterval(), + SEGMENT5.getVersion(), + SEGMENT5.getShardSpec().getPartitionNum() + ), + new RichSegmentDescriptor( + SEGMENT6.getInterval(), + SEGMENT6.getInterval(), + SEGMENT6.getVersion(), + SEGMENT6.getShardSpec().getPartitionNum() + ) + ) + ) + ) + ) ), inputSlices ); @@ -487,4 +584,52 @@ public void test_sliceStatic_timeFilter_twoSlices() inputSlices ); } + + @Test + void test_withoutRealtime_twoSlices() + { + final QueryContext queryContext = QueryContext.of(Map.of(MultiStageQueryContext.CTX_INCLUDE_SEGMENT_SOURCE, SegmentSource.NONE.toString())); + slicer = DartTableInputSpecSlicer.createFromWorkerIds(WORKER_IDS, serverView, queryContext); + + // When 2 slices are requested, we assign segments to the servers that have those segments. + + final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null, null, null); + final List inputSlices = slicer.sliceStatic(inputSpec, 2); + // Expect only the published segments to be assigned; realtime segments 5 and 6 are excluded because + // includeSegmentSource is set to NONE.
+ Assertions.assertEquals( + ImmutableList.of( + new SegmentsInputSlice( + DATASOURCE, + ImmutableList.of( + new RichSegmentDescriptor( + SEGMENT1.getInterval(), + SEGMENT1.getInterval(), + SEGMENT1.getVersion(), + SEGMENT1.getShardSpec().getPartitionNum() + ), + new RichSegmentDescriptor( + SEGMENT3.getInterval(), + SEGMENT3.getInterval(), + SEGMENT3.getVersion(), + SEGMENT3.getShardSpec().getPartitionNum() + ) + ), + ImmutableList.of() + ), + new SegmentsInputSlice( + DATASOURCE, + ImmutableList.of( + new RichSegmentDescriptor( + SEGMENT2.getInterval(), + SEGMENT2.getInterval(), + SEGMENT2.getVersion(), + SEGMENT2.getShardSpec().getPartitionNum() + ) + ), + ImmutableList.of() + ) + ), + inputSlices + ); + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java index 641836f5a265..d4943fc3c5f9 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java @@ -21,6 +21,8 @@ import com.google.inject.Binder; import com.google.inject.Provides; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.guice.LazySingleton; @@ -121,6 +123,7 @@ final DruidNodeDiscoveryProvider getDiscoveryProvider() @Override public void configure(Binder binder) { + binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class); } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index 
a2600999d905..626ce3c1f439 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -48,6 +48,7 @@ import org.apache.druid.msq.exec.Controller; import org.apache.druid.msq.exec.ControllerContext; import org.apache.druid.msq.exec.ControllerMemoryParameters; +import org.apache.druid.msq.exec.SegmentSource; import org.apache.druid.msq.exec.Worker; import org.apache.druid.msq.exec.WorkerClient; import org.apache.druid.msq.exec.WorkerFailureListener; @@ -324,7 +325,7 @@ public InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager) return new IndexerTableInputSpecSlicer( coordinatorClient, taskActionClient, - MultiStageQueryContext.getSegmentSources(queryContext) + MultiStageQueryContext.getSegmentSources(queryContext, SegmentSource.NONE) ); } @@ -371,7 +372,7 @@ public WorkerClient newWorkerClient() } @Override - public ControllerContext newContext(String queryId) + public ControllerContext newContext(QueryContext context) { return this; } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java index a002b2779733..95b1a7fc97de 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java @@ -43,6 +43,7 @@ import org.apache.druid.msq.exec.WorkerStorageParameters; import org.apache.druid.msq.kernel.StageId; import org.apache.druid.msq.kernel.WorkOrder; +import org.apache.druid.query.QueryContext; import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.server.DruidNode; @@ -76,7 +77,7 @@ public 
TestDartControllerContextFactoryImpl( } @Override - public ControllerContext newContext(String queryId) + public ControllerContext newContext(QueryContext context) { return new DartControllerContext( injector, @@ -85,7 +86,8 @@ public ControllerContext newContext(String queryId) new DartTestWorkerClient(), memoryIntrospector, serverView, - emitter + emitter, + context ) { @Override diff --git a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java index 1d42ff3dc9ee..4a26c0ed6faf 100644 --- a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java +++ b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java @@ -229,4 +229,14 @@ public boolean hasData() { return segment.get().hasData(); } + + /** + * Checks if the segment is currently served by a realtime server, and is not served by a historical. + */ + public boolean isRealtimeSegment() + { + synchronized (this) { + return (!realtimeServers.isEmpty()) && historicalServers.isEmpty(); + } + } }