diff --git a/processing/src/main/java/io/druid/query/QueryContexts.java b/processing/src/main/java/io/druid/query/QueryContexts.java index d0a16fd87844..23bac3721542 100644 --- a/processing/src/main/java/io/druid/query/QueryContexts.java +++ b/processing/src/main/java/io/druid/query/QueryContexts.java @@ -218,7 +218,7 @@ static int parseInt(Query query, String key, int defaultValue) } } - static boolean parseBoolean(Query query, String key, boolean defaultValue) + public static boolean parseBoolean(Query query, String key, boolean defaultValue) { Object val = query.getContextValue(key); if (val == null) { diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index 9ca527ea17a1..a14b1aea2c05 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -42,6 +42,7 @@ import io.druid.query.DataSource; import io.druid.query.Queries; import io.druid.query.Query; +import io.druid.query.QueryContexts; import io.druid.query.QueryDataSource; import io.druid.query.TableDataSource; import io.druid.query.aggregation.AggregatorFactory; @@ -102,6 +103,7 @@ public static Builder builder() private final boolean applyLimitPushDown; private final Function, Sequence> postProcessingFn; + private final GroupByQuery pushedDownQuery; @JsonCreator public GroupByQuery( @@ -115,7 +117,8 @@ public GroupByQuery( @JsonProperty("postAggregations") List postAggregatorSpecs, @JsonProperty("having") HavingSpec havingSpec, @JsonProperty("limitSpec") LimitSpec limitSpec, - @JsonProperty("context") Map context + @JsonProperty("context") Map context, + @JsonProperty("pushedDownQuery") GroupByQuery pushedDownQuery ) { this( @@ -130,7 +133,8 @@ public GroupByQuery( havingSpec, limitSpec, null, - context + context, + pushedDownQuery ); } @@ -167,7 +171,8 @@ private GroupByQuery( final HavingSpec havingSpec, final 
LimitSpec limitSpec, final @Nullable Function, Sequence> postProcessingFn, - final Map context + final Map context, + final GroupByQuery pushedDownQuery ) { super(dataSource, querySegmentSpec, false, context, granularity); @@ -198,6 +203,7 @@ private GroupByQuery( // Check if limit push down configuration is valid and check if limit push down will be applied this.applyLimitPushDown = determineApplyLimitPushDown(); + this.pushedDownQuery = pushedDownQuery; } @JsonProperty @@ -242,6 +248,9 @@ public LimitSpec getLimitSpec() return limitSpec; } + @JsonProperty + public GroupByQuery getPushedDownQuery() { return pushedDownQuery; } + @Override public boolean hasFilters() { @@ -623,6 +632,13 @@ public GroupByQuery withOverriddenContext(Map contextOverride) @Override public GroupByQuery withQuerySegmentSpec(QuerySegmentSpec spec) { + GroupByQuery pushedDownQueryWithSegmentSpec; + boolean isPushedDownQuery = this.getPushedDownQuery() != null; + if (isPushedDownQuery) { + // In case of nested queries that are pushed down, we need to make sure that segment specs are changed on them too. 
+ pushedDownQueryWithSegmentSpec = new Builder(this.getPushedDownQuery()).setQuerySegmentSpec(spec).build(); + return new Builder(this).setQueryToPushDown(pushedDownQueryWithSegmentSpec).setQuerySegmentSpec(spec).build(); + } return new Builder(this).setQuerySegmentSpec(spec).build(); } @@ -652,6 +668,10 @@ public GroupByQuery withPostAggregatorSpecs(final List postAggre return new Builder(this).setPostAggregatorSpecs(postAggregatorSpecs).build(); } + public GroupByQuery withPushedDownQuery(final GroupByQuery q) { + return new Builder(this).setQueryToPushDown(q).build(); + } + private static void verifyOutputNames( List dimensions, List aggregators, @@ -710,6 +730,7 @@ public static class Builder private Function, Sequence> postProcessingFn; private List orderByColumnSpecs = Lists.newArrayList(); private int limit = Integer.MAX_VALUE; + private GroupByQuery pushedDownQuery; public Builder() { @@ -729,6 +750,7 @@ public Builder(GroupByQuery query) limitSpec = query.getLimitSpec(); postProcessingFn = query.postProcessingFn; context = query.getContext(); + pushedDownQuery = query.getPushedDownQuery(); } public Builder(Builder builder) @@ -747,6 +769,7 @@ public Builder(Builder builder) limit = builder.limit; orderByColumnSpecs = new ArrayList<>(builder.orderByColumnSpecs); context = builder.context; + pushedDownQuery = builder.pushedDownQuery; } public Builder setDataSource(DataSource dataSource) @@ -807,6 +830,12 @@ public Builder setLimit(int limit) return this; } + public Builder setQueryToPushDown(GroupByQuery q) + { + this.pushedDownQuery = q; + return this; + } + public Builder addOrderByColumn(String dimension) { return addOrderByColumn(dimension, null); @@ -968,7 +997,8 @@ public GroupByQuery build() havingSpec, theLimitSpec, postProcessingFn, - context + context, + pushedDownQuery ); } } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java index 
9c744dc24aa1..c56fdc7c785d 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java @@ -29,6 +29,7 @@ public class GroupByQueryConfig public static final String CTX_KEY_STRATEGY = "groupByStrategy"; public static final String CTX_KEY_FORCE_LIMIT_PUSH_DOWN = "forceLimitPushDown"; public static final String CTX_KEY_APPLY_LIMIT_PUSH_DOWN = "applyLimitPushDown"; + public static final String CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY = "forcePushDownNestedQuery"; private static final String CTX_KEY_IS_SINGLE_THREADED = "groupByIsSingleThreaded"; private static final String CTX_KEY_MAX_INTERMEDIATE_ROWS = "maxIntermediateRows"; private static final String CTX_KEY_MAX_RESULTS = "maxResults"; diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index f8b070d9e964..94a4606aa697 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -38,6 +38,10 @@ import io.druid.java.util.common.guava.Accumulator; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; +import io.druid.query.Query; +import io.druid.query.QueryContexts; +import io.druid.query.QueryDataSource; +import io.druid.query.QueryPlus; import io.druid.query.ResourceLimitExceededException; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.dimension.DimensionSpec; @@ -257,4 +261,15 @@ public static Map rowSignatureFor(final GroupByQuery query) // Don't include post-aggregators since we don't know what types they are. 
return types.build(); } + + public static Query getPushedDownQueryIfPresent(QueryPlus q) { + Query query = q.getQuery(); + if (query instanceof GroupByQuery) { + GroupByQuery gp = (GroupByQuery)query; + if (gp.getPushedDownQuery() != null) { + query = gp.getPushedDownQuery(); + } + } + return query; + } } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 28d5acb42de0..4be649658530 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -47,6 +47,7 @@ import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.SubqueryQueryRunner; +import io.druid.query.TableDataSource; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.MetricManipulationFn; import io.druid.query.aggregation.MetricManipulatorFns; @@ -162,47 +163,45 @@ private Sequence mergeGroupByResults( ) { // If there's a subquery, merge subquery results and then apply the aggregator - final DataSource dataSource = query.getDataSource(); - if (dataSource instanceof QueryDataSource) { final GroupByQuery subquery; try { - // Inject outer query context keys into subquery if they don't already exist in the subquery context. - // Unlike withOverriddenContext's normal behavior, we want keys present in the subquery to win. 
- final Map subqueryContext = Maps.newTreeMap(); - if (query.getContext() != null) { - for (Map.Entry entry : query.getContext().entrySet()) { - if (entry.getValue() != null) { - subqueryContext.put(entry.getKey(), entry.getValue()); - } - } - } - if (((QueryDataSource) dataSource).getQuery().getContext() != null) { - subqueryContext.putAll(((QueryDataSource) dataSource).getQuery().getContext()); + boolean pushDownQuery = shouldPushDownQuery(query); + if (pushDownQuery) { + GroupByQuery innerMostQuery = getInnerMostQuery(query); + GroupByQuery.Builder innerQueryBuilder = new GroupByQuery.Builder(innerMostQuery); + GroupByQuery.Builder pushDownQueryBuilder = new GroupByQuery.Builder(query); + // Unset the push down nested query flag so that the historical doesn't erroneously end up pushing down the query itself + pushDownQueryBuilder.overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, false)); + GroupByQuery queryToPushDown = pushDownQueryBuilder.build(); + innerQueryBuilder.setQueryToPushDown(queryToPushDown); + GroupByQuery newInnerQuery = innerQueryBuilder.build(); + Sequence pushDownQueryResults = groupByStrategy.mergeResults(runner, newInnerQuery, context); + return groupByStrategy.processSubqueryResult(newInnerQuery, queryToPushDown, resource, pushDownQueryResults); } - subqueryContext.put(GroupByQuery.CTX_KEY_SORT_BY_DIMS_FIRST, false); + final Map subqueryContext = getSubqueryContext(query, (QueryDataSource) dataSource); subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery().withOverriddenContext(subqueryContext); } catch (ClassCastException e) { throw new UnsupportedOperationException("Subqueries must be of type 'group by'"); } + GroupByQuery.Builder subqueryBuilder = new GroupByQuery.Builder(subquery); + subqueryBuilder.overrideContext(ImmutableMap.of( + //setting sort to false avoids unnecessary sorting while merging results. we only need to sort + //in the end when returning results to user. 
(note this is only respected by groupBy v1) + GroupByQueryHelper.CTX_KEY_SORT_RESULTS, + false + )); + GroupByQuery newSubquery = subqueryBuilder.build(); final Sequence subqueryResult = mergeGroupByResults( groupByStrategy, - subquery.withOverriddenContext( - ImmutableMap.of( - //setting sort to false avoids unnecessary sorting while merging results. we only need to sort - //in the end when returning results to user. (note this is only respected by groupBy v1) - GroupByQueryHelper.CTX_KEY_SORT_RESULTS, - false - ) - ), + newSubquery, resource, runner, context ); - final Sequence finalizingResults; if (QueryContexts.isFinalize(subquery, false)) { finalizingResults = new MappedSequence<>( @@ -215,13 +214,49 @@ private Sequence mergeGroupByResults( } else { finalizingResults = subqueryResult; } - - return groupByStrategy.processSubqueryResult(subquery, query, resource, finalizingResults); + return groupByStrategy.processSubqueryResult(newSubquery, query, resource, finalizingResults); } else { return groupByStrategy.mergeResults(runner, query, context); } } + public static boolean shouldPushDownQuery(GroupByQuery q) + { + return QueryContexts.parseBoolean(q, GroupByQueryConfig.CTX_KEY_FORCE_PUSH_DOWN_NESTED_QUERY, false); + } + + private static GroupByQuery getInnerMostQuery(GroupByQuery query) + { + if (query.getDataSource() instanceof TableDataSource) { + return query; + } + QueryDataSource queryDataSource = (QueryDataSource) query.getDataSource(); + // Make sure that we pass down the context values to the inner most query + final Map newSubQueryContext = getSubqueryContext(query, queryDataSource); + GroupByQuery subquery = (GroupByQuery) queryDataSource.getQuery(); + subquery = subquery.withOverriddenContext(newSubQueryContext); + return getInnerMostQuery(subquery); + } + + private static Map getSubqueryContext(GroupByQuery query, QueryDataSource dataSource) + { + final Map subqueryContext = Maps.newTreeMap(); + // Inject outer query context keys into subquery if 
they don't already exist in the subquery context. + // Unlike withOverriddenContext's normal behavior, we want keys present in the subquery to win. + if (query.getContext() != null) { + for (Map.Entry entry : query.getContext().entrySet()) { + if (entry.getValue() != null) { + subqueryContext.put(entry.getKey(), entry.getValue()); + } + } + } + if (dataSource.getQuery().getContext() != null) { + subqueryContext.putAll(dataSource.getQuery().getContext()); + } + subqueryContext.put(GroupByQuery.CTX_KEY_SORT_BY_DIMS_FIRST, false); + return subqueryContext; + } + @Override public GroupByQueryMetrics makeMetrics(GroupByQuery query) { diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index 8be060028e9d..d53c1227b4c8 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -49,6 +49,7 @@ import org.joda.time.DateTime; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.File; import java.io.IOException; @@ -67,7 +68,8 @@ public static Sequence process( final GroupByQueryResource resource, final ObjectMapper spillMapper, final String processingTmpDir, - final int mergeBufferSize + final int mergeBufferSize, + final boolean wasQueryPushedDown ) { final GroupByQuery query = (GroupByQuery) queryParam; @@ -75,7 +77,8 @@ public static Sequence process( final AggregatorFactory[] aggregatorFactories = new AggregatorFactory[query.getAggregatorSpecs().size()]; for (int i = 0; i < query.getAggregatorSpecs().size(); i++) { - aggregatorFactories[i] = query.getAggregatorSpecs().get(i); + AggregatorFactory af = query.getAggregatorSpecs().get(i); + aggregatorFactories[i] = wasQueryPushedDown ? 
af.getCombiningFactory() : af; } @@ -99,7 +102,17 @@ public static Sequence process( ? BooleanValueMatcher.of(true) : filter.makeMatcher(columnSelectorFactory); - final FilteredSequence filteredSequence = new FilteredSequence<>( + final FilteredSequence filteredSequence = wasQueryPushedDown ? new FilteredSequence<>( + rows, + new Predicate() + { + @Override + public boolean apply(@Nullable Row row) + { + return true; // nothing to filter since it has already been done on the push down + } + } + ) : new FilteredSequence<>( rows, new Predicate() { @@ -143,7 +156,7 @@ public CloseableGrouperIterator make() Pair, Accumulator> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair( query, - true, + true, //TODO: verify this flag — it may need to be false when the query was pushed down rowSignature, querySpecificConfig, new Supplier() diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java index 4ef2f26681d9..eb596ef80c58 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/SpillingGrouper.java @@ -71,7 +71,7 @@ public class SpillingGrouper implements Grouper private final List files = Lists.newArrayList(); private final List dictionaryFiles = Lists.newArrayList(); - private final boolean sortHasNonGroupingFields; + private final boolean sortHasNonGroupingFields; //TODO: decide how to handle a sort order that includes fields outside the grouping columns when the query is pushed down private boolean spillingAllowed = false; diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index 5ad75004b760..985b4c2038b0 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ 
b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -71,7 +71,7 @@ import java.util.List; import java.util.Map; -public class GroupByStrategyV2 implements GroupByStrategy +public class GroupByStrategyV2 implements GroupByStrategy { public static final String CTX_KEY_FUDGE_TIMESTAMP = "fudgeTimestamp"; public static final String CTX_KEY_OUTERMOST = "groupByOutermost"; @@ -209,6 +209,10 @@ public Sequence mergeResults( final Map responseContext ) { + final boolean pushDownQuery = query.getPushedDownQuery() != null; + //TODO: confirm whether substituting the pushed-down query (queryToUse) is actually needed in this method + final GroupByQuery queryToUse = pushDownQuery ? query.getPushedDownQuery() : query; + // Merge streams using ResultMergeQueryRunner, then apply postaggregators, then apply limit (which may // involve materialization) final ResultMergeQueryRunner mergingQueryRunner = new ResultMergeQueryRunner(baseRunner) @@ -216,13 +220,13 @@ public Sequence mergeResults( @Override protected Ordering makeOrdering(Query queryParam) { - return ((GroupByQuery) queryParam).getRowOrdering(true); + return (queryToUse).getRowOrdering(true); } @Override protected BinaryFn createMergeFn(Query queryParam) { - return new GroupByBinaryFnV2((GroupByQuery) queryParam); + return new GroupByBinaryFnV2(queryToUse); } }; @@ -241,7 +245,8 @@ protected BinaryFn createMergeFn(Query queryParam) // Don't do "having" clause until the end of this method. null, query.getLimitSpec(), - query.getContext() + query.getContext(), + query.getPushedDownQuery() ).withOverriddenContext( ImmutableMap.of( "finalize", false, @@ -264,23 +269,22 @@ protected BinaryFn createMergeFn(Query queryParam) public Row apply(final Row row) { // Apply postAggregators and fudgeTimestamp if present and if this is the outermost mergeResults. 
- - if (!query.getContextBoolean(CTX_KEY_OUTERMOST, true)) { + if (!queryToUse.getContextBoolean(CTX_KEY_OUTERMOST, true)) { return row; } - if (query.getPostAggregatorSpecs().isEmpty() && fudgeTimestamp == null) { + if (queryToUse.getPostAggregatorSpecs().isEmpty() && fudgeTimestamp == null) { return row; } final Map newMap; - if (query.getPostAggregatorSpecs().isEmpty()) { + if (queryToUse.getPostAggregatorSpecs().isEmpty()) { newMap = ((MapBasedRow) row).getEvent(); } else { newMap = Maps.newLinkedHashMap(((MapBasedRow) row).getEvent()); - for (PostAggregator postAggregator : query.getPostAggregatorSpecs()) { + for (PostAggregator postAggregator : queryToUse.getPostAggregatorSpecs()) { newMap.put(postAggregator.getName(), postAggregator.compute(newMap)); } } @@ -290,9 +294,9 @@ public Row apply(final Row row) } ); - // Don't apply limit here for inner results, that will be pushed down to the BufferHashGrouper - if (query.getContextBoolean(CTX_KEY_OUTERMOST, true)) { - return query.postProcess(rowSequence); + // Don't apply limit here for inner results, that will be pushed down to the BufferGrouper + if (queryToUse.getContextBoolean(CTX_KEY_OUTERMOST, true)) { + return queryToUse.postProcess(rowSequence); } else { return rowSequence; } @@ -306,15 +310,18 @@ public Sequence processSubqueryResult( Sequence subqueryResult ) { + boolean wasQueryPushedDown = subquery.getPushedDownQuery() != null; final Sequence results = GroupByRowProcessor.process( query, subqueryResult, - GroupByQueryHelper.rowSignatureFor(subquery), + // Use the nestedQuery to obtain the row signature if it was pushed down to the historical nodes + GroupByQueryHelper.rowSignatureFor(wasQueryPushedDown ? 
subquery.getPushedDownQuery() : subquery), configSupplier.get(), resource, spillMapper, processingConfig.getTmpDir(), - processingConfig.intermediateComputeSizeBytes() + processingConfig.intermediateComputeSizeBytes(), + wasQueryPushedDown ); return mergeResults(new QueryRunner() { diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 5399a01de593..b85321102b14 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -63,6 +63,7 @@ import io.druid.query.ResourceLimitExceededException; import io.druid.query.Result; import io.druid.query.aggregation.MetricManipulatorFns; +import io.druid.query.groupby.GroupByQueryHelper; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBufferInputStream; import org.jboss.netty.handler.codec.http.HttpChunk; @@ -168,7 +169,7 @@ public int getNumOpenConnections() @Override public Sequence run(final QueryPlus queryPlus, final Map context) { - final Query query = queryPlus.getQuery(); + final Query query = GroupByQueryHelper.getPushedDownQueryIfPresent(queryPlus); QueryToolChest> toolChest = warehouse.getToolChest(query); boolean isBySegment = QueryContexts.isBySegment(query); diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 8c1208709116..46e6abf10695 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -43,6 +43,8 @@ import io.druid.query.Query; import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; +import io.druid.query.groupby.GroupByQuery; + import io.druid.server.metrics.QueryCountStatsProvider; import io.druid.server.security.Access; import io.druid.server.security.AuthConfig; diff --git 
a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 39da89d03fd3..a31ed2dba593 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -40,6 +40,7 @@ import io.druid.query.MetricsEmittingQueryRunner; import io.druid.query.NoopQueryRunner; import io.druid.query.Query; +import io.druid.query.QueryDataSource; import io.druid.query.QueryMetrics; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; @@ -50,6 +51,7 @@ import io.druid.query.ReportTimelineMissingSegmentQueryRunner; import io.druid.query.SegmentDescriptor; import io.druid.query.TableDataSource; +import io.druid.query.groupby.GroupByQuery; import io.druid.query.spec.SpecificSegmentQueryRunner; import io.druid.query.spec.SpecificSegmentSpec; import io.druid.segment.ReferenceCountingSegment; @@ -121,6 +123,9 @@ public QueryRunner getQueryRunnerForIntervals(Query query, Iterable