Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion processing/src/main/java/io/druid/query/QueryContexts.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ static <T> int parseInt(Query<T> query, String key, int defaultValue)
}
}

static <T> boolean parseBoolean(Query<T> query, String key, boolean defaultValue)
public static <T> boolean parseBoolean(Query<T> query, String key, boolean defaultValue)
{
Object val = query.getContextValue(key);
if (val == null) {
Expand Down
38 changes: 34 additions & 4 deletions processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.druid.query.DataSource;
import io.druid.query.Queries;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryDataSource;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory;
Expand Down Expand Up @@ -102,6 +103,7 @@ public static Builder builder()

private final boolean applyLimitPushDown;
private final Function<Sequence<Row>, Sequence<Row>> postProcessingFn;
private final GroupByQuery pushedDownQuery;

@JsonCreator
public GroupByQuery(
Expand All @@ -115,7 +117,8 @@ public GroupByQuery(
@JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
@JsonProperty("having") HavingSpec havingSpec,
@JsonProperty("limitSpec") LimitSpec limitSpec,
@JsonProperty("context") Map<String, Object> context
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("pushedDownQuery") GroupByQuery pushedDownQuery
)
{
this(
Expand All @@ -130,7 +133,8 @@ public GroupByQuery(
havingSpec,
limitSpec,
null,
context
context,
pushedDownQuery
);
}

Expand Down Expand Up @@ -167,7 +171,8 @@ private GroupByQuery(
final HavingSpec havingSpec,
final LimitSpec limitSpec,
final @Nullable Function<Sequence<Row>, Sequence<Row>> postProcessingFn,
final Map<String, Object> context
final Map<String, Object> context,
final GroupByQuery pushedDownQuery
)
{
super(dataSource, querySegmentSpec, false, context, granularity);
Expand Down Expand Up @@ -198,6 +203,7 @@ private GroupByQuery(

// Check if limit push down configuration is valid and check if limit push down will be applied
this.applyLimitPushDown = determineApplyLimitPushDown();
this.pushedDownQuery = pushedDownQuery;
}

@JsonProperty
Expand Down Expand Up @@ -242,6 +248,9 @@ public LimitSpec getLimitSpec()
return limitSpec;
}

/**
 * Returns the nested (outer) query that has been attached to this query for push-down
 * execution, or null when no query push-down has been applied.
 */
@JsonProperty
public GroupByQuery getPushedDownQuery()
{
  return pushedDownQuery;
}

@Override
public boolean hasFilters()
{
Expand Down Expand Up @@ -623,6 +632,13 @@ public GroupByQuery withOverriddenContext(Map<String, Object> contextOverride)
@Override
public GroupByQuery withQuerySegmentSpec(QuerySegmentSpec spec)
{
  if (getPushedDownQuery() != null) {
    // In case of nested queries that are pushed down, we need to make sure that segment specs
    // are changed on them too, so both levels of the query scan the same segments.
    final GroupByQuery pushedDownQueryWithSegmentSpec =
        new Builder(getPushedDownQuery()).setQuerySegmentSpec(spec).build();
    return new Builder(this)
        .setQueryToPushDown(pushedDownQueryWithSegmentSpec)
        .setQuerySegmentSpec(spec)
        .build();
  }
  return new Builder(this).setQuerySegmentSpec(spec).build();
}

Expand Down Expand Up @@ -652,6 +668,10 @@ public GroupByQuery withPostAggregatorSpecs(final List<PostAggregator> postAggre
return new Builder(this).setPostAggregatorSpecs(postAggregatorSpecs).build();
}

/**
 * Returns a copy of this query with {@code q} attached as the query to push down.
 *
 * @param q the nested query to push down alongside this one
 * @return a new GroupByQuery identical to this one except for the pushed-down query
 */
public GroupByQuery withPushedDownQuery(final GroupByQuery q)
{
  return new Builder(this).setQueryToPushDown(q).build();
}

private static void verifyOutputNames(
List<DimensionSpec> dimensions,
List<AggregatorFactory> aggregators,
Expand Down Expand Up @@ -710,6 +730,7 @@ public static class Builder
private Function<Sequence<Row>, Sequence<Row>> postProcessingFn;
private List<OrderByColumnSpec> orderByColumnSpecs = Lists.newArrayList();
private int limit = Integer.MAX_VALUE;
private GroupByQuery pushedDownQuery;

public Builder()
{
Expand All @@ -729,6 +750,7 @@ public Builder(GroupByQuery query)
limitSpec = query.getLimitSpec();
postProcessingFn = query.postProcessingFn;
context = query.getContext();
pushedDownQuery = query.getPushedDownQuery();
}

public Builder(Builder builder)
Expand All @@ -747,6 +769,7 @@ public Builder(Builder builder)
limit = builder.limit;
orderByColumnSpecs = new ArrayList<>(builder.orderByColumnSpecs);
context = builder.context;
pushedDownQuery = builder.pushedDownQuery;
}

public Builder setDataSource(DataSource dataSource)
Expand Down Expand Up @@ -807,6 +830,12 @@ public Builder setLimit(int limit)
return this;
}

/**
 * Sets the (outer) query that should be pushed down and executed together with the query
 * being built; used by the nested group-by push-down path.
 */
public Builder setQueryToPushDown(GroupByQuery q)
{
this.pushedDownQuery = q;
return this;
}

public Builder addOrderByColumn(String dimension)
{
return addOrderByColumn(dimension, null);
Expand Down Expand Up @@ -968,7 +997,8 @@ public GroupByQuery build()
havingSpec,
theLimitSpec,
postProcessingFn,
context
context,
pushedDownQuery
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class GroupByQueryConfig
public static final String CTX_KEY_STRATEGY = "groupByStrategy";
public static final String CTX_KEY_FORCE_LIMIT_PUSH_DOWN = "forceLimitPushDown";
public static final String CTX_KEY_APPLY_LIMIT_PUSH_DOWN = "applyLimitPushDown";
public static final String CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY = "forcePushDownNestedQuery";
private static final String CTX_KEY_IS_SINGLE_THREADED = "groupByIsSingleThreaded";
private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows";
private static final String CTX_KEY_MAX_RESULTS = "maxResults";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
import io.druid.java.util.common.guava.Accumulator;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryDataSource;
import io.druid.query.QueryPlus;
import io.druid.query.ResourceLimitExceededException;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
Expand Down Expand Up @@ -257,4 +261,15 @@ public static Map<String, ValueType> rowSignatureFor(final GroupByQuery query)
// Don't include post-aggregators since we don't know what types they are.
return types.build();
}

/**
 * Unwraps the pushed-down query from the given QueryPlus, if the wrapped query is a
 * GroupByQuery carrying one; otherwise returns the wrapped query unchanged.
 *
 * @param q the QueryPlus whose query should be inspected
 * @return the pushed-down GroupByQuery when present, otherwise the original query
 */
public static Query getPushedDownQueryIfPresent(QueryPlus q)
{
  Query query = q.getQuery();
  if (query instanceof GroupByQuery) {
    final GroupByQuery groupByQuery = (GroupByQuery) query;
    final GroupByQuery pushedDownQuery = groupByQuery.getPushedDownQuery();
    if (pushedDownQuery != null) {
      query = pushedDownQuery;
    }
  }
  return query;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.SubqueryQueryRunner;
import io.druid.query.TableDataSource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.aggregation.MetricManipulatorFns;
Expand Down Expand Up @@ -162,47 +163,45 @@ private Sequence<Row> mergeGroupByResults(
)
{
// If there's a subquery, merge subquery results and then apply the aggregator

final DataSource dataSource = query.getDataSource();

if (dataSource instanceof QueryDataSource) {
final GroupByQuery subquery;
try {
// Inject outer query context keys into subquery if they don't already exist in the subquery context.
// Unlike withOverriddenContext's normal behavior, we want keys present in the subquery to win.
final Map<String, Object> subqueryContext = Maps.newTreeMap();
if (query.getContext() != null) {
for (Map.Entry<String, Object> entry : query.getContext().entrySet()) {
if (entry.getValue() != null) {
subqueryContext.put(entry.getKey(), entry.getValue());
}
}
}
if (((QueryDataSource) dataSource).getQuery().getContext() != null) {
subqueryContext.putAll(((QueryDataSource) dataSource).getQuery().getContext());
boolean pushDownQuery = shouldPushDownQuery(query);
if (pushDownQuery) {
GroupByQuery innerMostQuery = getInnerMostQuery(query);
GroupByQuery.Builder innerQueryBuilder = new GroupByQuery.Builder(innerMostQuery);
GroupByQuery.Builder pushDownQueryBuilder = new GroupByQuery.Builder(query);
// Unset the push down nested query flag so that the historical doesn't erroneously end up pushing down the query itself
pushDownQueryBuilder.overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, false));
GroupByQuery queryToPushDown = pushDownQueryBuilder.build();
innerQueryBuilder.setQueryToPushDown(queryToPushDown);
GroupByQuery newInnerQuery = innerQueryBuilder.build();
Sequence<Row> pushDownQueryResults = groupByStrategy.mergeResults(runner, newInnerQuery, context);
return groupByStrategy.processSubqueryResult(newInnerQuery, queryToPushDown, resource, pushDownQueryResults);
}
subqueryContext.put(GroupByQuery.CTX_KEY_SORT_BY_DIMS_FIRST, false);
final Map<String, Object> subqueryContext = getSubqueryContext(query, (QueryDataSource) dataSource);
subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery().withOverriddenContext(subqueryContext);
}
catch (ClassCastException e) {
throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
}

GroupByQuery.Builder subqueryBuilder = new GroupByQuery.Builder(subquery);
subqueryBuilder.overrideContext(ImmutableMap.<String, Object>of(
//setting sort to false avoids unnecessary sorting while merging results. we only need to sort
//in the end when returning results to user. (note this is only respected by groupBy v1)
GroupByQueryHelper.CTX_KEY_SORT_RESULTS,
false
));
GroupByQuery newSubquery = subqueryBuilder.build();
final Sequence<Row> subqueryResult = mergeGroupByResults(
groupByStrategy,
subquery.withOverriddenContext(
ImmutableMap.<String, Object>of(
//setting sort to false avoids unnecessary sorting while merging results. we only need to sort
//in the end when returning results to user. (note this is only respected by groupBy v1)
GroupByQueryHelper.CTX_KEY_SORT_RESULTS,
false
)
),
newSubquery,
resource,
runner,
context
);

final Sequence<Row> finalizingResults;
if (QueryContexts.isFinalize(subquery, false)) {
finalizingResults = new MappedSequence<>(
Expand All @@ -215,13 +214,49 @@ private Sequence<Row> mergeGroupByResults(
} else {
finalizingResults = subqueryResult;
}

return groupByStrategy.processSubqueryResult(subquery, query, resource, finalizingResults);
return groupByStrategy.processSubqueryResult(newSubquery, query, resource, finalizingResults);
} else {
return groupByStrategy.mergeResults(runner, query, context);
}
}

/**
 * Reads the "forcePushDownNestedQuery" context flag from the query; defaults to false when
 * the flag is absent.
 */
public static boolean shouldPushDownQuery(GroupByQuery q)
{
  final String pushDownKey = GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY;
  return QueryContexts.parseBoolean(q, pushDownKey, false);
}

/**
 * Walks down through nested QueryDataSources until reaching a query over a concrete table,
 * propagating the outer context onto each inner query along the way (inner keys win).
 */
private static GroupByQuery getInnerMostQuery(GroupByQuery query)
{
  GroupByQuery current = query;
  while (!(current.getDataSource() instanceof TableDataSource)) {
    final QueryDataSource innerDataSource = (QueryDataSource) current.getDataSource();
    // Make sure that we pass down the context values to the inner most query
    final Map<String, Object> mergedContext = getSubqueryContext(current, innerDataSource);
    current = ((GroupByQuery) innerDataSource.getQuery()).withOverriddenContext(mergedContext);
  }
  return current;
}

/**
 * Builds the context for a subquery by injecting the outer query's context keys into it.
 * Unlike withOverriddenContext's normal behavior, keys already present in the subquery win.
 */
private static Map<String, Object> getSubqueryContext(GroupByQuery query, QueryDataSource dataSource)
{
  final Map<String, Object> mergedContext = Maps.newTreeMap();
  final Map<String, Object> outerContext = query.getContext();
  if (outerContext != null) {
    for (Map.Entry<String, Object> outerEntry : outerContext.entrySet()) {
      final Object outerValue = outerEntry.getValue();
      if (outerValue != null) {
        mergedContext.put(outerEntry.getKey(), outerValue);
      }
    }
  }
  final Map<String, Object> innerContext = dataSource.getQuery().getContext();
  if (innerContext != null) {
    // Subquery's own keys override the injected outer keys.
    mergedContext.putAll(innerContext);
  }
  // Always disable dims-first sorting on the subquery.
  mergedContext.put(GroupByQuery.CTX_KEY_SORT_BY_DIMS_FIRST, false);
  return mergedContext;
}

@Override
public GroupByQueryMetrics makeMetrics(GroupByQuery query)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
Expand All @@ -67,15 +68,17 @@ public static Sequence<Row> process(
final GroupByQueryResource resource,
final ObjectMapper spillMapper,
final String processingTmpDir,
final int mergeBufferSize
final int mergeBufferSize,
final boolean wasQueryPushedDown
)
{
final GroupByQuery query = (GroupByQuery) queryParam;
final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);

final AggregatorFactory[] aggregatorFactories = new AggregatorFactory[query.getAggregatorSpecs().size()];
for (int i = 0; i < query.getAggregatorSpecs().size(); i++) {
aggregatorFactories[i] = query.getAggregatorSpecs().get(i);
AggregatorFactory af = query.getAggregatorSpecs().get(i);
aggregatorFactories[i] = wasQueryPushedDown ? af.getCombiningFactory() : af;
}


Expand All @@ -99,7 +102,17 @@ public static Sequence<Row> process(
? BooleanValueMatcher.of(true)
: filter.makeMatcher(columnSelectorFactory);

final FilteredSequence<Row> filteredSequence = new FilteredSequence<>(
final FilteredSequence<Row> filteredSequence = wasQueryPushedDown ? new FilteredSequence<>(
rows,
new Predicate<Row>()
{
@Override
public boolean apply(@Nullable Row row)
{
return true; // nothing to filter since it has already been done on the push down
}
}
) : new FilteredSequence<>(
rows,
new Predicate<Row>()
{
Expand Down Expand Up @@ -143,7 +156,7 @@ public CloseableGrouperIterator<RowBasedKey, Row> make()

Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
query,
true,
true, //todo: samarth think about this attribute. I think this might be false in our case?
rowSignature,
querySpecificConfig,
new Supplier<ByteBuffer>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>

private final List<File> files = Lists.newArrayList();
private final List<File> dictionaryFiles = Lists.newArrayList();
private final boolean sortHasNonGroupingFields;
private final boolean sortHasNonGroupingFields; //TODO: samarth what to do when there is sorting present different from grouping

private boolean spillingAllowed = false;

Expand Down
Loading