diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java
index 8f5770378c55..24f6dfffe27a 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.msq.querykit;
 
+import org.apache.druid.frame.key.ClusterBy;
 import org.apache.druid.msq.kernel.GlobalSortMaxCountShuffleSpec;
 import org.apache.druid.msq.kernel.GlobalSortTargetSizeShuffleSpec;
 import org.apache.druid.msq.kernel.MixShuffleSpec;
@@ -69,11 +70,21 @@ public static ShuffleSpecFactory globalSortWithMaxPartitionCount(final int parti
   }
 
   /**
-   * Factory that produces globally sorted partitions of a target size.
+   * Factory that produces globally sorted partitions of a target size, using the {@link ClusterBy} to partition
+   * rows across partitions.
+   *
+   * Produces {@link MixShuffleSpec}, ignoring the target size, if the provided {@link ClusterBy} is empty.
    */
   public static ShuffleSpecFactory getGlobalSortWithTargetSize(int targetSize)
   {
-    return (clusterBy, aggregate) ->
-        new GlobalSortTargetSizeShuffleSpec(clusterBy, targetSize, aggregate);
+    return (clusterBy, aggregate) -> {
+      if (clusterBy.isEmpty()) {
+        // Cannot partition or sort meaningfully because there are no cluster-by keys. Generate a MixShuffleSpec
+        // so everything goes into a single partition.
+        return MixShuffleSpec.instance();
+      } else {
+        return new GlobalSortTargetSizeShuffleSpec(clusterBy, targetSize, aggregate);
+      }
+    };
   }
 }
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index 801faac44aff..6662f23d9b1c 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -1606,6 +1606,38 @@ public void testInsertOnFoo1WithLimit(String contextName, Map<String, Object> context)
         .verifyResults();
   }
 
+  @MethodSource("data")
+  @ParameterizedTest(name = "{index}:with context {0}")
+  public void testInsertOnFoo1NoDimensionsWithLimit(String contextName, Map<String, Object> context)
+  {
+    Map<String, Object> queryContext = ImmutableMap.<String, Object>builder()
+                                                   .putAll(context)
+                                                   .put(MultiStageQueryContext.CTX_ROWS_PER_SEGMENT, 2)
+                                                   .build();
+
+    List<Object[]> expectedRows = ImmutableList.of(new Object[]{DateTimes.utc(0L).getMillis(), 5L});
+
+    RowSignature rowSignature = RowSignature.builder()
+                                            .addTimeColumn()
+                                            .add("cnt", ColumnType.LONG)
+                                            .build();
+
+    testIngestQuery()
+        .setSql("insert into foo1 select count(*) cnt from foo where dim1 != '' limit 4 partitioned by all")
+        .setExpectedDataSource("foo1")
+        .setQueryContext(queryContext)
+        .setExpectedRowSignature(rowSignature)
+        .setExpectedSegments(ImmutableSet.of(SegmentId.of("foo1", Intervals.ETERNITY, "test", 0)))
+        .setExpectedResultRows(expectedRows)
+        .setExpectedMSQSegmentReport(
+            new MSQSegmentReport(
+                NumberedShardSpec.class.getSimpleName(),
+                "Using NumberedShardSpec to generate segments since the query is inserting rows."
+            )
+        )
+        .verifyResults();
+  }
+
   @MethodSource("data")
   @ParameterizedTest(name = "{index}:with context {0}")
   public void testInsertOnRestricted(String contextName, Map<String, Object> context)