From a6c9041d5fd65075f7bbb46d0af6deee100a67e3 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 12 Sep 2024 09:08:38 -0700 Subject: [PATCH] Add "targetPartitionsPerWorker" setting for MSQ. As we move towards multi-threaded MSQ workers, it helps for parallelism to generate more than one partition per worker. That way, we can fully utilize all worker threads throughout all stages. The default value is the number of processing threads. Currently, this is hard-coded to 1 for peons, but that is expected to change in the future. --- .../druid/msq/exec/ControllerContext.java | 8 +++++ .../apache/druid/msq/exec/ControllerImpl.java | 21 +++++++++----- .../indexing/IndexerControllerContext.java | 11 ++++++- .../druid/msq/querykit/DataSourcePlan.java | 29 +++++++++++++++++-- .../druid/msq/querykit/MultiQueryKit.java | 2 ++ .../apache/druid/msq/querykit/QueryKit.java | 2 ++ .../msq/querykit/WindowOperatorQueryKit.java | 12 +++++--- .../msq/querykit/groupby/GroupByQueryKit.java | 9 ++++-- .../druid/msq/querykit/scan/ScanQueryKit.java | 2 ++ .../msq/util/MultiStageQueryContext.java | 14 +++++++++ .../msq/test/MSQTestControllerContext.java | 6 ++++ 11 files changed, 98 insertions(+), 18 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java index 40b114511c28..bc449d141203 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java @@ -30,6 +30,8 @@ import org.apache.druid.msq.input.table.TableInputSpec; import org.apache.druid.msq.kernel.QueryDefinition; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; +import org.apache.druid.msq.querykit.QueryKit; +import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.server.DruidNode; /** @@ -100,4 +102,10 @@ WorkerManager newWorkerManager( * Client for communicating with workers. */ WorkerClient newWorkerClient(); + + /** + * Default target partitions per worker for {@link QueryKit#makeQueryDefinition}. Can be overridden using + * {@link MultiStageQueryContext#CTX_TARGET_PARTITIONS_PER_WORKER}. + */ + int defaultTargetPartitionsPerWorker(); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 1ab7460156dc..4b63d85cda7b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -563,11 +563,16 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer) this.netClient = new ExceptionWrappingWorkerClient(context.newWorkerClient()); closer.register(netClient); + final QueryContext queryContext = querySpec.getQuery().context(); final QueryDefinition queryDef = makeQueryDefinition( queryId(), makeQueryControllerToolKit(), querySpec, context.jsonMapper(), + MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault( + queryContext, + context.defaultTargetPartitionsPerWorker() + ), resultsContext ); @@ -612,7 +617,7 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer) ); } - final long maxParseExceptions = MultiStageQueryContext.getMaxParseExceptions(querySpec.getQuery().context()); + final long maxParseExceptions = MultiStageQueryContext.getMaxParseExceptions(queryContext); this.faultsExceededChecker = new FaultsExceededChecker( ImmutableMap.of(CannotParseExternalDataFault.CODE, maxParseExceptions) ); @@ -624,7 +629,7 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer) stageDefinition.getId().getStageNumber(), finalizeClusterStatisticsMergeMode( stageDefinition, - MultiStageQueryContext.getClusterStatisticsMergeMode(querySpec.getQuery().context()) + MultiStageQueryContext.getClusterStatisticsMergeMode(queryContext) ) ) ); @@ -1718,17 +1723,18 @@ private static QueryDefinition makeQueryDefinition( @SuppressWarnings("rawtypes") final QueryKit toolKit, final MSQSpec querySpec, final ObjectMapper jsonMapper, + final int targetPartitionsPerWorker, final ResultsContext resultsContext ) { final MSQTuningConfig tuningConfig = querySpec.getTuningConfig(); final ColumnMappings columnMappings = querySpec.getColumnMappings(); final Query queryToPlan; - final ShuffleSpecFactory shuffleSpecFactory; + final ShuffleSpecFactory resultShuffleSpecFactory; if (MSQControllerTask.isIngestion(querySpec)) { - shuffleSpecFactory = querySpec.getDestination() - .getShuffleSpecFactory(tuningConfig.getRowsPerSegment()); + resultShuffleSpecFactory = querySpec.getDestination() + .getShuffleSpecFactory(tuningConfig.getRowsPerSegment()); if (!columnMappings.hasUniqueOutputColumnNames()) { // We do not expect to hit this case in production, because the SQL validator checks that column names @@ -1752,7 +1758,7 @@ private static QueryDefinition makeQueryDefinition( queryToPlan = querySpec.getQuery(); } } else { - shuffleSpecFactory = + resultShuffleSpecFactory = querySpec.getDestination() .getShuffleSpecFactory(MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())); queryToPlan = querySpec.getQuery(); @@ -1765,8 +1771,9 @@ private static QueryDefinition makeQueryDefinition( queryId, queryToPlan, toolKit, - shuffleSpecFactory, + resultShuffleSpecFactory, tuningConfig.getMaxNumWorkers(), + targetPartitionsPerWorker, 0 ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index 1037aa6c2af0..e60f1c5c9622 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -74,6 +74,7 @@ public class IndexerControllerContext implements ControllerContext private final ServiceClientFactory clientFactory; private final OverlordClient overlordClient; private final ServiceMetricEvent.Builder metricBuilder; + private final MemoryIntrospector memoryIntrospector; public IndexerControllerContext( final MSQControllerTask task, @@ -89,6 +90,7 @@ public IndexerControllerContext( this.clientFactory = clientFactory; this.overlordClient = overlordClient; this.metricBuilder = new ServiceMetricEvent.Builder(); + this.memoryIntrospector = injector.getInstance(MemoryIntrospector.class); IndexTaskUtils.setTaskDimensions(metricBuilder, task); } @@ -98,7 +100,6 @@ public ControllerQueryKernelConfig queryKernelConfig( final QueryDefinition queryDef ) { - final MemoryIntrospector memoryIntrospector = injector.getInstance(MemoryIntrospector.class); final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance( memoryIntrospector, @@ -200,6 +201,14 @@ public WorkerManager newWorkerManager( ); } + @Override + public int defaultTargetPartitionsPerWorker() + { + // Assume tasks are symmetric: workers have the same number of processors available as a controller. + // Create one partition per processor per task, for maximum parallelism. + return memoryIntrospector.numProcessorsInJvm(); + } + /** * Helper method for {@link #queryKernelConfig(MSQSpec, QueryDefinition)}. Also used in tests. */ diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index e6ddb4d723dc..15fe6263ed83 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -135,6 +135,7 @@ public class DataSourcePlan * @param maxWorkerCount maximum number of workers for subqueries * @param minStageNumber starting stage number for subqueries * @param broadcast whether the plan should broadcast data for this datasource + * @param targetPartitionsPerWorker preferred number of partitions per worker for subqueries */ @SuppressWarnings("rawtypes") public static DataSourcePlan forDataSource( @@ -146,6 +147,7 @@ public static DataSourcePlan forDataSource( @Nullable DimFilter filter, @Nullable Set filterFields, final int maxWorkerCount, + final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) @@ -186,6 +188,7 @@ public static DataSourcePlan forDataSource( (FilteredDataSource) dataSource, querySegmentSpec, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -197,6 +200,7 @@ public static DataSourcePlan forDataSource( (UnnestDataSource) dataSource, querySegmentSpec, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -207,6 +211,7 @@ public static DataSourcePlan forDataSource( queryId, (QueryDataSource) dataSource, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, broadcast, queryContext @@ -221,6 +226,7 @@ public static DataSourcePlan forDataSource( filter, filterFields, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -242,6 +248,7 @@ public static DataSourcePlan forDataSource( filter, filterFields, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -253,6 +260,7 @@ public static DataSourcePlan forDataSource( (JoinDataSource) dataSource, querySegmentSpec, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -418,6 +426,7 @@ private static DataSourcePlan forQuery( final String queryId, final QueryDataSource dataSource, final int maxWorkerCount, + final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast, @Nullable final QueryContext parentContext @@ -429,8 +438,9 @@ private static DataSourcePlan forQuery( // outermost query, and setting it for the subquery makes us erroneously add bucketing where it doesn't belong. dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY), queryKit, - ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount), + ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount * targetPartitionsPerWorker), maxWorkerCount, + targetPartitionsPerWorker, minStageNumber ); @@ -451,6 +461,7 @@ private static DataSourcePlan forFilteredDataSource( final FilteredDataSource dataSource, final QuerySegmentSpec querySegmentSpec, final int maxWorkerCount, + final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) @@ -464,6 +475,7 @@ private static DataSourcePlan forFilteredDataSource( null, null, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -491,6 +503,7 @@ private static DataSourcePlan forUnnest( final UnnestDataSource dataSource, final QuerySegmentSpec querySegmentSpec, final int maxWorkerCount, + final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) @@ -505,6 +518,7 @@ private static DataSourcePlan forUnnest( null, null, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, broadcast ); @@ -537,6 +551,7 @@ private static DataSourcePlan forUnion( @Nullable DimFilter filter, @Nullable Set filterFields, final int maxWorkerCount, + final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) @@ -559,6 +574,7 @@ private static DataSourcePlan forUnion( filter, filterFields, maxWorkerCount, + targetPartitionsPerWorker, Math.max(minStageNumber, subqueryDefBuilder.getNextStageNumber()), broadcast ); @@ -590,6 +606,7 @@ private static DataSourcePlan forBroadcastHashJoin( @Nullable final DimFilter filter, @Nullable final Set filterFields, final int maxWorkerCount, + final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) @@ -606,6 +623,7 @@ private static DataSourcePlan forBroadcastHashJoin( filter, filter == null ? null : DimFilterUtils.onlyBaseFields(filterFields, analysis), maxWorkerCount, + targetPartitionsPerWorker, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), broadcast ); @@ -626,6 +644,7 @@ private static DataSourcePlan forBroadcastHashJoin( null, // Don't push down query filters for right-hand side: needs some work to ensure it works properly. null, maxWorkerCount, + targetPartitionsPerWorker, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), true // Always broadcast right-hand side of the join. ); @@ -660,6 +679,7 @@ private static DataSourcePlan forSortMergeJoin( final JoinDataSource dataSource, final QuerySegmentSpec querySegmentSpec, final int maxWorkerCount, + final int targetPartitionsPerWorker, final int minStageNumber, final boolean broadcast ) @@ -682,6 +702,7 @@ private static DataSourcePlan forSortMergeJoin( queryId, (QueryDataSource) dataSource.getLeft(), maxWorkerCount, + targetPartitionsPerWorker, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), false, null @@ -696,6 +717,7 @@ private static DataSourcePlan forSortMergeJoin( queryId, (QueryDataSource) dataSource.getRight(), maxWorkerCount, + targetPartitionsPerWorker, Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()), false, null @@ -707,8 +729,9 @@ private static DataSourcePlan forSortMergeJoin( ((StageInputSpec) Iterables.getOnlyElement(leftPlan.getInputSpecs())).getStageNumber() ); + final int hashPartitionCount = maxWorkerCount * targetPartitionsPerWorker; final List leftPartitionKey = partitionKeys.get(0); - leftBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(leftPartitionKey, 0), maxWorkerCount)); + leftBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(leftPartitionKey, 0), hashPartitionCount)); leftBuilder.signature(QueryKitUtils.sortableSignature(leftBuilder.getSignature(), leftPartitionKey)); // Build up the right stage. @@ -717,7 +740,7 @@ private static DataSourcePlan forSortMergeJoin( ); final List rightPartitionKey = partitionKeys.get(1); - rightBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(rightPartitionKey, 0), maxWorkerCount)); + rightBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(rightPartitionKey, 0), hashPartitionCount)); rightBuilder.signature(QueryKitUtils.sortableSignature(rightBuilder.getSignature(), rightPartitionKey)); // Compute join signature. diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java index a795f6496053..37f453f6c060 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/MultiQueryKit.java @@ -46,6 +46,7 @@ public QueryDefinition makeQueryDefinition( QueryKit> toolKitForSubQueries, ShuffleSpecFactory resultShuffleSpecFactory, int maxWorkerCount, + int targetPartitionsPerWorker, int minStageNumber ) { @@ -59,6 +60,7 @@ public QueryDefinition makeQueryDefinition( this, resultShuffleSpecFactory, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber ); } else { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java index b259022bba5b..2bc0ad0725a8 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKit.java @@ -40,6 +40,7 @@ public interface QueryKit> * @param minStageNumber lowest stage number to use for any generated stages. Useful if the resulting * {@link QueryDefinition} is going to be added to an existing * {@link org.apache.druid.msq.kernel.QueryDefinitionBuilder}. + * @param targetPartitionsPerWorker preferred number of partitions per worker for subqueries */ QueryDefinition makeQueryDefinition( String queryId, @@ -47,6 +48,7 @@ QueryDefinition makeQueryDefinition( QueryKit> toolKitForSubQueries, ShuffleSpecFactory resultShuffleSpecFactory, int maxWorkerCount, + int targetPartitionsPerWorker, int minStageNumber ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index b3686359d2a4..b1af153fafde 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -68,6 +68,7 @@ public QueryDefinition makeQueryDefinition( QueryKit> queryKit, ShuffleSpecFactory resultShuffleSpecFactory, int maxWorkerCount, + int targetPartitionsPerWorker, int minStageNumber ) { @@ -97,11 +98,13 @@ public QueryDefinition makeQueryDefinition( originalQuery.getFilter(), null, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, false ); - ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount); + ShuffleSpec nextShuffleSpec = + findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount * targetPartitionsPerWorker); final QueryDefinitionBuilder queryDefBuilder = makeQueryDefinitionBuilder(queryId, dataSourcePlan, nextShuffleSpec); final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber()); @@ -192,7 +195,8 @@ public QueryDefinition makeQueryDefinition( stageRowSignature = finalWindowStageRowSignature; nextShuffleSpec = finalWindowStageShuffleSpec; } else { - nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i + 1), maxWorkerCount); + nextShuffleSpec = + findShuffleSpecForNextWindow(operatorList.get(i + 1), maxWorkerCount * targetPartitionsPerWorker); if (nextShuffleSpec == null) { stageRowSignature = intermediateSignature; } else { @@ -285,7 +289,7 @@ private List> getOperatorListFromQuery(WindowOperatorQuery return operatorList; } - private ShuffleSpec findShuffleSpecForNextWindow(List operatorFactories, int maxWorkerCount) + private ShuffleSpec findShuffleSpecForNextWindow(List operatorFactories, int partitionCount) { NaivePartitioningOperatorFactory partition = null; NaiveSortOperatorFactory sort = null; @@ -325,7 +329,7 @@ private ShuffleSpec findShuffleSpecForNextWindow(List operatorF keyColsOfWindow.add(kc); } - return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), maxWorkerCount); + return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), partitionCount); } /** diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java index 7e4ebf5e7fab..45a91a3d8870 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java @@ -71,6 +71,7 @@ public QueryDefinition makeQueryDefinition( final QueryKit> queryKit, final ShuffleSpecFactory resultShuffleSpecFactory, final int maxWorkerCount, + final int targetPartitionsPerWorker, final int minStageNumber ) { @@ -86,6 +87,7 @@ public QueryDefinition makeQueryDefinition( originalQuery.getFilter(), null, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, false ); @@ -139,9 +141,10 @@ public QueryDefinition makeQueryDefinition( // __time in such queries is generated using either an aggregator (e.g. sum(metric) as __time) or using a // post-aggregator (e.g. TIMESTAMP '2000-01-01' as __time) // For example: INSERT INTO foo SELECT COUNT(*), TIMESTAMP '2000-01-01' AS __time FROM bar PARTITIONED BY DAY - shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty() - ? ShuffleSpecFactories.singlePartition() - : ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount); + shuffleSpecFactoryPreAggregation = + intermediateClusterBy.isEmpty() + ? ShuffleSpecFactories.singlePartition() + : ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount * targetPartitionsPerWorker); if (doLimitOrOffset) { shuffleSpecFactoryPostAggregation = ShuffleSpecFactories.singlePartitionWithLimit(postAggregationLimitHint); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java index f4f50106e813..051caeb0e718 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java @@ -91,6 +91,7 @@ public QueryDefinition makeQueryDefinition( final QueryKit> queryKit, final ShuffleSpecFactory resultShuffleSpecFactory, final int maxWorkerCount, + final int targetPartitionsPerWorker, final int minStageNumber ) { @@ -104,6 +105,7 @@ public QueryDefinition makeQueryDefinition( originalQuery.getFilter(), null, maxWorkerCount, + targetPartitionsPerWorker, minStageNumber, false ); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index ed6a7c0e7b9b..63601c907a24 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -190,6 +190,12 @@ public class MultiStageQueryContext public static final String CTX_SKIP_TYPE_VERIFICATION = "skipTypeVerification"; + /** + * Number of partitions to target per worker when creating shuffle specs that involve specific numbers of + * partitions. This helps us utilize more parallelism when workers are multi-threaded. + */ + public static final String CTX_TARGET_PARTITIONS_PER_WORKER = "targetPartitionsPerWorker"; + private static final Pattern LOOKS_LIKE_JSON_ARRAY = Pattern.compile("^\\s*\\[.*", Pattern.DOTALL); public static String getMSQMode(final QueryContext queryContext) @@ -380,6 +386,14 @@ public static ArrayIngestMode getArrayIngestMode(final QueryContext queryContext return queryContext.getEnum(CTX_ARRAY_INGEST_MODE, ArrayIngestMode.class, DEFAULT_ARRAY_INGEST_MODE); } + public static int getTargetPartitionsPerWorkerWithDefault( + final QueryContext queryContext, + final int defaultValue + ) + { + return queryContext.getInt(CTX_TARGET_PARTITIONS_PER_WORKER, defaultValue); + } + /** * See {@link #CTX_INCLUDE_ALL_COUNTERS}. */ diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index e65104302032..3034be399849 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -342,4 +342,10 @@ public WorkerClient newWorkerClient() { return new MSQTestWorkerClient(inMemoryWorkers); } + + @Override + public int defaultTargetPartitionsPerWorker() + { + return 1; + } }