diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java index 58b32e96e7fa..42515a1779ad 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java @@ -30,7 +30,8 @@ import org.apache.druid.msq.input.table.TableInputSpec; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; import org.apache.druid.msq.querykit.QueryKit; -import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.msq.querykit.QueryKitSpec; +import org.apache.druid.query.Query; import org.apache.druid.server.DruidNode; /** @@ -103,8 +104,13 @@ WorkerManager newWorkerManager( WorkerClient newWorkerClient(); /** - * Default target partitions per worker for {@link QueryKit#makeQueryDefinition}. Can be overridden using - * {@link MultiStageQueryContext#CTX_TARGET_PARTITIONS_PER_WORKER}. + * Create a {@link QueryKitSpec}. This method provides controller contexts a way to customize parameters around the + * number of workers and partitions. */ - int defaultTargetPartitionsPerWorker(); + QueryKitSpec makeQueryKitSpec( + QueryKit> queryKit, + String queryId, + MSQSpec querySpec, + ControllerQueryKernelConfig queryKernelConfig + ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 8457675e8f26..72d8216088fe 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -152,6 +152,7 @@ import org.apache.druid.msq.kernel.controller.WorkerInputs; import org.apache.druid.msq.querykit.MultiQueryKit; import org.apache.druid.msq.querykit.QueryKit; +import org.apache.druid.msq.querykit.QueryKitSpec; import org.apache.druid.msq.querykit.QueryKitUtils; import org.apache.druid.msq.querykit.ShuffleSpecFactory; import org.apache.druid.msq.querykit.WindowOperatorQueryKit; @@ -567,14 +568,9 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer) final QueryContext queryContext = querySpec.getQuery().context(); final QueryDefinition queryDef = makeQueryDefinition( - queryId(), - makeQueryControllerToolKit(), + context.makeQueryKitSpec(makeQueryControllerToolKit(), queryId, querySpec, queryKernelConfig), querySpec, context.jsonMapper(), - MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault( - queryContext, - context.defaultTargetPartitionsPerWorker() - ), resultsContext ); @@ -1201,7 +1197,7 @@ private Int2ObjectMap makeWorkerFactoryInfosForStage( } @SuppressWarnings("rawtypes") - private QueryKit makeQueryControllerToolKit() + private QueryKit> makeQueryControllerToolKit() { final Map, QueryKit> kitMap = ImmutableMap., QueryKit>builder() @@ -1725,11 +1721,9 @@ private void cleanUpDurableStorageIfNeeded() @SuppressWarnings("unchecked") private static QueryDefinition makeQueryDefinition( - final String queryId, - @SuppressWarnings("rawtypes") final QueryKit toolKit, + final QueryKitSpec queryKitSpec, final MSQSpec querySpec, final ObjectMapper jsonMapper, - final int targetPartitionsPerWorker, final ResultsContext resultsContext ) { @@ -1773,13 +1767,10 @@ private static QueryDefinition makeQueryDefinition( final QueryDefinition queryDef; try { - queryDef = toolKit.makeQueryDefinition( - queryId, + queryDef = queryKitSpec.getQueryKit().makeQueryDefinition( + queryKitSpec, queryToPlan, - toolKit, resultShuffleSpecFactory, - tuningConfig.getMaxNumWorkers(), - targetPartitionsPerWorker, 0 ); } @@ -1808,7 +1799,7 @@ private static QueryDefinition makeQueryDefinition( // Add all query stages. // Set shuffleCheckHasMultipleValues on the stage that serves as input to the final segment-generation stage. - final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder builder = QueryDefinition.builder(queryKitSpec.getQueryId()); for (final StageDefinition stageDef : queryDef.getStageDefinitions()) { if (stageDef.equals(finalShuffleStageDef)) { @@ -1834,7 +1825,7 @@ private static QueryDefinition makeQueryDefinition( // attaching new query results stage if the final stage does sort during shuffle so that results are ordered. StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition(); if (finalShuffleStageDef.doesSortDuringShuffle()) { - final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder builder = QueryDefinition.builder(queryKitSpec.getQueryId()); builder.addAll(queryDef); builder.add(StageDefinition.builder(queryDef.getNextStageNumber()) .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())) @@ -1871,7 +1862,7 @@ private static QueryDefinition makeQueryDefinition( } final ResultFormat resultFormat = exportMSQDestination.getResultFormat(); - final QueryDefinitionBuilder builder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder builder = QueryDefinition.builder(queryKitSpec.getQueryId()); builder.addAll(queryDef); builder.add(StageDefinition.builder(queryDef.getNextStageNumber()) .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())) @@ -1879,7 +1870,7 @@ private static QueryDefinition makeQueryDefinition( .signature(queryDef.getFinalStageDefinition().getSignature()) .shuffleSpec(null) .processorFactory(new ExportResultsFrameProcessorFactory( - queryId, + queryKitSpec.getQueryId(), exportStorageProvider, resultFormat, columnMappings, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index 0e2cc03fda74..c148e7fc1bbf 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -47,8 +47,11 @@ import org.apache.druid.msq.input.InputSpecSlicer; import org.apache.druid.msq.kernel.WorkOrder; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; +import org.apache.druid.msq.querykit.QueryKit; +import org.apache.druid.msq.querykit.QueryKitSpec; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.DruidMetrics; +import org.apache.druid.query.Query; import org.apache.druid.query.QueryContext; import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.rpc.indexing.OverlordClient; @@ -203,11 +206,26 @@ public WorkerManager newWorkerManager( } @Override - public int defaultTargetPartitionsPerWorker() + public QueryKitSpec makeQueryKitSpec( + final QueryKit> queryKit, + final String queryId, + final MSQSpec querySpec, + final ControllerQueryKernelConfig queryKernelConfig + ) { - // Assume tasks are symmetric: workers have the same number of processors available as a controller. - // Create one partition per processor per task, for maximum parallelism. - return memoryIntrospector.numProcessingThreads(); + return new QueryKitSpec( + queryKit, + queryId, + querySpec.getTuningConfig().getMaxNumWorkers(), + querySpec.getTuningConfig().getMaxNumWorkers(), + + // Assume tasks are symmetric: workers have the same number of processors available as a controller. + // Create one partition per processor per task, for maximum parallelism. + MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault( + querySpec.getQuery().context(), + memoryIntrospector.numProcessingThreads() + ) + ); } /** diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index 15fe6263ed83..21848813e5d3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -123,8 +123,7 @@ public class DataSourcePlan /** * Build a plan. * - * @param queryKit query kit reference for recursive planning - * @param queryId query ID + * @param queryKitSpec reference for recursive planning * @param queryContext query context * @param dataSource datasource to plan * @param querySegmentSpec intervals for mandatory pruning. Must be {@link MultipleIntervalSegmentSpec}. The returned @@ -132,22 +131,17 @@ public class DataSourcePlan * @param filter filter for best-effort pruning. The returned plan may or may not be filtered to this * filter. Query processing must still apply the filter to generated correct results. * @param filterFields which fields from the filter to consider for pruning, or null to consider all fields. - * @param maxWorkerCount maximum number of workers for subqueries * @param minStageNumber starting stage number for subqueries * @param broadcast whether the plan should broadcast data for this datasource - * @param targetPartitionsPerWorker preferred number of partitions per worker for subqueries */ @SuppressWarnings("rawtypes") public static DataSourcePlan forDataSource( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final QueryContext queryContext, final DataSource dataSource, final QuerySegmentSpec querySegmentSpec, @Nullable DimFilter filter, @Nullable Set filterFields, - final int maxWorkerCount, - final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) @@ -182,51 +176,38 @@ public static DataSourcePlan forDataSource( return forLookup((LookupDataSource) dataSource, broadcast); } else if (dataSource instanceof FilteredDataSource) { return forFilteredDataSource( - queryKit, - queryId, + queryKitSpec, queryContext, (FilteredDataSource) dataSource, querySegmentSpec, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, broadcast ); } else if (dataSource instanceof UnnestDataSource) { return forUnnest( - queryKit, - queryId, + queryKitSpec, queryContext, (UnnestDataSource) dataSource, querySegmentSpec, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, broadcast ); } else if (dataSource instanceof QueryDataSource) { checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); return forQuery( - queryKit, - queryId, + queryKitSpec, (QueryDataSource) dataSource, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, - broadcast, - queryContext + broadcast ); } else if (dataSource instanceof UnionDataSource) { return forUnion( - queryKit, - queryId, + queryKitSpec, queryContext, (UnionDataSource) dataSource, querySegmentSpec, filter, filterFields, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -240,27 +221,21 @@ public static DataSourcePlan forDataSource( switch (deducedJoinAlgorithm) { case BROADCAST: return forBroadcastHashJoin( - queryKit, - queryId, + queryKitSpec, queryContext, (JoinDataSource) dataSource, querySegmentSpec, filter, filterFields, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, broadcast ); case SORT_MERGE: return forSortMergeJoin( - queryKit, - queryId, + queryKitSpec, (JoinDataSource) dataSource, querySegmentSpec, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -422,25 +397,18 @@ private static DataSourcePlan forLookup( } private static DataSourcePlan forQuery( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final QueryDataSource dataSource, - final int maxWorkerCount, - final int targetPartitionsPerWorker, final int minStageNumber, - final boolean broadcast, - @Nullable final QueryContext parentContext + final boolean broadcast ) { - final QueryDefinition subQueryDef = queryKit.makeQueryDefinition( - queryId, + final QueryDefinition subQueryDef = queryKitSpec.getQueryKit().makeQueryDefinition( + queryKitSpec, // Subqueries ignore SQL_INSERT_SEGMENT_GRANULARITY, even if set in the context. It's only used for the // outermost query, and setting it for the subquery makes us erroneously add bucketing where it doesn't belong. dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY), - queryKit, - ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount * targetPartitionsPerWorker), - maxWorkerCount, - targetPartitionsPerWorker, + ShuffleSpecFactories.globalSortWithMaxPartitionCount(queryKitSpec.getNumPartitionsForShuffle()), minStageNumber ); @@ -455,27 +423,21 @@ private static DataSourcePlan forQuery( } private static DataSourcePlan forFilteredDataSource( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final QueryContext queryContext, final FilteredDataSource dataSource, final QuerySegmentSpec querySegmentSpec, - final int maxWorkerCount, - final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) { final DataSourcePlan basePlan = forDataSource( - queryKit, - queryId, + queryKitSpec, queryContext, dataSource.getBase(), querySegmentSpec, null, null, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -497,28 +459,22 @@ private static DataSourcePlan forFilteredDataSource( * Build a plan for Unnest data source */ private static DataSourcePlan forUnnest( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final QueryContext queryContext, final UnnestDataSource dataSource, final QuerySegmentSpec querySegmentSpec, - final int maxWorkerCount, - final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) { // Find the plan for base data source by recursing final DataSourcePlan basePlan = forDataSource( - queryKit, - queryId, + queryKitSpec, queryContext, dataSource.getBase(), querySegmentSpec, null, null, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -543,15 +499,12 @@ private static DataSourcePlan forUnnest( } private static DataSourcePlan forUnion( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final QueryContext queryContext, final UnionDataSource unionDataSource, final QuerySegmentSpec querySegmentSpec, @Nullable DimFilter filter, @Nullable Set filterFields, - final int maxWorkerCount, - final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) @@ -559,22 +512,19 @@ private static DataSourcePlan forUnion( // This is done to prevent loss of generality since MSQ can plan any type of DataSource. List children = unionDataSource.getDataSources(); - final QueryDefinitionBuilder subqueryDefBuilder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder subqueryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId()); final List newChildren = new ArrayList<>(); final List inputSpecs = new ArrayList<>(); final IntSet broadcastInputs = new IntOpenHashSet(); for (DataSource child : children) { DataSourcePlan childDataSourcePlan = forDataSource( - queryKit, - queryId, + queryKitSpec, queryContext, child, querySegmentSpec, filter, filterFields, - maxWorkerCount, - targetPartitionsPerWorker, Math.max(minStageNumber, subqueryDefBuilder.getNextStageNumber()), broadcast ); @@ -598,32 +548,26 @@ private static DataSourcePlan forUnion( * Build a plan for broadcast hash-join. */ private static DataSourcePlan forBroadcastHashJoin( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final QueryContext queryContext, final JoinDataSource dataSource, final QuerySegmentSpec querySegmentSpec, @Nullable final DimFilter filter, @Nullable final Set filterFields, - final int maxWorkerCount, - final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) { - final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId()); final DataSourceAnalysis analysis = dataSource.getAnalysis(); final DataSourcePlan basePlan = forDataSource( - queryKit, - queryId, + queryKitSpec, queryContext, analysis.getBaseDataSource(), querySegmentSpec, filter, filter == null ? null : DimFilterUtils.onlyBaseFields(filterFields, analysis), - maxWorkerCount, - targetPartitionsPerWorker, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), broadcast ); @@ -636,15 +580,12 @@ private static DataSourcePlan forBroadcastHashJoin( for (int i = 0; i < analysis.getPreJoinableClauses().size(); i++) { final PreJoinableClause clause = analysis.getPreJoinableClauses().get(i); final DataSourcePlan clausePlan = forDataSource( - queryKit, - queryId, + queryKitSpec, queryContext, clause.getDataSource(), new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY), null, // Don't push down query filters for right-hand side: needs some work to ensure it works properly. null, - maxWorkerCount, - targetPartitionsPerWorker, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), true // Always broadcast right-hand side of the join. ); @@ -674,12 +615,9 @@ private static DataSourcePlan forBroadcastHashJoin( * Build a plan for sort-merge join. */ private static DataSourcePlan forSortMergeJoin( - final QueryKit queryKit, - final String queryId, + final QueryKitSpec queryKitSpec, final JoinDataSource dataSource, final QuerySegmentSpec querySegmentSpec, - final int maxWorkerCount, - final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) @@ -692,20 +630,16 @@ private static DataSourcePlan forSortMergeJoin( SortMergeJoinFrameProcessorFactory.validateCondition(dataSource.getConditionAnalysis()) ); - final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder subQueryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId()); // Plan the left input. // We're confident that we can cast dataSource.getLeft() to QueryDataSource, because DruidJoinQueryRel creates // subqueries when the join algorithm is sortMerge. final DataSourcePlan leftPlan = forQuery( - queryKit, - queryId, + queryKitSpec, (QueryDataSource) dataSource.getLeft(), - maxWorkerCount, - targetPartitionsPerWorker, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), - false, - null + false ); leftPlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll); @@ -713,14 +647,10 @@ private static DataSourcePlan forSortMergeJoin( // We're confident that we can cast dataSource.getRight() to QueryDataSource, because DruidJoinQueryRel creates // subqueries when the join algorithm is sortMerge. final DataSourcePlan rightPlan = forQuery( - queryKit, - queryId, + queryKitSpec, (QueryDataSource) dataSource.getRight(), - maxWorkerCount, - targetPartitionsPerWorker, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), - false, - null + false ); rightPlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll); @@ -729,7 +659,7 @@ private static DataSourcePlan forSortMergeJoin( ((StageInputSpec) Iterables.getOnlyElement(leftPlan.getInputSpecs())).getStageNumber() ); - final int hashPartitionCount = maxWorkerCount * targetPartitionsPerWorker; + final int hashPartitionCount = queryKitSpec.getNumPartitionsForShuffle(); final List leftPartitionKey = partitionKeys.get(0); leftBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(leftPartitionKey, 0), hashPartitionCount)); leftBuilder.signature(QueryKitUtils.sortableSignature(leftBuilder.getSignature(), leftPartitionKey)); @@ -768,7 +698,7 @@ private static DataSourcePlan forSortMergeJoin( Iterables.getOnlyElement(rightPlan.getInputSpecs()) ) ) - .maxWorkerCount(maxWorkerCount) + .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) .signature(joinSignatureBuilder.build()) .processorFactory( new SortMergeJoinFrameProcessorFactory( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java index 37f453f6c060..3129bbfacb9c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java @@ -41,12 +41,9 @@ public MultiQueryKit(final Map, QueryKit> toolKitMap) @Override public QueryDefinition makeQueryDefinition( - String queryId, + QueryKitSpec queryKitSpec, Query query, - QueryKit> toolKitForSubQueries, ShuffleSpecFactory resultShuffleSpecFactory, - int maxWorkerCount, - int targetPartitionsPerWorker, int minStageNumber ) { @@ -55,12 +52,9 @@ public QueryDefinition makeQueryDefinition( if (specificToolKit != null) { //noinspection unchecked return specificToolKit.makeQueryDefinition( - queryId, + queryKitSpec, query, - this, resultShuffleSpecFactory, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber ); } else { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java index 2bc0ad0725a8..118091ccbd49 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java @@ -30,25 +30,17 @@ public interface QueryKit> /** * Creates a {@link QueryDefinition} from a {@link Query}. * - * @param queryId query ID of the resulting {@link QueryDefinition} + * @param queryKitSpec collection of parameters necessary for planning {@link QueryDefinition} * @param query native query to translate - * @param toolKitForSubQueries kit that is used to translate native subqueries; i.e., - * {@link org.apache.druid.query.QueryDataSource}. Typically a {@link MultiQueryKit}. * @param resultShuffleSpecFactory shuffle spec factory for the final output of this query. - * @param maxWorkerCount maximum number of workers: becomes - * {@link org.apache.druid.msq.kernel.StageDefinition#getMaxWorkerCount()} * @param minStageNumber lowest stage number to use for any generated stages. Useful if the resulting * {@link QueryDefinition} is going to be added to an existing * {@link org.apache.druid.msq.kernel.QueryDefinitionBuilder}. - * @param targetPartitionsPerWorker preferred number of partitions per worker for subqueries */ QueryDefinition makeQueryDefinition( - String queryId, + QueryKitSpec queryKitSpec, QueryType query, - QueryKit> toolKitForSubQueries, ShuffleSpecFactory resultShuffleSpecFactory, - int maxWorkerCount, - int targetPartitionsPerWorker, int minStageNumber ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java new file mode 100644 index 000000000000..7cae4ed7d7bf --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.querykit; + +import org.apache.druid.msq.input.InputSpec; +import org.apache.druid.msq.input.InputSpecs; +import org.apache.druid.msq.kernel.QueryDefinition; +import org.apache.druid.query.Query; + +import java.util.List; + +/** + * Collection of parameters for {@link QueryKit#makeQueryDefinition}. + */ +public class QueryKitSpec +{ + private final QueryKit> queryKit; + private final String queryId; + private final int maxLeafWorkerCount; + private final int maxNonLeafWorkerCount; + private final int targetPartitionsPerWorker; + + /** + * @param queryKit kit that is used to translate native subqueries; i.e., + * {@link org.apache.druid.query.QueryDataSource}. Typically a {@link MultiQueryKit}. + * @param queryId queryId of the resulting {@link QueryDefinition} + * @param maxLeafWorkerCount maximum number of workers for leaf stages: becomes + * {@link org.apache.druid.msq.kernel.StageDefinition#getMaxWorkerCount()} + * @param maxNonLeafWorkerCount maximum number of workers for non-leaf stages: becomes + * {@link org.apache.druid.msq.kernel.StageDefinition#getMaxWorkerCount()} + * @param targetPartitionsPerWorker preferred number of partitions per worker for subqueries + */ + public QueryKitSpec( + QueryKit> queryKit, + String queryId, + int maxLeafWorkerCount, + int maxNonLeafWorkerCount, + int targetPartitionsPerWorker + ) + { + this.queryId = queryId; + this.queryKit = queryKit; + this.maxLeafWorkerCount = maxLeafWorkerCount; + this.maxNonLeafWorkerCount = maxNonLeafWorkerCount; + this.targetPartitionsPerWorker = targetPartitionsPerWorker; + } + + /** + * Instance of {@link QueryKit} for recursive calls. + */ + public QueryKit> getQueryKit() + { + return queryKit; + } + + /** + * Query ID to use when building {@link QueryDefinition}. + */ + public String getQueryId() + { + return queryId; + } + + /** + * Maximum worker count for a stage with the given inputs. Will use {@link #maxNonLeafWorkerCount} if there are + * any stage inputs, {@link #maxLeafWorkerCount} otherwise. + */ + public int getMaxWorkerCount(final List inputSpecs) + { + if (InputSpecs.getStageNumbers(inputSpecs).isEmpty()) { + return maxLeafWorkerCount; + } else { + return maxNonLeafWorkerCount; + } + } + + /** + * Maximum number of workers for non-leaf stages (where there are some stage inputs). + */ + public int getMaxNonLeafWorkerCount() + { + return maxNonLeafWorkerCount; + } + + /** + * Number of partitions to generate during a shuffle. + */ + public int getNumPartitionsForShuffle() + { + return maxNonLeafWorkerCount * targetPartitionsPerWorker; + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index b1af153fafde..02542f8e7366 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -35,7 +35,6 @@ import org.apache.druid.msq.kernel.ShuffleSpec; import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.util.MultiStageQueryContext; -import org.apache.druid.query.Query; import org.apache.druid.query.operator.ColumnWithDirection; import org.apache.druid.query.operator.NaivePartitioningOperatorFactory; import org.apache.druid.query.operator.NaiveSortOperatorFactory; @@ -63,12 +62,9 @@ public WindowOperatorQueryKit(ObjectMapper jsonMapper) @Override public QueryDefinition makeQueryDefinition( - String queryId, + QueryKitSpec queryKitSpec, WindowOperatorQuery originalQuery, - QueryKit> queryKit, ShuffleSpecFactory resultShuffleSpecFactory, - int maxWorkerCount, - int targetPartitionsPerWorker, int minStageNumber ) { @@ -90,22 +86,22 @@ public QueryDefinition makeQueryDefinition( log.info("Created operatorList with operator factories: [%s]", operatorList); final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource( - queryKit, - queryId, + queryKitSpec, originalQuery.context(), originalQuery.getDataSource(), originalQuery.getQuerySegmentSpec(), originalQuery.getFilter(), null, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, false ); - ShuffleSpec nextShuffleSpec = - findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount * targetPartitionsPerWorker); - final QueryDefinitionBuilder queryDefBuilder = makeQueryDefinitionBuilder(queryId, dataSourcePlan, nextShuffleSpec); + ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow( + operatorList.get(0), + queryKitSpec.getNumPartitionsForShuffle() + ); + final QueryDefinitionBuilder queryDefBuilder = + makeQueryDefinitionBuilder(queryKitSpec.getQueryId(), dataSourcePlan, nextShuffleSpec); final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber()); final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource()); @@ -133,7 +129,7 @@ public QueryDefinition makeQueryDefinition( StageDefinition.builder(firstStageNumber) .inputs(new StageInputSpec(firstStageNumber - 1)) .signature(finalWindowStageRowSignature) - .maxWorkerCount(maxWorkerCount) + .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) .shuffleSpec(finalWindowStageShuffleSpec) .processorFactory(new WindowOperatorQueryFrameProcessorFactory( queryToRun, @@ -196,7 +192,7 @@ public QueryDefinition makeQueryDefinition( nextShuffleSpec = finalWindowStageShuffleSpec; } else { nextShuffleSpec = - findShuffleSpecForNextWindow(operatorList.get(i + 1), maxWorkerCount * targetPartitionsPerWorker); + findShuffleSpecForNextWindow(operatorList.get(i + 1), queryKitSpec.getNumPartitionsForShuffle()); if (nextShuffleSpec == null) { stageRowSignature = intermediateSignature; } else { @@ -233,7 +229,7 @@ public QueryDefinition makeQueryDefinition( StageDefinition.builder(firstStageNumber + i) .inputs(new StageInputSpec(firstStageNumber + i - 1)) .signature(stageRowSignature) - .maxWorkerCount(maxWorkerCount) + .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) .shuffleSpec(nextShuffleSpec) .processorFactory(new WindowOperatorQueryFrameProcessorFactory( queryToRun, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java index 45a91a3d8870..db56bd02f742 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java @@ -34,12 +34,12 @@ import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.querykit.DataSourcePlan; import org.apache.druid.msq.querykit.QueryKit; +import org.apache.druid.msq.querykit.QueryKitSpec; import org.apache.druid.msq.querykit.QueryKitUtils; import org.apache.druid.msq.querykit.ShuffleSpecFactories; import org.apache.druid.msq.querykit.ShuffleSpecFactory; import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory; import org.apache.druid.query.DimensionComparisonUtils; -import org.apache.druid.query.Query; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.having.AlwaysHavingSpec; @@ -66,28 +66,22 @@ public GroupByQueryKit(ObjectMapper jsonMapper) @Override public QueryDefinition makeQueryDefinition( - final String queryId, + final QueryKitSpec queryKitSpec, final GroupByQuery originalQuery, - final QueryKit> queryKit, final ShuffleSpecFactory resultShuffleSpecFactory, - final int maxWorkerCount, - final int targetPartitionsPerWorker, final int minStageNumber ) { validateQuery(originalQuery); - final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId()); final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource( - queryKit, - queryId, + queryKitSpec, originalQuery.context(), originalQuery.getDataSource(), originalQuery.getQuerySegmentSpec(), originalQuery.getFilter(), null, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, false ); @@ -144,7 +138,7 @@ public QueryDefinition makeQueryDefinition( shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty() ? ShuffleSpecFactories.singlePartition() - : ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount * targetPartitionsPerWorker); + : ShuffleSpecFactories.globalSortWithMaxPartitionCount(queryKitSpec.getNumPartitionsForShuffle()); if (doLimitOrOffset) { shuffleSpecFactoryPostAggregation = ShuffleSpecFactories.singlePartitionWithLimit(postAggregationLimitHint); @@ -169,7 +163,10 @@ public QueryDefinition makeQueryDefinition( .broadcastInputs(dataSourcePlan.getBroadcastInputs()) .signature(intermediateSignature) .shuffleSpec(shuffleSpecFactoryPreAggregation.build(intermediateClusterBy, true)) - .maxWorkerCount(dataSourcePlan.isSingleWorker() ? 1 : maxWorkerCount) + .maxWorkerCount( + dataSourcePlan.isSingleWorker() + ? 1 + : queryKitSpec.getMaxWorkerCount(dataSourcePlan.getInputSpecs())) .processorFactory(new GroupByPreShuffleFrameProcessorFactory(queryToRun)) ); @@ -189,7 +186,7 @@ public QueryDefinition makeQueryDefinition( StageDefinition.builder(firstStageNumber + 1) .inputs(new StageInputSpec(firstStageNumber)) .signature(resultSignature) - .maxWorkerCount(maxWorkerCount) + .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) .shuffleSpec( shuffleSpecFactoryPostAggregation != null ? shuffleSpecFactoryPostAggregation.build(resultClusterBy, false) @@ -390,7 +387,10 @@ private static void validateQuery(final GroupByQuery query) for (final OrderByColumnSpec column : defaultLimitSpec.getColumns()) { final Optional type = resultSignature.getColumnType(column.getDimension()); - if (!type.isPresent() || !DimensionComparisonUtils.isNaturalComparator(type.get().getType(), column.getDimensionComparator())) { + if (!type.isPresent() || !DimensionComparisonUtils.isNaturalComparator( + type.get().getType(), + column.getDimensionComparator() + )) { throw new ISE( "Must use natural comparator for column [%s] of type [%s]", column.getDimension(), diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java index 051caeb0e718..8d23e289bb67 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java @@ -33,13 +33,13 @@ import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.querykit.DataSourcePlan; import org.apache.druid.msq.querykit.QueryKit; +import org.apache.druid.msq.querykit.QueryKitSpec; import org.apache.druid.msq.querykit.QueryKitUtils; import org.apache.druid.msq.querykit.ShuffleSpecFactories; import org.apache.druid.msq.querykit.ShuffleSpecFactory; import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory; import org.apache.druid.query.Order; import org.apache.druid.query.OrderBy; -import org.apache.druid.query.Query; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -86,26 +86,20 @@ public static RowSignature getAndValidateSignature(final ScanQuery scanQuery, fi // partition without a ClusterBy, we don't need to necessarily create it via the resultShuffleSpecFactory provided @Override public QueryDefinition makeQueryDefinition( - final String queryId, + final QueryKitSpec queryKitSpec, final ScanQuery originalQuery, - final QueryKit> queryKit, final ShuffleSpecFactory resultShuffleSpecFactory, - final int maxWorkerCount, - final int targetPartitionsPerWorker, final int minStageNumber ) { - final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryId); + final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder(queryKitSpec.getQueryId()); final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource( - queryKit, - queryId, + queryKitSpec, originalQuery.context(), originalQuery.getDataSource(), originalQuery.getQuerySegmentSpec(), originalQuery.getFilter(), null, - maxWorkerCount, - targetPartitionsPerWorker, minStageNumber, false ); @@ -179,7 +173,10 @@ public QueryDefinition makeQueryDefinition( .broadcastInputs(dataSourcePlan.getBroadcastInputs()) .shuffleSpec(scanShuffleSpec) .signature(signatureToUse) - .maxWorkerCount(dataSourcePlan.isSingleWorker() ? 1 : maxWorkerCount) + .maxWorkerCount( + dataSourcePlan.isSingleWorker() + ? 1 + : queryKitSpec.getMaxWorkerCount(dataSourcePlan.getInputSpecs())) .processorFactory(new ScanQueryFrameProcessorFactory(queryToRun)) ); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index cd20f24d244f..a7ec6054b566 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -60,7 +60,10 @@ import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher; import org.apache.druid.msq.input.InputSpecSlicer; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; +import org.apache.druid.msq.querykit.QueryKit; +import org.apache.druid.msq.querykit.QueryKitSpec; import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.Query; import org.apache.druid.query.QueryContext; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.server.DruidNode; @@ -273,6 +276,23 @@ public ControllerQueryKernelConfig queryKernelConfig(String queryId, MSQSpec que return IndexerControllerContext.makeQueryKernelConfig(querySpec, new ControllerMemoryParameters(100_000_000)); } + @Override + public QueryKitSpec makeQueryKitSpec( + final QueryKit> queryKit, + final String queryId, + final MSQSpec querySpec, + final ControllerQueryKernelConfig queryKernelConfig + ) + { + return new QueryKitSpec( + queryKit, + queryId, + querySpec.getTuningConfig().getMaxNumWorkers(), + querySpec.getTuningConfig().getMaxNumWorkers(), + 1 + ); + } + @Override public void emitMetric(String metric, Number value) { @@ -341,10 +361,4 @@ public WorkerClient newWorkerClient() { return new MSQTestWorkerClient(inMemoryWorkers); } - - @Override - public int defaultTargetPartitionsPerWorker() - { - return 1; - } }