Fixes a bug when running queries with a limit clause #16643
LakshSingla merged 15 commits into apache:master
Conversation
LakshSingla
left a comment
Thanks for the patch. The finalShuffleSpec must also be set in the case of ingest queries. Otherwise, MSQ will disregard the rowsPerSegment parameter if we have a LIMIT clause.
I see; the case you mean is ALL partitioning with LIMIT, which should create more than one partition. I think this is an existing bug. I have included this case as well. Thanks for the catch!
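For illustration, a hedged sketch of the kind of ingest query being discussed; the table names, limit, and rowsPerSegment value are hypothetical, chosen only to show the shape of the case:

```java
// Hypothetical ingest query exercising the case above: PARTITIONED BY ALL plus
// LIMIT, with rowsPerSegment in the query context. Before this patch the limit
// stage emitted a single partition, so rowsPerSegment was effectively ignored.
import java.util.Map;

class LimitIngestExample
{
  static final Map<String, Object> CONTEXT = Map.of("rowsPerSegment", 1000);
  static final String SQL =
      "INSERT INTO dst\n"
      + "SELECT * FROM foo\n"
      + "LIMIT 5000\n"
      + "PARTITIONED BY ALL";
}
```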
```java
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testInsertOnFoo1WithLimit(String contextName, Map<String, Object> context)
```
Check notice (Code scanning / CodeQL): Useless parameter
```diff
      ? ShuffleSpecFactories.singlePartition()
      : ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount);
- shuffleSpecFactoryPostAggregation = doLimitOrOffset
+ shuffleSpecFactoryPostAggregation = doLimitOrOffset && !needsFinalShuffling
```
Same reason as on the other line. Consider a query like the following, where the LIMIT is not in the final stage (example query, assuming the inner limit is not in the final stage):

SELECT col1 FROM (SELECT col1 FROM foo GROUP BY col1 LIMIT 1) LIMIT 20

If the group by returns 10 partitions, the limit would work on them individually, which is not desirable. I think this change isn't required, as we are always setting up the limit at the end.
Can you also add a case with such a query where the limit isn't a final stage?
Since it is a subquery, needsFinalShuffling should always be false here, so I believe that case should still work.
My example was incorrect, but I am still doubtful. The reason this change seems incorrect to me is what happens if doLimitOrOffset=true but the factory doesn't generate a single partition. For example:

EXPORT ... SELECT col1 FROM foo GROUP BY col1 LIMIT 1

In that case, the LIMIT wouldn't be correct.
The limit processor must always receive a single partition, and having ShuffleSpecFactories.singlePartition() ensures that. The earlier code ensured that. The new code doesn't ensure that, and it can at best have the same result as the old code, or be incorrect (as is my guess). Alternatively, in which case would this change be useful over the original code?
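To make the invariant in this thread concrete, a minimal sketch follows; the two factory calls are the ones visible in the diff above, while resultShuffleSpecFactory and needsFinalShuffling are taken from the PR description and represent simplified, assumed wiring rather than the actual QueryKit code:

```java
// Sketch of the invariant: the stage feeding the limit processor must shuffle
// into exactly one partition, since the limit processor consumes a single
// ordered input. Any fan-out has to happen in the stage after the limit.
ShuffleSpecFactory factoryFeedingLimit = ShuffleSpecFactories.singlePartition();

// If the destination needs fixed-size outputs (rowsPerPage / rowsPerSegment),
// the post-limit stage re-shuffles using the destination's factory; otherwise
// a single partition remains correct.
ShuffleSpecFactory factoryAfterLimit = needsFinalShuffling
    ? resultShuffleSpecFactory
    : ShuffleSpecFactories.singlePartition();
```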
```java
final ShuffleSpec finalShuffleSpec = needsFinalShuffling ?
    shuffleSpecFactoryPreAggregation.build(resultClusterBy, true) :
    null;
```
I wonder if we can relax this constraint even when we don't have a final shuffle. In intermediate queries, having more partitions will improve parallelization. Why do we not partition further in the case of the LIMIT processor?
```diff
- // 2. There is an offset which means everything gets funneled into a single partition hence we use MaxCountShuffleSpec
- if (queryToRun.getOrderBys().isEmpty() && hasLimitOrOffset) {
+ // 3. The destination does not require shuffling after the limit stage to get outputs of a specific size.
+ if (queryToRun.getOrderBys().isEmpty() && hasLimitOrOffset && !needsFinalShuffling) {
```
Why is this change required?
If this is not checked, we would short-circuit to using the MixShuffleSpec. We need to add the boost column, create the clusterBy, etc. for the scan stage if we intend to have a final shuffle, so this check is required.
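A minimal sketch of the control flow described here, with simplified and partly assumed names (MixShuffleSpec.instance() and the helper below are assumptions; the real method builds considerably more state):

```java
// Sketch: the fast path may only be taken when nothing downstream needs the
// scan stage's clusterBy or boost column.
final ShuffleSpec scanShuffleSpec;
if (queryToRun.getOrderBys().isEmpty() && hasLimitOrOffset && !needsFinalShuffling) {
  // Everything funnels into one partition and is never split again,
  // so no boost column or clusterBy is needed.
  scanShuffleSpec = MixShuffleSpec.instance();
} else {
  // Build a ClusterBy that includes the partition-boost column so the
  // post-limit shuffle can split rows sharing a cluster key.
  scanShuffleSpec = buildSortingShuffleSpec(queryToRun, maxWorkerCount); // hypothetical helper
}
```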
I have refactored the code in the QueryKits and added additional tests to address the comments.
```java
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceOnFoo1WithLimit(String contextName, Map<String, Object> context)
```
Check notice (Code scanning / CodeQL): Useless parameter
```diff
  .signature(signatureToUse)
  .maxWorkerCount(1)
- .shuffleSpec(null) // no shuffling should be required after a limit processor.
+ .shuffleSpec(finalShuffleSpec) // Apply the final shuffling after limit spec.
```
This seems like a lot of shuffling. Is there any way we can avoid reshuffling the data by the same clusterBy and just repartition? Perhaps not without SuperSorter changes, but I wanted to confirm.
Well, maybe we can preserve the optimisation that we had earlier: if there's no orderBy and doLimitOrOffset == true, we don't need to partition-boost the intermediate shuffleSpec.
Actually, we don't need to partition-boost the intermediate shuffle spec in any case (i.e. the shuffleSpec for the scan stage if there's a limit present): since it's all going into a single partition anyway, the partition boost won't have any use.
In other words, if there's a limit present, only the final stage should have partition boosting.
The initial scan frame processor is the one that increments the boost column. If we do not apply the boost column at that stage, the limit processor's output would have boost values of 0, which can't be split.
I guess additional changes would be needed to allow boosting to work with limit processors before this optimisation can be made.
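A conceptual sketch of why an all-zero boost column cannot be split; this illustrates the idea only and is not Druid's actual frame-processor code (the __boost column name is an assumption about MSQ's synthetic column):

```java
// The scan stage appends a monotonically increasing "boost" value so rows
// sharing an identical cluster key can still be divided across range
// partitions. If a stage writes 0 for every row, all rows collapse into one
// key range that a range shuffle cannot split.
class BoostSketch
{
  public static void main(String[] args)
  {
    long boost = 0;
    String[] clusterKeys = {"a", "a", "a", "a"};
    for (String key : clusterKeys) {
      // key "a" alone is unsplittable; (key, boost) yields distinct sort keys.
      System.out.println("key=" + key + ", __boost=" + boost++);
    }
  }
}
```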
LakshSingla
left a comment
Final comments after the last change. The patch looks good now.
LakshSingla
left a comment
LGTM, thanks for the patch!
cryptoe
left a comment
@adarshsanjeev Please update the description with the before-and-after changes to the number of stages and what kind of local testing was done.
```java
  scanShuffleSpec = finalShuffleSpec;
} else {
  // If there is a limit spec, check if there are any non-boost columns to sort in.
  boolean requiresSort = clusterByColumns.stream()
```
Do we need to check whether we are sorting on a non-boost column? Isn't that automatically added by the ScanQueryKit et al.? Maybe we can simplify the condition by checking if the orderBy is non-empty (before adding any boosting).
We have an if-branch for window functions above this bit of code, which adds its own clusterBy columns that are not dependent on the orderBy.
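For context, the diff excerpt above is cut off mid-expression. A plausible completion, assuming the boost column is identified by a QueryKitUtils.PARTITION_BOOST_COLUMN constant and that the clusterBy columns expose columnName(), might be:

```java
// Assumed continuation of the truncated diff: sort only if the clusterBy
// contains some column other than the synthetic partition-boost column.
boolean requiresSort =
    clusterByColumns.stream()
                    .anyMatch(col -> !QueryKitUtils.PARTITION_BOOST_COLUMN.equals(col.columnName()));
```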
```java
// Note that we still need the boost column to be present in the row signature, since the limit stage would
// need it to be populated to do its own shuffling later.
```
I was under the impression that the limit factory couldn't partition boost. Am I mistaken?
On the other hand, if there isn't any partition boosting, wouldn't even the original code have run into a similar problem when there wasn't a cluster key, i.e. too-large partitions?
The limit factory does not increment the partition boost column at regular intervals. The values would all be 0 if the column was not added to the row signature at the scan stage.

> even the original code would have run into similar problem

The original code would have used the MixShuffleSpec. It would always make a single partition, so yes, it could have been too large.
Description

Add a shuffle based on the resultShuffleSpecFactory after the limit processor, depending on the query destination. LimitFrameProcessors currently do not update the partition boosting column, so we also add the boost column to the previous stage, if one is required.
When creating the queryDefinition and calculating shuffleSpecs, we defaulted to shuffling everything into a single partition if we required a limit (regardless of the resultShuffleSpecFactory passed to the queryKit). This was followed by the limit stage having no shuffling (as it would always create a single output partition).

This logic is correct as long as we do not require any specific partitioning for the final output, since the limit would be the final MSQ stage (this would be the case, for instance, when writing SELECT query results into a query report).

However, for query types which expect partitions of a specific size, this causes a bug. For example, consider an async query which applies a LIMIT of 1000, with a context parameter rowsPerPage: 100. This would expect the limit processor to create 10 pages of size 100 each. However, the limit processor would still create a single partition as its output. This causes issues with certain queries which expect multiple partitions. The issue is present with all SELECT queries except ones which write to the query report. REPLACE or INSERT queries which have ALL partitioning are also affected (since we do not allow LIMIT along with other partitioning). Even worse, we still sometimes expect these partitions, and this causes an exception when the subsequent stage tries to read them.
This PR aims to correct this by adding a shuffle based on the resultShuffleSpecFactory after the limit processor, depending on the query destination. Since LimitFrameProcessors currently do not update the partition boosting column, we also add the boost column to the previous stage, if one is required.
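A hedged sketch of the changed limit-stage wiring; the three builder calls shown in the diff (signature, maxWorkerCount, shuffleSpec) come from this PR, while the stage numbering, inputs, and processor factory are assumptions about the surrounding QueryKit code:

```java
// Sketch: the limit stage now carries the destination-driven shuffle spec
// instead of null, so its output can be re-partitioned to the requested
// page or segment size.
queryDefBuilder.add(
    StageDefinition.builder(firstStageNumber + 2)
                   .inputs(new StageInputSpec(firstStageNumber + 1)) // assumed input wiring
                   .signature(signatureToUse)
                   .maxWorkerCount(1)              // the limit itself still runs on one worker
                   .shuffleSpec(finalShuffleSpec)  // previously null
                   .processorFactory(new OffsetLimitFrameProcessorFactory(offset, limit)) // assumed
);
```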
Behaviour changes

INSERT + LIMIT with PARTITIONED BY ALL
Before: Only the scan stage would have a shuffle spec (mix). Would create a single segment; `rowsPerSegment` would not be respected.
After: The scan stage has a mix shuffle; the limit stage has a sort shuffle spec. The `rowsPerSegment` parameter of 1000 is respected. If we have a clustering, the first scan stage is a sort shuffle spec instead.

INSERT + LIMIT with PARTITIONED BY DAY (if we supported it)
Before: Only the scan stage would have a shuffle spec (mix; this would ignore day partitioning). Would create a single segment; `rowsPerSegment` would not be respected.
After: The scan stage has a mix shuffle; the limit stage has a sort shuffle spec. The `rowsPerSegment` parameter of 1000 is respected. (probably)

SELECT * FROM foo LIMIT 100 -> taskReport
Before: Works as expected.
After: Works as expected.
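The async-query scenarios below hinge on the `rowsPerPage` context parameter. As a sanity check on the expected page count, a small self-contained sketch (values hypothetical, matching the example in the description):

```java
// LIMIT 1000 with rowsPerPage = 100 should yield ceil(1000 / 100) = 10 pages
// after the post-limit shuffle introduced by this PR, rather than one
// oversized page.
class PageCountSketch
{
  public static void main(String[] args)
  {
    final long limit = 1000;
    final long rowsPerPage = 100;
    final long expectedPages = (limit + rowsPerPage - 1) / rowsPerPage; // ceiling division
    System.out.println("expected pages: " + expectedPages); // prints 10
  }
}
```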
SELECT * FROM foo LIMIT 100 -> durableStorage (async queries)
Before: Only the scan stage would have a shuffle spec (mix). Would create a single segment; `rowsPerPage` would not be respected.
After: The scan stage has a mix shuffle spec. The limit stage has a sort-with-target-size shuffle, determined by `rowsPerPage`. We have an additional selectResults stage.

SELECT * FROM foo LIMIT 100 ORDER BY dim1 -> durableStorage (async queries)
Before: Only the scan stage would have a shuffle spec (mix). Would create a single segment; `rowsPerPage` would not be respected.
After: The scan stage has a sort shuffle spec with a target count equal to the worker count. The limit stage has a sort-with-target-size shuffle, determined by `rowsPerPage`. We have an additional selectResults stage.

SELECT * FROM foo LIMIT 100 -> export
Before: Only the scan stage would have a shuffle spec (mix). Would create a single segment; `rowsPerPage` would not be respected.
After: The scan stage has a mix shuffle spec. The limit stage has a sort-with-target-size shuffle, determined by `rowsPerPage`.

SELECT * FROM (SELECT * FROM foo ORDER BY dim1 LIMIT 10000) ORDER BY dim2 LIMIT 5000 with 1 worker
Before: Works as expected.
After: Works as expected.
SELECT * FROM (SELECT * FROM foo ORDER BY dim1 LIMIT 10000) ORDER BY dim2 LIMIT 5000 with 2 workers
Before: The inner limit produces a single partition on one of the workers; the other workers produce nothing for the limit. The next stage throws an exception when another worker tries to read one of these missing partitions.
After: Works as expected. The scan stages have shuffles with a target of 1 partition (to handle the sorting), the inner limit stage has a sort with a target of 3 partitions (for the worker count), and the final limit has a sort with targetSize = 1000 (`rowsPerPage`). We have a selectResults stage since we are doing a sort in the previous stage.

INSERT + SELECT dim1, count(*) GROUP BY 1 LIMIT 5000 PARTITIONED BY ALL with rowsPerSegment: 1000
Before: The group by stages have globalSort and null shuffle specs. The limit stage creates a single partition.
After: The limit stage has a sort shuffle spec into targetSize. The `rowsPerSegment` parameter of 1000 is respected.

GroupBy -> taskReport
Before: Works as expected.
After: Works as expected.
GroupBy -> durableStorage (asyncQuery)
Before: The group by stages have globalSort and null shuffle specs. The limit stage creates a single partition, so the result would be a single page; `rowsPerPage` would not be respected.
After: The limit stage has a sort shuffle spec into targetSize. We always add an additional selectResults stage. The `rowsPerPage` parameter of 50 is respected.

GroupBy -> export
Before: The group by stages have globalSort and null shuffle specs. The limit stage creates a single partition, so the result would be a single page; `rowsPerPage` would not be respected.
After: The limit stage has a sort shuffle spec into targetSize. The `rowsPerPage` parameter of 50 is respected.

This PR has: