Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/content/querying/groupbyquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ When using the "v2" strategy, the following query context parameters apply:
|`maxMergingDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.|
|`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.|
|`sortByDimsFirst`|Sort the results first by dimension values and then by timestamp.|
|`forcePushDownLimit`|When all fields in the ORDER BY clause are part of the grouping key, the broker pushes limit application down to the historical nodes. When the sort order uses fields that are not in the grouping key, applying this optimization can result in approximate results with unknown accuracy, so it is disabled by default in that case. Enabling this context flag turns on limit push down for limit/order-by clauses that contain non-grouping-key columns.|

When using the "v1" strategy, the following query context parameters apply:

Expand Down
266 changes: 265 additions & 1 deletion processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
Expand Down Expand Up @@ -53,6 +54,9 @@
import io.druid.query.groupby.orderby.LimitSpec;
import io.druid.query.groupby.orderby.NoopLimitSpec;
import io.druid.query.groupby.orderby.OrderByColumnSpec;
import io.druid.query.groupby.strategy.GroupByStrategyV2;
import io.druid.query.ordering.StringComparator;
import io.druid.query.ordering.StringComparators;
import io.druid.query.spec.LegacySegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.segment.VirtualColumn;
Expand All @@ -65,6 +69,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -97,6 +102,8 @@ public static Builder builder()
private final List<AggregatorFactory> aggregatorSpecs;
private final List<PostAggregator> postAggregatorSpecs;

private final Function<Sequence<Row>, Sequence<Row>> limitFn;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field is unused.

private final boolean applyLimitPushDown;
private final Function<Sequence<Row>, Sequence<Row>> postProcessingFn;

@JsonCreator
Expand Down Expand Up @@ -190,6 +197,45 @@ private GroupByQuery(
verifyOutputNames(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);

this.postProcessingFn = postProcessingFn != null ? postProcessingFn : makePostProcessingFn();

// Check if limit push down configuration is valid and check if limit push down will be applied
this.applyLimitPushDown = determineApplyLimitPushDown();

// On an inner query, we may sometimes get a LimitSpec so that row orderings can be determined for limit push down
// However, it's not necessary to build the real limitFn from it at this stage.
Function<Sequence<Row>, Sequence<Row>> postProcFn;
if (getContextBoolean(GroupByStrategyV2.CTX_KEY_OUTERMOST, true)) {
postProcFn = this.limitSpec.build(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);
} else {
postProcFn = NoopLimitSpec.INSTANCE.build(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);
}

if (havingSpec != null) {
postProcFn = Functions.compose(
postProcFn,
new Function<Sequence<Row>, Sequence<Row>>()
{
@Override
public Sequence<Row> apply(Sequence<Row> input)
{
GroupByQuery.this.havingSpec.setRowSignature(GroupByQueryHelper.rowSignatureFor(GroupByQuery.this));
return Sequences.filter(
input,
new Predicate<Row>()
{
@Override
public boolean apply(Row input)
{
return GroupByQuery.this.havingSpec.eval(input);
}
}
);
}
}
);
}

limitFn = postProcFn;
}

@JsonProperty
Expand Down Expand Up @@ -264,6 +310,12 @@ public boolean getContextSortByDimsFirst()
return getContextBoolean(CTX_KEY_SORT_BY_DIMS_FIRST, false);
}

@JsonIgnore
/**
 * Whether limit push down will be applied for this query. Computed once at construction time
 * (see determineApplyLimitPushDown()); not serialized as part of the query JSON.
 */
public boolean isApplyLimitPushDown()
{
return applyLimitPushDown;
}

@Override
public Ordering getResultOrdering()
{
Expand All @@ -281,10 +333,177 @@ public Ordering getResultOrdering()
);
}

public Ordering<Row> getRowOrdering(final boolean granular)
/**
 * Reads the force-limit-push-down context flag and, when it is set, validates that the query is
 * eligible: a DefaultLimitSpec with a real limit must be present, and none of the orderby columns
 * may refer to a post-aggregator.
 *
 * @return true iff the user explicitly forced limit push down via the query context
 * @throws IAE if push down is forced but no usable limit spec is provided
 * @throws UnsupportedOperationException if push down is forced while sorting by a post-aggregator
 */
