From 2951c307ecf2a72f68a2c3eccc2a7cc206494bc5 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 16 Sep 2024 00:50:25 -0700 Subject: [PATCH 1/2] MSQ: Add QueryKitSpec to encapsulate QueryKit params. This patch introduces QueryKitSpec, an object that encapsulates the parameters to makeQueryDefinition that are consistent from call to call. This simplifies things because we avoid passing around all the components individually. This patch also splits "maxWorkerCount" into "maxLeafWorkerCount" and "maxNonLeafWorkerCount", which apply to leaf stages (no other stages as inputs) and nonleaf stages respectively. Finally, this patch also provides a way for ControllerContext to supply a QueryKitSpec to its liking. It is expected that this will be used by controllers of quick interactive queries to set maxNonLeafWorkerCount = 1, which will generate fanning-in query plans. --- .../druid/msq/exec/ControllerContext.java | 14 +- .../apache/druid/msq/exec/ControllerImpl.java | 29 ++-- .../indexing/IndexerControllerContext.java | 27 +++- .../druid/msq/querykit/DataSourcePlan.java | 136 +++++------------- .../druid/msq/querykit/MultiQueryKit.java | 10 +- .../apache/druid/msq/querykit/QueryKit.java | 12 +- .../druid/msq/querykit/QueryKitSpec.java | 109 ++++++++++++++ .../msq/querykit/WindowOperatorQueryKit.java | 26 ++-- .../msq/querykit/groupby/GroupByQueryKit.java | 28 ++-- .../druid/msq/querykit/scan/ScanQueryKit.java | 19 ++- .../msq/test/MSQTestControllerContext.java | 26 +++- 11 files changed, 242 insertions(+), 194 deletions(-) create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java index bc449d141203..3c9e6de1d7ee 100644 --- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java @@ -31,7 +31,8 @@ import org.apache.druid.msq.kernel.QueryDefinition; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; import org.apache.druid.msq.querykit.QueryKit; -import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.msq.querykit.QueryKitSpec; +import org.apache.druid.query.Query; import org.apache.druid.server.DruidNode; /** @@ -104,8 +105,13 @@ WorkerManager newWorkerManager( WorkerClient newWorkerClient(); /** - * Default target partitions per worker for {@link QueryKit#makeQueryDefinition}. Can be overridden using - * {@link MultiStageQueryContext#CTX_TARGET_PARTITIONS_PER_WORKER}. + * Create a {@link QueryKitSpec}. This method provides controller contexts a way to customize parameters around the + * number of workers and partitions. 
*/ - int defaultTargetPartitionsPerWorker(); + QueryKitSpec makeQueryKitSpec( + QueryKit> queryKit, + String queryId, + MSQSpec querySpec, + ControllerQueryKernelConfig queryKernelConfig + ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 6d1ef21abbf2..1ccd43989b7f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -151,6 +151,7 @@ import org.apache.druid.msq.kernel.controller.WorkerInputs; import org.apache.druid.msq.querykit.MultiQueryKit; import org.apache.druid.msq.querykit.QueryKit; +import org.apache.druid.msq.querykit.QueryKitSpec; import org.apache.druid.msq.querykit.QueryKitUtils; import org.apache.druid.msq.querykit.ShuffleSpecFactory; import org.apache.druid.msq.querykit.WindowOperatorQueryKit; @@ -566,14 +567,9 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer) final QueryContext queryContext = querySpec.getQuery().context(); final QueryDefinition queryDef = makeQueryDefinition( - queryId(), - makeQueryControllerToolKit(), + context.makeQueryKitSpec(makeQueryControllerToolKit(), queryId, querySpec, queryKernelConfig), querySpec, context.jsonMapper(), - MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault( - queryContext, - context.defaultTargetPartitionsPerWorker() - ), resultsContext ); @@ -1196,7 +1192,7 @@ private Int2ObjectMap makeWorkerFactoryInfosForStage( } @SuppressWarnings("rawtypes") - private QueryKit makeQueryControllerToolKit() + private QueryKit> makeQueryControllerToolKit() { final Map, QueryKit> kitMap = ImmutableMap., QueryKit>builder() @@ -1720,11 +1716,9 @@ private void cleanUpDurableStorageIfNeeded() @SuppressWarnings("unchecked") private static QueryDefinition makeQueryDefinition( - 
final String queryId, - @SuppressWarnings("rawtypes") final QueryKit toolKit, + final QueryKitSpec queryKitSpec, final MSQSpec querySpec, final ObjectMapper jsonMapper, - final int targetPartitionsPerWorker, final ResultsContext resultsContext ) { @@ -1768,13 +1762,10 @@ private static QueryDefinition makeQueryDefinition( final QueryDefinition queryDef; try { - queryDef = toolKit.makeQueryDefinition( - queryId, + queryDef = queryKitSpec.getQueryKit().makeQueryDefinition( + queryKitSpec, queryToPlan, - toolKit, resultShuffleSpecFactory, - tuningConfig.getMaxNumWorkers(), - targetPartitionsPerWorker, 0 ); } @@ -1803,7 +1794,7 @@ private static QueryDefinition makeQueryDefinition( // Add all query stages. // Set shuffleCheckHasMultipleValues on the stage that serves as input to the final segment-generation stage. - final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder builder = QueryDefinition.builder(queryKitSpec.getQueryId()); for (final StageDefinition stageDef : queryDef.getStageDefinitions()) { if (stageDef.equals(finalShuffleStageDef)) { @@ -1829,7 +1820,7 @@ private static QueryDefinition makeQueryDefinition( // attaching new query results stage if the final stage does sort during shuffle so that results are ordered. 
StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition(); if (finalShuffleStageDef.doesSortDuringShuffle()) { - final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder builder = QueryDefinition.builder(queryKitSpec.getQueryId()); builder.addAll(queryDef); builder.add(StageDefinition.builder(queryDef.getNextStageNumber()) .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())) @@ -1866,7 +1857,7 @@ private static QueryDefinition makeQueryDefinition( } final ResultFormat resultFormat = exportMSQDestination.getResultFormat(); - final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder builder = QueryDefinition.builder(queryKitSpec.getQueryId()); builder.addAll(queryDef); builder.add(StageDefinition.builder(queryDef.getNextStageNumber()) .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())) @@ -1874,7 +1865,7 @@ private static QueryDefinition makeQueryDefinition( .signature(queryDef.getFinalStageDefinition().getSignature()) .shuffleSpec(null) .processorFactory(new ExportResultsFrameProcessorFactory( - queryId, + queryKitSpec.getQueryId(), exportStorageProvider, resultFormat, columnMappings, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index 42808f647426..7f00a9f82a49 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -47,9 +47,13 @@ import org.apache.druid.msq.input.InputSpecSlicer; import org.apache.druid.msq.input.table.TableInputSpecSlicer; import org.apache.druid.msq.kernel.QueryDefinition; +import org.apache.druid.msq.kernel.WorkOrder; 
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; +import org.apache.druid.msq.querykit.QueryKit; +import org.apache.druid.msq.querykit.QueryKitSpec; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.DruidMetrics; +import org.apache.druid.query.Query; import org.apache.druid.query.QueryContext; import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.rpc.indexing.OverlordClient; @@ -202,11 +206,26 @@ public WorkerManager newWorkerManager( } @Override - public int defaultTargetPartitionsPerWorker() + public QueryKitSpec makeQueryKitSpec( + final QueryKit> queryKit, + final String queryId, + final MSQSpec querySpec, + final ControllerQueryKernelConfig queryKernelConfig + ) { - // Assume tasks are symmetric: workers have the same number of processors available as a controller. - // Create one partition per processor per task, for maximum parallelism. - return memoryIntrospector.numProcessingThreads(); + return new QueryKitSpec( + queryKit, + queryId, + querySpec.getTuningConfig().getMaxNumWorkers(), + querySpec.getTuningConfig().getMaxNumWorkers(), + + // Assume tasks are symmetric: workers have the same number of processors available as a controller. + // Create one partition per processor per task, for maximum parallelism. 
+ MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault( + querySpec.getQuery().context(), + memoryIntrospector.numProcessingThreads() + ) + ); } /** diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index 15fe6263ed83..21848813e5d3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -123,8 +123,7 @@ public class DataSourcePlan /** * Build a plan. * - * @param queryKit query kit reference for recursive planning - * @param queryId query ID + * @param queryKitSpec reference for recursive planning * @param queryContext query context * @param dataSource datasource to plan * @param querySegmentSpec intervals for mandatory pruning. Must be {@link MultipleIntervalSegmentSpec}. The returned @@ -132,22 +131,17 @@ public class DataSourcePlan * @param filter filter for best-effort pruning. The returned plan may or may not be filtered to this * filter. Query processing must still apply the filter to generated correct results. * @param filterFields which fields from the filter to consider for pruning, or null to consider all fields. 
- * @param maxWorkerCount maximum number of workers for subqueries * @param minStageNumber starting stage number for subqueries * @param broadcast whether the plan should broadcast data for this datasource - * @param targetPartitionsPerWorker preferred number of partitions per worker for subqueries */ @SuppressWarnings("rawtypes") public static DataSourcePlan forDataSource( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final QueryContext queryContext, final DataSource dataSource, final QuerySegmentSpec querySegmentSpec, @Nullable DimFilter filter, @Nullable Set filterFields, - final int maxWorkerCount, - final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) @@ -182,51 +176,38 @@ public static DataSourcePlan forDataSource( return forLookup((LookupDataSource) dataSource, broadcast); } else if (dataSource instanceof FilteredDataSource) { return forFilteredDataSource( - queryKit, - queryId, + queryKitSpec, queryContext, (FilteredDataSource) dataSource, querySegmentSpec, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, broadcast ); } else if (dataSource instanceof UnnestDataSource) { return forUnnest( - queryKit, - queryId, + queryKitSpec, queryContext, (UnnestDataSource) dataSource, querySegmentSpec, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, broadcast ); } else if (dataSource instanceof QueryDataSource) { checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); return forQuery( - queryKit, - queryId, + queryKitSpec, (QueryDataSource) dataSource, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, - broadcast, - queryContext + broadcast ); } else if (dataSource instanceof UnionDataSource) { return forUnion( - queryKit, - queryId, + queryKitSpec, queryContext, (UnionDataSource) dataSource, querySegmentSpec, filter, filterFields, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -240,27 +221,21 @@ public static 
DataSourcePlan forDataSource( switch (deducedJoinAlgorithm) { case BROADCAST: return forBroadcastHashJoin( - queryKit, - queryId, + queryKitSpec, queryContext, (JoinDataSource) dataSource, querySegmentSpec, filter, filterFields, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, broadcast ); case SORT_MERGE: return forSortMergeJoin( - queryKit, - queryId, + queryKitSpec, (JoinDataSource) dataSource, querySegmentSpec, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -422,25 +397,18 @@ private static DataSourcePlan forLookup( } private static DataSourcePlan forQuery( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final QueryDataSource dataSource, - final int maxWorkerCount, - final int targetPartitionsPerWorker, final int minStageNumber, - final boolean broadcast, - @Nullable final QueryContext parentContext + final boolean broadcast ) { - final QueryDefinition subQueryDef = queryKit.makeQueryDefinition( - queryId, + final QueryDefinition subQueryDef = queryKitSpec.getQueryKit().makeQueryDefinition( + queryKitSpec, // Subqueries ignore SQL_INSERT_SEGMENT_GRANULARITY, even if set in the context. It's only used for the // outermost query, and setting it for the subquery makes us erroneously add bucketing where it doesn't belong. 
dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY), - queryKit, - ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount * targetPartitionsPerWorker), - maxWorkerCount, - targetPartitionsPerWorker, + ShuffleSpecFactories.globalSortWithMaxPartitionCount(queryKitSpec.getNumPartitionsForShuffle()), minStageNumber ); @@ -455,27 +423,21 @@ private static DataSourcePlan forQuery( } private static DataSourcePlan forFilteredDataSource( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final QueryContext queryContext, final FilteredDataSource dataSource, final QuerySegmentSpec querySegmentSpec, - final int maxWorkerCount, - final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) { final DataSourcePlan basePlan = forDataSource( - queryKit, - queryId, + queryKitSpec, queryContext, dataSource.getBase(), querySegmentSpec, null, null, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -497,28 +459,22 @@ private static DataSourcePlan forFilteredDataSource( * Build a plan for Unnest data source */ private static DataSourcePlan forUnnest( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final QueryContext queryContext, final UnnestDataSource dataSource, final QuerySegmentSpec querySegmentSpec, - final int maxWorkerCount, - final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) { // Find the plan for base data source by recursing final DataSourcePlan basePlan = forDataSource( - queryKit, - queryId, + queryKitSpec, queryContext, dataSource.getBase(), querySegmentSpec, null, null, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -543,15 +499,12 @@ private static DataSourcePlan forUnnest( } private static DataSourcePlan forUnion( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final QueryContext queryContext, 
final UnionDataSource unionDataSource, final QuerySegmentSpec querySegmentSpec, @Nullable DimFilter filter, @Nullable Set filterFields, - final int maxWorkerCount, - final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) @@ -559,22 +512,19 @@ private static DataSourcePlan forUnion( // This is done to prevent loss of generality since MSQ can plan any type of DataSource. List children = unionDataSource.getDataSources(); - final QueryDefinitionBuilder subqueryDefBuilder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder subqueryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId()); final List newChildren = new ArrayList<>(); final List inputSpecs = new ArrayList<>(); final IntSet broadcastInputs = new IntOpenHashSet(); for (DataSource child : children) { DataSourcePlan childDataSourcePlan = forDataSource( - queryKit, - queryId, + queryKitSpec, queryContext, child, querySegmentSpec, filter, filterFields, - maxWorkerCount, - targetPartitionsPerWorker, Math.max(minStageNumber, subqueryDefBuilder.getNextStageNumber()), broadcast ); @@ -598,32 +548,26 @@ private static DataSourcePlan forUnion( * Build a plan for broadcast hash-join. 
*/ private static DataSourcePlan forBroadcastHashJoin( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final QueryContext queryContext, final JoinDataSource dataSource, final QuerySegmentSpec querySegmentSpec, @Nullable final DimFilter filter, @Nullable final Set filterFields, - final int maxWorkerCount, - final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) { - final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId()); final DataSourceAnalysis analysis = dataSource.getAnalysis(); final DataSourcePlan basePlan = forDataSource( - queryKit, - queryId, + queryKitSpec, queryContext, analysis.getBaseDataSource(), querySegmentSpec, filter, filter == null ? null : DimFilterUtils.onlyBaseFields(filterFields, analysis), - maxWorkerCount, - targetPartitionsPerWorker, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), broadcast ); @@ -636,15 +580,12 @@ private static DataSourcePlan forBroadcastHashJoin( for (int i = 0; i < analysis.getPreJoinableClauses().size(); i++) { final PreJoinableClause clause = analysis.getPreJoinableClauses().get(i); final DataSourcePlan clausePlan = forDataSource( - queryKit, - queryId, + queryKitSpec, queryContext, clause.getDataSource(), new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY), null, // Don't push down query filters for right-hand side: needs some work to ensure it works properly. null, - maxWorkerCount, - targetPartitionsPerWorker, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), true // Always broadcast right-hand side of the join. ); @@ -674,12 +615,9 @@ private static DataSourcePlan forBroadcastHashJoin( * Build a plan for sort-merge join. 
*/ private static DataSourcePlan forSortMergeJoin( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final JoinDataSource dataSource, final QuerySegmentSpec querySegmentSpec, - final int maxWorkerCount, - final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) @@ -692,20 +630,16 @@ private static DataSourcePlan forSortMergeJoin( SortMergeJoinFrameProcessorFactory.validateCondition(dataSource.getConditionAnalysis()) ); - final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId()); // Plan the left input. // We're confident that we can cast dataSource.getLeft() to QueryDataSource, because DruidJoinQueryRel creates // subqueries when the join algorithm is sortMerge. final DataSourcePlan leftPlan = forQuery( - queryKit, - queryId, + queryKitSpec, (QueryDataSource) dataSource.getLeft(), - maxWorkerCount, - targetPartitionsPerWorker, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), - false, - null + false ); leftPlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll); @@ -713,14 +647,10 @@ private static DataSourcePlan forSortMergeJoin( // We're confident that we can cast dataSource.getRight() to QueryDataSource, because DruidJoinQueryRel creates // subqueries when the join algorithm is sortMerge. 
final DataSourcePlan rightPlan = forQuery( - queryKit, - queryId, + queryKitSpec, (QueryDataSource) dataSource.getRight(), - maxWorkerCount, - targetPartitionsPerWorker, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), - false, - null + false ); rightPlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll); @@ -729,7 +659,7 @@ private static DataSourcePlan forSortMergeJoin( ((StageInputSpec) Iterables.getOnlyElement(leftPlan.getInputSpecs())).getStageNumber() ); - final int hashPartitionCount = maxWorkerCount * targetPartitionsPerWorker; + final int hashPartitionCount = queryKitSpec.getNumPartitionsForShuffle(); final List leftPartitionKey = partitionKeys.get(0); leftBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(leftPartitionKey, 0), hashPartitionCount)); leftBuilder.signature(QueryKitUtils.sortableSignature(leftBuilder.getSignature(), leftPartitionKey)); @@ -768,7 +698,7 @@ private static DataSourcePlan forSortMergeJoin( Iterables.getOnlyElement(rightPlan.getInputSpecs()) ) ) - .maxWorkerCount(maxWorkerCount) + .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) .signature(joinSignatureBuilder.build()) .processorFactory( new SortMergeJoinFrameProcessorFactory( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java index 37f453f6c060..3129bbfacb9c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java @@ -41,12 +41,9 @@ public MultiQueryKit(final Map, QueryKit> toolKitMap) @Override public QueryDefinition makeQueryDefinition( - String queryId, + QueryKitSpec queryKitSpec, Query query, - QueryKit> toolKitForSubQueries, ShuffleSpecFactory resultShuffleSpecFactory, - int maxWorkerCount, - int targetPartitionsPerWorker, 
int minStageNumber ) { @@ -55,12 +52,9 @@ public QueryDefinition makeQueryDefinition( if (specificToolKit != null) { //noinspection unchecked return specificToolKit.makeQueryDefinition( - queryId, + queryKitSpec, query, - this, resultShuffleSpecFactory, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber ); } else { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java index 2bc0ad0725a8..118091ccbd49 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java @@ -30,25 +30,17 @@ public interface QueryKit> /** * Creates a {@link QueryDefinition} from a {@link Query}. * - * @param queryId query ID of the resulting {@link QueryDefinition} + * @param queryKitSpec collection of parameters necessary for planning {@link QueryDefinition} * @param query native query to translate - * @param toolKitForSubQueries kit that is used to translate native subqueries; i.e., - * {@link org.apache.druid.query.QueryDataSource}. Typically a {@link MultiQueryKit}. * @param resultShuffleSpecFactory shuffle spec factory for the final output of this query. - * @param maxWorkerCount maximum number of workers: becomes - * {@link org.apache.druid.msq.kernel.StageDefinition#getMaxWorkerCount()} * @param minStageNumber lowest stage number to use for any generated stages. Useful if the resulting * {@link QueryDefinition} is going to be added to an existing * {@link org.apache.druid.msq.kernel.QueryDefinitionBuilder}. 
- * @param targetPartitionsPerWorker preferred number of partitions per worker for subqueries */ QueryDefinition makeQueryDefinition( - String queryId, + QueryKitSpec queryKitSpec, QueryType query, - QueryKit> toolKitForSubQueries, ShuffleSpecFactory resultShuffleSpecFactory, - int maxWorkerCount, - int targetPartitionsPerWorker, int minStageNumber ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java new file mode 100644 index 000000000000..0596af10bb9d --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.querykit; + +import org.apache.druid.msq.input.InputSpec; +import org.apache.druid.msq.input.InputSpecs; +import org.apache.druid.msq.kernel.QueryDefinition; +import org.apache.druid.query.Query; + +import java.util.List; + +/** + * Collection of parameters for {@link QueryKit#makeQueryDefinition}. 
+ */ +public class QueryKitSpec +{ + private final QueryKit> queryKit; + private final String queryId; + private final int maxLeafWorkerCount; + private final int maxNonLeafWorkerCount; + private final int targetPartitionsPerWorker; + + /** + * @param queryKit kit that is used to translate native subqueries; i.e., + * {@link org.apache.druid.query.QueryDataSource}. Typically a {@link MultiQueryKit}. + * @param queryId queryId of the resulting {@link QueryDefinition} + * @param maxLeafWorkerCount maximum number of workers for leaf stages: becomes + * {@link org.apache.druid.msq.kernel.StageDefinition#getMaxWorkerCount()} + * @param maxNonLeafWorkerCount maximum number of workers for non-leaf stages: becomes + * {@link org.apache.druid.msq.kernel.StageDefinition#getMaxWorkerCount()} + * @param targetPartitionsPerWorker preferred number of partitions per worker for subqueries + */ + public QueryKitSpec( + QueryKit> queryKit, + String queryId, + int maxLeafWorkerCount, + int maxNonLeafWorkerCount, + int targetPartitionsPerWorker + ) + { + this.queryId = queryId; + this.queryKit = queryKit; + this.maxLeafWorkerCount = maxLeafWorkerCount; + this.maxNonLeafWorkerCount = maxNonLeafWorkerCount; + this.targetPartitionsPerWorker = targetPartitionsPerWorker; + } + + /** + * Instance of {@link QueryKit} for recursive calls. + */ + public QueryKit> getQueryKit() + { + return queryKit; + } + + /** + * Query ID to use when building {@link QueryDefinition}. + */ + public String getQueryId() + { + return queryId; + } + + /** + * Maximum worker count for a stage with the given inputs. Will use {@link #maxNonLeafWorkerCount} if there are + * any stage inputs, {@link #maxLeafWorkerCount} otherwise. 
+ */ + public int getMaxWorkerCount(final List inputSpecs) + { + if (InputSpecs.getStageNumbers(inputSpecs).isEmpty()) { + return maxLeafWorkerCount; + } else { + return maxNonLeafWorkerCount; + } + } + + /** + * Maximum number of workers for non-leaf stages (where there are some stage inputs). + */ + public int getMaxNonLeafWorkerCount() + { + return maxNonLeafWorkerCount; + } + + /** + * Number of partitions to generate during a shuffle. + */ + public int getNumPartitionsForShuffle() + { + return maxNonLeafWorkerCount * targetPartitionsPerWorker; + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index b1af153fafde..02542f8e7366 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -35,7 +35,6 @@ import org.apache.druid.msq.kernel.ShuffleSpec; import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.util.MultiStageQueryContext; -import org.apache.druid.query.Query; import org.apache.druid.query.operator.ColumnWithDirection; import org.apache.druid.query.operator.NaivePartitioningOperatorFactory; import org.apache.druid.query.operator.NaiveSortOperatorFactory; @@ -63,12 +62,9 @@ public WindowOperatorQueryKit(ObjectMapper jsonMapper) @Override public QueryDefinition makeQueryDefinition( - String queryId, + QueryKitSpec queryKitSpec, WindowOperatorQuery originalQuery, - QueryKit> queryKit, ShuffleSpecFactory resultShuffleSpecFactory, - int maxWorkerCount, - int targetPartitionsPerWorker, int minStageNumber ) { @@ -90,22 +86,22 @@ public QueryDefinition makeQueryDefinition( log.info("Created operatorList with operator factories: [%s]", operatorList); final DataSourcePlan dataSourcePlan = 
DataSourcePlan.forDataSource( - queryKit, - queryId, + queryKitSpec, originalQuery.context(), originalQuery.getDataSource(), originalQuery.getQuerySegmentSpec(), originalQuery.getFilter(), null, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, false ); - ShuffleSpec nextShuffleSpec = - findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount * targetPartitionsPerWorker); - final QueryDefinitionBuilder queryDefBuilder = makeQueryDefinitionBuilder(queryId, dataSourcePlan, nextShuffleSpec); + ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow( + operatorList.get(0), + queryKitSpec.getNumPartitionsForShuffle() + ); + final QueryDefinitionBuilder queryDefBuilder = + makeQueryDefinitionBuilder(queryKitSpec.getQueryId(), dataSourcePlan, nextShuffleSpec); final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber()); final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource()); @@ -133,7 +129,7 @@ public QueryDefinition makeQueryDefinition( StageDefinition.builder(firstStageNumber) .inputs(new StageInputSpec(firstStageNumber - 1)) .signature(finalWindowStageRowSignature) - .maxWorkerCount(maxWorkerCount) + .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) .shuffleSpec(finalWindowStageShuffleSpec) .processorFactory(new WindowOperatorQueryFrameProcessorFactory( queryToRun, @@ -196,7 +192,7 @@ public QueryDefinition makeQueryDefinition( nextShuffleSpec = finalWindowStageShuffleSpec; } else { nextShuffleSpec = - findShuffleSpecForNextWindow(operatorList.get(i + 1), maxWorkerCount * targetPartitionsPerWorker); + findShuffleSpecForNextWindow(operatorList.get(i + 1), queryKitSpec.getNumPartitionsForShuffle()); if (nextShuffleSpec == null) { stageRowSignature = intermediateSignature; } else { @@ -233,7 +229,7 @@ public QueryDefinition makeQueryDefinition( StageDefinition.builder(firstStageNumber + i) .inputs(new StageInputSpec(firstStageNumber + i - 
1)) .signature(stageRowSignature) - .maxWorkerCount(maxWorkerCount) + .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) .shuffleSpec(nextShuffleSpec) .processorFactory(new WindowOperatorQueryFrameProcessorFactory( queryToRun, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java index 45a91a3d8870..db56bd02f742 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java @@ -34,12 +34,12 @@ import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.querykit.DataSourcePlan; import org.apache.druid.msq.querykit.QueryKit; +import org.apache.druid.msq.querykit.QueryKitSpec; import org.apache.druid.msq.querykit.QueryKitUtils; import org.apache.druid.msq.querykit.ShuffleSpecFactories; import org.apache.druid.msq.querykit.ShuffleSpecFactory; import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory; import org.apache.druid.query.DimensionComparisonUtils; -import org.apache.druid.query.Query; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.having.AlwaysHavingSpec; @@ -66,28 +66,22 @@ public GroupByQueryKit(ObjectMapper jsonMapper) @Override public QueryDefinition makeQueryDefinition( - final String queryId, + final QueryKitSpec queryKitSpec, final GroupByQuery originalQuery, - final QueryKit> queryKit, final ShuffleSpecFactory resultShuffleSpecFactory, - final int maxWorkerCount, - final int targetPartitionsPerWorker, final int minStageNumber ) { validateQuery(originalQuery); - final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder 
queryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId()); final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource( - queryKit, - queryId, + queryKitSpec, originalQuery.context(), originalQuery.getDataSource(), originalQuery.getQuerySegmentSpec(), originalQuery.getFilter(), null, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, false ); @@ -144,7 +138,7 @@ public QueryDefinition makeQueryDefinition( shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty() ? ShuffleSpecFactories.singlePartition() - : ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount * targetPartitionsPerWorker); + : ShuffleSpecFactories.globalSortWithMaxPartitionCount(queryKitSpec.getNumPartitionsForShuffle()); if (doLimitOrOffset) { shuffleSpecFactoryPostAggregation = ShuffleSpecFactories.singlePartitionWithLimit(postAggregationLimitHint); @@ -169,7 +163,10 @@ public QueryDefinition makeQueryDefinition( .broadcastInputs(dataSourcePlan.getBroadcastInputs()) .signature(intermediateSignature) .shuffleSpec(shuffleSpecFactoryPreAggregation.build(intermediateClusterBy, true)) - .maxWorkerCount(dataSourcePlan.isSingleWorker() ? 1 : maxWorkerCount) + .maxWorkerCount( + dataSourcePlan.isSingleWorker() + ? 1 + : queryKitSpec.getMaxWorkerCount(dataSourcePlan.getInputSpecs())) .processorFactory(new GroupByPreShuffleFrameProcessorFactory(queryToRun)) ); @@ -189,7 +186,7 @@ public QueryDefinition makeQueryDefinition( StageDefinition.builder(firstStageNumber + 1) .inputs(new StageInputSpec(firstStageNumber)) .signature(resultSignature) - .maxWorkerCount(maxWorkerCount) + .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) .shuffleSpec( shuffleSpecFactoryPostAggregation != null ? 
shuffleSpecFactoryPostAggregation.build(resultClusterBy, false) @@ -390,7 +387,10 @@ private static void validateQuery(final GroupByQuery query) for (final OrderByColumnSpec column : defaultLimitSpec.getColumns()) { final Optional type = resultSignature.getColumnType(column.getDimension()); - if (!type.isPresent() || !DimensionComparisonUtils.isNaturalComparator(type.get().getType(), column.getDimensionComparator())) { + if (!type.isPresent() || !DimensionComparisonUtils.isNaturalComparator( + type.get().getType(), + column.getDimensionComparator() + )) { throw new ISE( "Must use natural comparator for column [%s] of type [%s]", column.getDimension(), diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java index 051caeb0e718..8d23e289bb67 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java @@ -33,13 +33,13 @@ import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.querykit.DataSourcePlan; import org.apache.druid.msq.querykit.QueryKit; +import org.apache.druid.msq.querykit.QueryKitSpec; import org.apache.druid.msq.querykit.QueryKitUtils; import org.apache.druid.msq.querykit.ShuffleSpecFactories; import org.apache.druid.msq.querykit.ShuffleSpecFactory; import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory; import org.apache.druid.query.Order; import org.apache.druid.query.OrderBy; -import org.apache.druid.query.Query; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -86,26 +86,20 @@ public static RowSignature getAndValidateSignature(final ScanQuery scanQuery, fi // partition without a ClusterBy, 
we don't need to necessarily create it via the resultShuffleSpecFactory provided @Override public QueryDefinition makeQueryDefinition( - final String queryId, + final QueryKitSpec queryKitSpec, final ScanQuery originalQuery, - final QueryKit> queryKit, final ShuffleSpecFactory resultShuffleSpecFactory, - final int maxWorkerCount, - final int targetPartitionsPerWorker, final int minStageNumber ) { - final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId()); final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource( - queryKit, - queryId, + queryKitSpec, originalQuery.context(), originalQuery.getDataSource(), originalQuery.getQuerySegmentSpec(), originalQuery.getFilter(), null, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, false ); @@ -179,7 +173,10 @@ public QueryDefinition makeQueryDefinition( .broadcastInputs(dataSourcePlan.getBroadcastInputs()) .shuffleSpec(scanShuffleSpec) .signature(signatureToUse) - .maxWorkerCount(dataSourcePlan.isSingleWorker() ? 1 : maxWorkerCount) + .maxWorkerCount( + dataSourcePlan.isSingleWorker() + ? 
1 + : queryKitSpec.getMaxWorkerCount(dataSourcePlan.getInputSpecs())) .processorFactory(new ScanQueryFrameProcessorFactory(queryToRun)) ); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index 3034be399849..f844268cc365 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -61,7 +61,10 @@ import org.apache.druid.msq.input.table.TableInputSpecSlicer; import org.apache.druid.msq.kernel.QueryDefinition; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; +import org.apache.druid.msq.querykit.QueryKit; +import org.apache.druid.msq.querykit.QueryKitSpec; import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.Query; import org.apache.druid.query.QueryContext; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.server.DruidNode; @@ -274,6 +277,23 @@ public ControllerQueryKernelConfig queryKernelConfig(MSQSpec querySpec, QueryDef return IndexerControllerContext.makeQueryKernelConfig(querySpec, new ControllerMemoryParameters(100_000_000)); } + @Override + public QueryKitSpec makeQueryKitSpec( + final QueryKit> queryKit, + final String queryId, + final MSQSpec querySpec, + final ControllerQueryKernelConfig queryKernelConfig + ) + { + return new QueryKitSpec( + queryKit, + queryId, + querySpec.getTuningConfig().getMaxNumWorkers(), + querySpec.getTuningConfig().getMaxNumWorkers(), + 1 + ); + } + @Override public void emitMetric(String metric, Number value) { @@ -342,10 +362,4 @@ public WorkerClient newWorkerClient() { return new MSQTestWorkerClient(inMemoryWorkers); } - - @Override - public int defaultTargetPartitionsPerWorker() - { - return 1; - } } 
From 921ffe0366dbfe5d80b02992b7e08db12a193627 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 17 Sep 2024 10:24:23 -0700 Subject: [PATCH 2/2] Fix javadoc. --- .../main/java/org/apache/druid/msq/querykit/QueryKitSpec.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java index 0596af10bb9d..7cae4ed7d7bf 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java @@ -39,8 +39,8 @@ public class QueryKitSpec /** * @param queryKit kit that is used to translate native subqueries; i.e., - * @param queryId queryId of the resulting {@link QueryDefinition} * {@link org.apache.druid.query.QueryDataSource}. Typically a {@link MultiQueryKit}. + * @param queryId queryId of the resulting {@link QueryDefinition} * @param maxLeafWorkerCount maximum number of workers for leaf stages: becomes * {@link org.apache.druid.msq.kernel.StageDefinition#getMaxWorkerCount()} * @param maxNonLeafWorkerCount maximum number of workers for non-leaf stages: becomes