Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions codestyle/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
<Or>
<Class name="org.apache.druid.jackson.DefaultTrueJsonIncludeFilter"/>
<Class name="org.apache.druid.java.util.common.StringEncodingDefaultUTF16LEJsonIncludeFilter"/>
<Class name="org.apache.druid.msq.kernel.LimitHintJsonIncludeFilter"/>
<Class name="org.apache.druid.query.scan.ScanQuery$ScanRowsLimitJsonIncludeFilter"/>
<Class name="org.apache.druid.query.scan.ScanQuery$ScanTimeOrderJsonIncludeFilter"/>
<Class name="org.apache.druid.query.scan.ScanQuery$BatchSizeJsonIncludeFilter"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ public void globalSort(
makeSuperSorterIntermediateOutputChannelFactory(sorterTmpDir),
memoryParameters.getSuperSorterMaxActiveProcessors(),
memoryParameters.getSuperSorterMaxChannelsPerProcessor(),
-1,
stageDefinition.getShuffleSpec().limitHint(),
cancellationId,
counterTracker.sortProgress(),
removeNullBytes
Expand Down Expand Up @@ -845,7 +845,7 @@ public OutputChannel openNilChannel(int expectedZero)
makeSuperSorterIntermediateOutputChannelFactory(sorterTmpDir),
1,
2,
-1,
ShuffleSpec.UNLIMITED,
cancellationId,

// Tracker is not actually tracked, since it doesn't quite fit into the way we report counters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,20 @@ public class GlobalSortMaxCountShuffleSpec implements GlobalSortShuffleSpec
private final ClusterBy clusterBy;
private final int maxPartitions;
private final boolean aggregate;
private final long limitHint;

@JsonCreator
public GlobalSortMaxCountShuffleSpec(
@JsonProperty("clusterBy") final ClusterBy clusterBy,
@JsonProperty("partitions") final int maxPartitions,
@JsonProperty("aggregate") final boolean aggregate
@JsonProperty("aggregate") final boolean aggregate,
@JsonProperty("limitHint") final Long limitHint
)
{
this.clusterBy = Preconditions.checkNotNull(clusterBy, "clusterBy");
this.maxPartitions = maxPartitions;
this.aggregate = aggregate;
this.limitHint = limitHint == null ? UNLIMITED : limitHint;

if (maxPartitions < 1) {
throw new IAE("Partition count must be at least 1");
Expand Down Expand Up @@ -133,6 +136,14 @@ public int getMaxPartitions()
return maxPartitions;
}

@Override
@JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = LimitHintJsonIncludeFilter.class)
@JsonProperty
public long limitHint()
{
return limitHint;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -145,22 +156,24 @@ public boolean equals(Object o)
GlobalSortMaxCountShuffleSpec that = (GlobalSortMaxCountShuffleSpec) o;
return maxPartitions == that.maxPartitions
&& aggregate == that.aggregate
&& Objects.equals(clusterBy, that.clusterBy);
&& Objects.equals(clusterBy, that.clusterBy)
&& Objects.equals(limitHint, that.limitHint);
}

@Override
public int hashCode()
{
return Objects.hash(clusterBy, maxPartitions, aggregate);
return Objects.hash(clusterBy, maxPartitions, aggregate, limitHint);
}

@Override
public String toString()
{
return "MaxCountShuffleSpec{" +
return "GlobalSortMaxCountShuffleSpec{" +
"clusterBy=" + clusterBy +
", partitions=" + maxPartitions +
", maxPartitions=" + maxPartitions +
", aggregate=" + aggregate +
", limitHint=" + limitHint +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,4 @@ public int partitionCount()
{
return numPartitions;
}

}
Copy link
Copy Markdown
Contributor

@LakshSingla LakshSingla Aug 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, how does this work?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's this API: https://fasterxml.github.io/jackson-annotations/javadoc/2.9/com/fasterxml/jackson/annotation/JsonInclude.Include.html#CUSTOM

Value that indicates that separate filter Object (specified by JsonInclude.valueFilter() for value itself, and/or JsonInclude.contentFilter() for contents of structured types) is to be used for determining inclusion criteria. Filter object's equals() method is called with value to serialize; if it returns true value is excluded (that is, filtered out); if false value is included.

It's kind of goofy, but it's the only tool Jackson provides us for keeping the serialized JSON clean other than "include non-null", "include non-default", and "include non-empty".

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.kernel;

import com.fasterxml.jackson.annotation.JsonInclude;

/**
* {@link JsonInclude} filter for {@link ShuffleSpec#limitHint()}.
*
* This API works by "creative" use of equals. It requires warnings to be suppressed
* and also requires spotbugs exclusions (see spotbugs-exclude.xml).
*/
@SuppressWarnings({"EqualsAndHashcode", "EqualsHashCode"})
public class LimitHintJsonIncludeFilter

Check failure

Code scanning / CodeQL

Inconsistent equals and hashCode

Class LimitHintJsonIncludeFilter overrides [equals](1) but not hashCode.
{
@Override
public boolean equals(Object obj)
{
return obj instanceof Long && (Long) obj == ShuffleSpec.UNLIMITED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
})
public interface ShuffleSpec
{
long UNLIMITED = -1;

/**
* The nature of this shuffle: hash vs. range based partitioning; whether the data are sorted or not.
*
Expand Down Expand Up @@ -68,4 +70,17 @@ public interface ShuffleSpec
* @throws IllegalStateException if kind is {@link ShuffleKind#GLOBAL_SORT} with more than one target partition
*/
int partitionCount();

/**
* Limit that can be applied during shuffling. This is provided to enable performance optimizations.
*
* Implementations may apply this limit to each partition individually, or may apply it to the entire resultset
* (across all partitions). Either approach is valid, so downstream logic must handle either one.
*
* Implementations may also ignore this hint completely, or may apply a limit that is somewhat higher than this hint.
*/
default long limitHint()
Copy link
Copy Markdown
Contributor

@LakshSingla LakshSingla Aug 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if there's any merit in separating the limit hint into a POJO, and having both limit and offset baked into it. Something like DefaultLimitSpec without the columns. It's up to the reader to calculate the combined limit and use that, instead of baking that knowledge into a long.

I was thinking of use cases when we want to pushdown this limit into other portions of MSQ's stack and want to distinguish between rows [0..offset) (thrown away) and rows [offset, offset + limit) (kept)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think offset can be pushed down? Limit can be pushed down when sorting because if some row is in the top N globally, it must also be in the first N rows of whichever partition it appears in.

But with offset, if for example you have LIMIT N OFFSET M, we can't push down OFFSET M (i.e. skip M rows). It is possible that some of the first M rows in partition A still need to appear in the final resultset, perhaps because some of them are greater than any of the first M rows in the globally-sorted result. So the best we can do is push down LIMIT N + M.

{
return UNLIMITED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.druid.msq.kernel.GlobalSortMaxCountShuffleSpec;
import org.apache.druid.msq.kernel.GlobalSortTargetSizeShuffleSpec;
import org.apache.druid.msq.kernel.MixShuffleSpec;
import org.apache.druid.msq.kernel.ShuffleSpec;

/**
* Static factory methods for common implementations of {@link ShuffleSpecFactory}.
Expand All @@ -37,10 +38,21 @@ private ShuffleSpecFactories()
* Factory that produces a single output partition, which may or may not be sorted.
*/
public static ShuffleSpecFactory singlePartition()
{
return singlePartitionWithLimit(ShuffleSpec.UNLIMITED);
}

/**
* Factory that produces a single output partition, which may or may not be sorted.
*
* @param limitHint limit that can be applied during shuffling. May not actually be applied; this is just an
* optional optimization. See {@link ShuffleSpec#limitHint()}.
*/
public static ShuffleSpecFactory singlePartitionWithLimit(final long limitHint)
{
return (clusterBy, aggregate) -> {
if (clusterBy.sortable() && !clusterBy.isEmpty()) {
return new GlobalSortMaxCountShuffleSpec(clusterBy, 1, aggregate);
return new GlobalSortMaxCountShuffleSpec(clusterBy, 1, aggregate, limitHint);
} else {
return MixShuffleSpec.instance();
}
Expand All @@ -52,7 +64,8 @@ public static ShuffleSpecFactory singlePartition()
*/
public static ShuffleSpecFactory globalSortWithMaxPartitionCount(final int partitions)
{
return (clusterBy, aggregate) -> new GlobalSortMaxCountShuffleSpec(clusterBy, partitions, aggregate);
return (clusterBy, aggregate) ->
new GlobalSortMaxCountShuffleSpec(clusterBy, partitions, aggregate, ShuffleSpec.UNLIMITED);
}

/**
Expand All @@ -61,10 +74,6 @@ public static ShuffleSpecFactory globalSortWithMaxPartitionCount(final int parti
public static ShuffleSpecFactory getGlobalSortWithTargetSize(int targetSize)
{
return (clusterBy, aggregate) ->
new GlobalSortTargetSizeShuffleSpec(
clusterBy,
targetSize,
aggregate
);
new GlobalSortTargetSizeShuffleSpec(clusterBy, targetSize, aggregate);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,25 @@ public QueryDefinition makeQueryDefinition(
final ShuffleSpecFactory shuffleSpecFactoryPostAggregation;
boolean partitionBoost;

// limitHint to use for the shuffle after the post-aggregation stage.
// Don't apply limitHint pre-aggregation, because results from pre-aggregation may not be fully grouped.
final long postAggregationLimitHint;

if (doLimitOrOffset) {
final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec();
postAggregationLimitHint =
limitSpec.isLimited() ? limitSpec.getOffset() + limitSpec.getLimit() : ShuffleSpec.UNLIMITED;
} else {
postAggregationLimitHint = ShuffleSpec.UNLIMITED;
}

if (intermediateClusterBy.isEmpty() && resultClusterByWithoutPartitionBoost.isEmpty()) {
// Ignore shuffleSpecFactory, since we know only a single partition will come out, and we can save some effort.
// This condition will be triggered when we don't have a grouping dimension, no partitioning granularity
// (PARTITIONED BY ALL) and no ordering/clustering dimensions
// For example: INSERT INTO foo SELECT COUNT(*) FROM bar PARTITIONED BY ALL
shuffleSpecFactoryPreAggregation = ShuffleSpecFactories.singlePartition();
shuffleSpecFactoryPostAggregation = ShuffleSpecFactories.singlePartition();
shuffleSpecFactoryPostAggregation = ShuffleSpecFactories.singlePartitionWithLimit(postAggregationLimitHint);
partitionBoost = false;
} else if (doOrderBy) {
// There can be a situation where intermediateClusterBy is empty, while the resultClusterBy is non-empty
Expand All @@ -130,9 +142,13 @@ public QueryDefinition makeQueryDefinition(
shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty()
? ShuffleSpecFactories.singlePartition()
: ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount);
shuffleSpecFactoryPostAggregation = doLimitOrOffset
? ShuffleSpecFactories.singlePartition()
: resultShuffleSpecFactory;

if (doLimitOrOffset) {
shuffleSpecFactoryPostAggregation = ShuffleSpecFactories.singlePartitionWithLimit(postAggregationLimitHint);
} else {
shuffleSpecFactoryPostAggregation = resultShuffleSpecFactory;
}

partitionBoost = true;
} else {
shuffleSpecFactoryPreAggregation = doLimitOrOffset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,22 +143,32 @@ public QueryDefinition makeQueryDefinition(
);

ShuffleSpec scanShuffleSpec;
if (!hasLimitOrOffset) {
// If there is no limit spec, apply the final shuffling here itself. This will ensure partition sizes etc are respected.
scanShuffleSpec = finalShuffleSpec;
} else {
if (hasLimitOrOffset) {
// If there is a limit spec, check if there are any non-boost columns to sort in.
boolean requiresSort = clusterByColumns.stream()
.anyMatch(keyColumn -> !QueryKitUtils.PARTITION_BOOST_COLUMN.equals(keyColumn.columnName()));
boolean requiresSort =
clusterByColumns.stream()
.anyMatch(keyColumn -> !QueryKitUtils.PARTITION_BOOST_COLUMN.equals(keyColumn.columnName()));
if (requiresSort) {
// If yes, do a sort into a single partition.
scanShuffleSpec = ShuffleSpecFactories.singlePartition().build(clusterBy, false);
final long limitHint;

if (queryToRun.isLimited()
&& queryToRun.getScanRowsOffset() + queryToRun.getScanRowsLimit() > 0 /* overflow check */) {
limitHint = queryToRun.getScanRowsOffset() + queryToRun.getScanRowsLimit();
} else {
limitHint = ShuffleSpec.UNLIMITED;
}

scanShuffleSpec = ShuffleSpecFactories.singlePartitionWithLimit(limitHint).build(clusterBy, false);
} else {
// If the only clusterBy column is the boost column, we just use a mix shuffle to avoid unused shuffling.
// Note that we still need the boost column to be present in the row signature, since the limit stage would
// need it to be populated to do its own shuffling later.
scanShuffleSpec = MixShuffleSpec.instance();
}
} else {
// If there is no limit spec, apply the final shuffling here itself. This will ensure partition sizes etc are respected.
scanShuffleSpec = finalShuffleSpec;
}

queryDefBuilder.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1847,7 +1847,7 @@ public void testGroupByWithLimitAndOrdering(String contextName, Map<String, Obje
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(5),
.with().rows(4),
1, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
Expand All @@ -1862,7 +1862,7 @@ public void testGroupByWithLimitAndOrdering(String contextName, Map<String, Obje
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(5),
.with().rows(4),
2, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.msq.indexing.error.TooManyColumnsFault;
import org.apache.druid.msq.kernel.GlobalSortMaxCountShuffleSpec;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.ShuffleSpec;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.apache.druid.segment.TestHelper;
Expand Down Expand Up @@ -69,7 +70,8 @@ public class MSQTaskReportTest
new GlobalSortMaxCountShuffleSpec(
new ClusterBy(ImmutableList.of(new KeyColumn("s", KeyOrder.ASCENDING)), 0),
2,
false
false,
ShuffleSpec.UNLIMITED
)
)
.maxWorkerCount(3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public void testSerde() throws Exception
new GlobalSortMaxCountShuffleSpec(
new ClusterBy(ImmutableList.of(new KeyColumn("s", KeyOrder.ASCENDING)), 0),
2,
false
false,
ShuffleSpec.UNLIMITED
)
)
.maxWorkerCount(3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public void testGeneratePartitionsForNonNullShuffleWithNullCollector()
new GlobalSortMaxCountShuffleSpec(
new ClusterBy(ImmutableList.of(new KeyColumn("test", KeyOrder.ASCENDING)), 0),
2,
false
false,
ShuffleSpec.UNLIMITED
),
1,
false
Expand All @@ -95,7 +96,8 @@ public void testGeneratePartitionsForNonNullShuffleWithNonNullCollector()
new GlobalSortMaxCountShuffleSpec(
new ClusterBy(ImmutableList.of(new KeyColumn("test", KeyOrder.ASCENDING)), 0),
1,
false
false,
ShuffleSpec.UNLIMITED
),
1,
false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ public MockQueryDefinitionBuilder defineStage(
0
),
MAX_NUM_PARTITIONS,
false
false,
ShuffleSpec.UNLIMITED
);
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,20 @@ public ReturnOrAwait<Long> runIncrementally(final IntSet readableInputs) throws
return ReturnOrAwait.awaitAll(awaitSet);
}

// Check finished() after populateCurrentFramesAndTournamentTree().
if (finished()) {
// Done!
return ReturnOrAwait.returnObject(rowsOutput);
}

// Generate one output frame and stop for now.
outputChannel.write(nextFrame());
return ReturnOrAwait.runAgain();

// Check finished() after nextFrame().
if (finished()) {
return ReturnOrAwait.returnObject(rowsOutput);
} else {
return ReturnOrAwait.runAgain();
}
}

private FrameWithPartition nextFrame()
Expand Down
Loading