private boolean validateAndGetForceLimitPushDown()
{
  final boolean forcePushDown = getContextBoolean(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, false);
  if (!forcePushDown) {
    return false;
  }

  if (!(limitSpec instanceof DefaultLimitSpec)) {
    throw new IAE("When forcing limit push down, a limit spec must be provided.");
  }
  final DefaultLimitSpec defaultLimitSpec = (DefaultLimitSpec) limitSpec;

  if (defaultLimitSpec.getLimit() == Integer.MAX_VALUE) {
    throw new IAE("When forcing limit push down, the provided limit spec must have a limit.");
  }

  // Sorting by a post-aggregated value cannot be evaluated on partial (per-segment) results.
  for (OrderByColumnSpec orderBySpec : defaultLimitSpec.getColumns()) {
    if (OrderByColumnSpec.getPostAggIndexForOrderBy(orderBySpec, postAggregatorSpecs) > -1) {
      throw new UnsupportedOperationException("Limit push down when sorting by a post aggregator is not supported.");
    }
  }

  return true;
}

public boolean determineApplyLimitPushDown()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this can be implemented in one line return validateAndGetForceLimitPushDown() || DefaultLimitSpec.sortingOrderHasNonGroupingFields(..);

you might have to add the defaultLimitSpec.getLimit() == Integer.MAX_VALUE check inside DefaultLimitSpec.sortingOrderHasNonGroupingFields(..)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I decided to change this area in a different way, I removed an unnecessary null and type check from sortingOrderHasNonGroupingFields(), but kept determineApplyLimitPushDown() the same.

The sortingOrder method is used elsewhere, where the MAX_VALUE check is no longer relevant (as it would've been already checked earlier when the query object was created), so I didn't think it really belongs there.

I also felt that the conditions for when limit push down will be applied are more clear if they're all in the determineApplyLimitPushDown() method.

{
final boolean forceLimitPushDown = validateAndGetForceLimitPushDown();

if (limitSpec instanceof DefaultLimitSpec) {
DefaultLimitSpec defaultLimitSpec = (DefaultLimitSpec) limitSpec;

// If only applying an orderby without a limit, don't try to push down
if (defaultLimitSpec.getLimit() == Integer.MAX_VALUE) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition is always false, because validateAndGetForceLimitPushDown() throws an exception whenever it would be satisfied.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validateAndGetForceLimitPushDown() only throws the exception if the user explicitly sets forceLimitPushDown to true in the query context.

In this block, forceLimitPushDown may be false, so it's checking for the case where a limit was not provided (e.g., the user only wants to change the sort, but not impose a limit, without specifying forced push down)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, somewhat unrelated, maybe it's better to represent "no limit specified" as null instead of Integer.MAX_VALUE, I'll think about changing that

return false;
}

if (forceLimitPushDown) {
return true;
}

// If the sorting order only uses columns in the grouping key, we can always push the limit down
// to the buffer grouper without affecting result accuracy
boolean sortHasNonGroupingFields = DefaultLimitSpec.sortingOrderHasNonGroupingFields(
(DefaultLimitSpec) limitSpec,
getDimensions()
);

return !sortHasNonGroupingFields;
}

return false;
}

/**
 * When limit push down is applied, the partial results are sorted by the ordering specified by the
 * limit/order spec (unlike the non-push-down case, where results always use the default natural
 * ascending order), so when merging these partial result streams the merge must use the same
 * ordering to get correct results.
 *
 * @param granular whether to compare timestamps at query-granularity buckets
 * @param limitSpec the limit spec whose column order drives the push-down ordering
 */
