-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Limit pages size to a configurable limit #14994
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
90564dc
1624db3
6cd5b36
1cf1aa0
be9b736
65cea5c
cb4101f
79a4eb0
6766b34
0def1aa
00e0829
2f0fba8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -145,7 +145,6 @@ | |
| import org.apache.druid.msq.input.table.DataSegmentWithLocation; | ||
| import org.apache.druid.msq.input.table.TableInputSpec; | ||
| import org.apache.druid.msq.input.table.TableInputSpecSlicer; | ||
| import org.apache.druid.msq.kernel.GlobalSortTargetSizeShuffleSpec; | ||
| import org.apache.druid.msq.kernel.QueryDefinition; | ||
| import org.apache.druid.msq.kernel.QueryDefinitionBuilder; | ||
| import org.apache.druid.msq.kernel.StageDefinition; | ||
|
|
@@ -1663,12 +1662,7 @@ private static QueryDefinition makeQueryDefinition( | |
| final ShuffleSpecFactory shuffleSpecFactory; | ||
|
|
||
| if (MSQControllerTask.isIngestion(querySpec)) { | ||
| shuffleSpecFactory = (clusterBy, aggregate) -> | ||
| new GlobalSortTargetSizeShuffleSpec( | ||
| clusterBy, | ||
| tuningConfig.getRowsPerSegment(), | ||
| aggregate | ||
| ); | ||
| shuffleSpecFactory = ShuffleSpecFactories.getGlobalSortWithTargetSize(tuningConfig.getRowsPerSegment()); | ||
|
|
||
| if (!columnMappings.hasUniqueOutputColumnNames()) { | ||
| // We do not expect to hit this case in production, because the SQL validator checks that column names | ||
|
|
@@ -1693,8 +1687,9 @@ private static QueryDefinition makeQueryDefinition( | |
| shuffleSpecFactory = ShuffleSpecFactories.singlePartition(); | ||
| queryToPlan = querySpec.getQuery(); | ||
| } else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) { | ||
| // we add a final stage which generates one partition per worker. | ||
| shuffleSpecFactory = ShuffleSpecFactories.globalSortWithMaxPartitionCount(tuningConfig.getMaxNumWorkers()); | ||
| shuffleSpecFactory = ShuffleSpecFactories.getGlobalSortWithTargetSize( | ||
| MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context()) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I might be missing something in somewhere else in this PR, but doesn't GlobalSortTargetSizeShuffleSpec enforce the limit on the total partition size summed across all workers? Since we create a new page for each worker parition combination, would the limit be enforced?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. GlobalSortTargetSizeShuffleSpec enforces a limit on partition size globally yes. So there can be 2 cases:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have added a new testcase |
||
| ); | ||
| queryToPlan = querySpec.getQuery(); | ||
| } else { | ||
| throw new ISE("Unsupported destination [%s]", querySpec.getDestination()); | ||
|
|
@@ -1772,27 +1767,29 @@ private static QueryDefinition makeQueryDefinition( | |
| return queryDef; | ||
| } else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) { | ||
|
|
||
| // attaching new query results stage always. | ||
| // attaching new query results stage if the final stage does sort during shuffle so that results are ordered. | ||
| StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition(); | ||
| final QueryDefinitionBuilder builder = QueryDefinition.builder(); | ||
| for (final StageDefinition stageDef : queryDef.getStageDefinitions()) { | ||
| builder.add(StageDefinition.builder(stageDef)); | ||
| if (finalShuffleStageDef.doesSortDuringShuffle()) { | ||
| final QueryDefinitionBuilder builder = QueryDefinition.builder(); | ||
| builder.addAll(queryDef); | ||
| builder.add(StageDefinition.builder(queryDef.getNextStageNumber()) | ||
| .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())) | ||
| .maxWorkerCount(tuningConfig.getMaxNumWorkers()) | ||
| .signature(finalShuffleStageDef.getSignature()) | ||
| .shuffleSpec(null) | ||
| .processorFactory(new QueryResultFrameProcessorFactory()) | ||
| ); | ||
| return builder.build(); | ||
| } else { | ||
| return queryDef; | ||
| } | ||
|
|
||
| builder.add(StageDefinition.builder(queryDef.getNextStageNumber()) | ||
| .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber())) | ||
| .maxWorkerCount(tuningConfig.getMaxNumWorkers()) | ||
| .signature(finalShuffleStageDef.getSignature()) | ||
| .shuffleSpec(null) | ||
| .processorFactory(new QueryResultFrameProcessorFactory()) | ||
| ); | ||
|
|
||
| return builder.build(); | ||
| } else { | ||
| throw new ISE("Unsupported destination [%s]", querySpec.getDestination()); | ||
| } | ||
| } | ||
|
|
||
|
|
||
|
|
||
| private static DataSchema generateDataSchema( | ||
| MSQSpec querySpec, | ||
| RowSignature querySignature, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.