private Ordering<Row> getRowOrderingForPushDown(
    final boolean granular,
    final DefaultLimitSpec limitSpec
)
{
  final boolean sortByDimsFirst = getContextSortByDimsFirst();

  final List<String> orderedFieldNames = new ArrayList<>();
  final Set<Integer> dimsInOrderBy = new HashSet<>();
  final List<Boolean> needsReverseList = new ArrayList<>();
  final List<Boolean> isNumericField = new ArrayList<>();
  final List<StringComparator> comparators = new ArrayList<>();

  // Columns explicitly mentioned in the orderby come first, with their requested direction
  // and comparator.
  for (OrderByColumnSpec orderSpec : limitSpec.getColumns()) {
    final int dimIndex = OrderByColumnSpec.getDimIndexForOrderBy(orderSpec, dimensions);
    if (dimIndex < 0) {
      continue;
    }
    final DimensionSpec dim = dimensions.get(dimIndex);
    final ValueType type = dim.getOutputType();
    orderedFieldNames.add(dim.getOutputName());
    dimsInOrderBy.add(dimIndex);
    needsReverseList.add(orderSpec.getDirection() != OrderByColumnSpec.Direction.ASCENDING);
    isNumericField.add(type == ValueType.LONG || type == ValueType.FLOAT);
    comparators.add(orderSpec.getDimensionComparator());
  }

  // Any remaining grouping dimensions follow, ascending with lexicographic comparison.
  for (int i = 0; i < dimensions.size(); i++) {
    if (dimsInOrderBy.contains(i)) {
      continue;
    }
    final DimensionSpec dim = dimensions.get(i);
    final ValueType type = dim.getOutputType();
    orderedFieldNames.add(dim.getOutputName());
    needsReverseList.add(false);
    isNumericField.add(type == ValueType.LONG || type == ValueType.FLOAT);
    comparators.add(StringComparators.LEXICOGRAPHIC);
  }

  final Ordering<Row> dimOrdering = Ordering.from(
      new Comparator<Row>()
      {
        @Override
        public int compare(Row lhs, Row rhs)
        {
          return compareDimsForLimitPushDown(
              orderedFieldNames,
              needsReverseList,
              isNumericField,
              comparators,
              lhs,
              rhs
          );
        }
      }
  );

  final Comparator<Row> timeComparator = getTimeComparator(granular);

  if (timeComparator == null) {
    return dimOrdering;
  } else if (sortByDimsFirst) {
    // Dimensions take precedence; timestamps break ties.
    return dimOrdering.compound(timeComparator);
  } else {
    // Timestamps take precedence; dimensions break ties.
    return Ordering.from(timeComparator).compound(dimOrdering);
  }
}

public Ordering<Row> getRowOrdering(final boolean granular)
{
if (applyLimitPushDown) {
if (!DefaultLimitSpec.sortingOrderHasNonGroupingFields((DefaultLimitSpec) limitSpec, dimensions)) {
return getRowOrderingForPushDown(granular, (DefaultLimitSpec) limitSpec);
}
}

final boolean sortByDimsFirst = getContextSortByDimsFirst();
final Comparator<Row> timeComparator = getTimeComparator(granular);

if (timeComparator == null) {
Expand Down Expand Up @@ -357,6 +576,51 @@ private static int compareDims(List<DimensionSpec> dimensions, Row lhs, Row rhs)
return 0;
}

/**
 * Compares two rows field-by-field using the per-field direction and comparator lists built by
 * getRowOrderingForPushDown(). Descending fields are handled by swapping the operands up front,
 * so every comparison below is written as an ascending comparison.
 *
 * @return a negative, zero, or positive value per the Comparator contract
 */
private static int compareDimsForLimitPushDown(
    final List<String> fields,
    final List<Boolean> needsReverseList,
    final List<Boolean> isNumericField,
    final List<StringComparator> comparators,
    Row lhs,
    Row rhs
)
{
  for (int i = 0; i < fields.size(); i++) {
    final String fieldName = fields.get(i);
    final StringComparator comparator = comparators.get(i);

    final int dimCompare;

    // Swap operands for descending fields so the comparisons below stay ascending.
    final Object lhsObj;
    final Object rhsObj;
    if (needsReverseList.get(i)) {
      lhsObj = rhs.getRaw(fieldName);
      rhsObj = lhs.getRaw(fieldName);
    } else {
      lhsObj = lhs.getRaw(fieldName);
      rhsObj = rhs.getRaw(fieldName);
    }

    if (isNumericField.get(i)) {
      if (comparator == StringComparators.NUMERIC) {
        // FIX: compare the direction-adjusted lhsObj/rhsObj. The previous code re-read
        // rhs.getRaw()/lhs.getRaw() directly, which ignored the needsReverseList swap above and
        // inverted the ordering for ascending numeric fields.
        dimCompare = NATURAL_NULLS_FIRST.compare(lhsObj, rhsObj);
      } else {
        // Non-numeric comparator on a numeric field: compare the stringified values.
        dimCompare = comparator.compare(String.valueOf(lhsObj), String.valueOf(rhsObj));
      }
    } else {
      dimCompare = comparator.compare((String) lhsObj, (String) rhsObj);
    }

    if (dimCompare != 0) {
      return dimCompare;
    }
  }
  return 0;
}

/**
* Apply the havingSpec and limitSpec. Because havingSpecs are not thread safe, and because they are applied during
* accumulation of the returned sequence, callers must take care to avoid accumulating two different Sequences
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
public class GroupByQueryConfig
{
public static final String CTX_KEY_STRATEGY = "groupByStrategy";
public static final String CTX_KEY_FORCE_LIMIT_PUSH_DOWN = "forceLimitPushDown";
private static final String CTX_KEY_IS_SINGLE_THREADED = "groupByIsSingleThreaded";
private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows";
private static final String CTX_KEY_MAX_RESULTS = "maxResults";
Expand Down Expand Up @@ -66,6 +67,12 @@ public class GroupByQueryConfig
// Max on-disk temporary storage, per-query; when exceeded, the query fails
private long maxOnDiskStorage = 0L;

@JsonProperty
private boolean forcePushDownLimit = false;

@JsonProperty
private Class<? extends GroupByQueryMetricsFactory> queryMetricsFactory;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jon-wei could you please remove this field, getter and setter? They were removed here: https://github.com/druid-io/druid/pull/4336/files#diff-5e30ea112240e82f233099071bb3389e but resurrected in this PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@leventov Patch to remove those is here, #4383, thanks for catching that


public String getDefaultStrategy()
{
return defaultStrategy;
Expand Down Expand Up @@ -126,6 +133,21 @@ public long getMaxOnDiskStorage()
return maxOnDiskStorage;
}

/**
 * Whether limit push down is forced for groupBy queries; overridable per-query via the
 * "forceLimitPushDown" context key (see withOverrides()).
 */
public boolean isForcePushDownLimit()
{
return forcePushDownLimit;
}

// NOTE(review): this getter was removed in an earlier PR (#4336) and accidentally resurrected
// here; it is slated for removal in follow-up #4383.
public Class<? extends GroupByQueryMetricsFactory> getQueryMetricsFactory()
{
return queryMetricsFactory != null ? queryMetricsFactory : DefaultGroupByQueryMetricsFactory.class;
}

// NOTE(review): this setter was removed in an earlier PR (#4336) and accidentally resurrected
// here; it is slated for removal in follow-up #4383.
public void setQueryMetricsFactory(Class<? extends GroupByQueryMetricsFactory> queryMetricsFactory)
{
this.queryMetricsFactory = queryMetricsFactory;
}

public GroupByQueryConfig withOverrides(final GroupByQuery query)
{
final GroupByQueryConfig newConfig = new GroupByQueryConfig();
Expand Down Expand Up @@ -159,6 +181,7 @@ public GroupByQueryConfig withOverrides(final GroupByQuery query)
((Number) query.getContextValue(CTX_KEY_MAX_MERGING_DICTIONARY_SIZE, getMaxMergingDictionarySize())).longValue(),
getMaxMergingDictionarySize()
);
newConfig.forcePushDownLimit = query.getContextBoolean(CTX_KEY_FORCE_LIMIT_PUSH_DOWN, isForcePushDownLimit());
return newConfig;
}
}
Loading