From 6bff8aa3c3a2f3a48c7c209d3e4bdd37f5ee6acd Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sat, 12 Nov 2016 14:33:06 -0800 Subject: [PATCH] GroupBy: Validation of output names, and a gross hack for v1 subqueries. v1 subqueries try to use aggregators to "transfer" values from the inner results to an incremental index, but aggregators can't transfer all kinds of values (strings are a common one). This is a workaround that selectively ignores what the outer aggregators ask for and instead assumes that we know best. These are in the same commit because the name validation changed the kinds of errors that were thrown by v1 subqueries. --- .../io/druid/query/groupby/GroupByQuery.java | 34 ++++++ .../query/groupby/GroupByQueryHelper.java | 17 +-- .../groupby/strategy/GroupByStrategyV1.java | 14 +++ .../query/groupby/GroupByQueryRunnerTest.java | 104 +++++++++--------- 4 files changed, 99 insertions(+), 70 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index bbc0ff6a4962..30e223d5321c 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -28,9 +28,11 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; import com.google.common.primitives.Longs; import io.druid.data.input.Row; import io.druid.granularity.QueryGranularity; +import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; @@ -57,6 +59,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Set; /** */ @@ -106,6 +109,11 @@ public GroupByQuery( Preconditions.checkNotNull(this.granularity, "Must specify a granularity"); Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs); + // Verify no duplicate names between dimensions, aggregators, and postAggregators. + // They will all end up in the same namespace in the returned Rows and we can't have them clobbering each other. + // We're not counting __time, even though that name is problematic. See: https://github.com/druid-io/druid/pull/3684 + verifyOutputNames(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs); + Function, Sequence> postProcFn = this.limitSpec.build(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs); @@ -436,6 +444,32 @@ public GroupByQuery withPostAggregatorSpecs(final List postAggre ); } + private static void verifyOutputNames( + List dimensions, + List aggregators, + List postAggregators + ) + { + final Set outputNames = Sets.newHashSet(); + for (DimensionSpec dimension : dimensions) { + if (!outputNames.add(dimension.getOutputName())) { + throw new IAE("Duplicate output name[%s]", dimension.getOutputName()); + } + } + + for (AggregatorFactory aggregator : aggregators) { + if (!outputNames.add(aggregator.getName())) { + throw new IAE("Duplicate output name[%s]", aggregator.getName()); + } + } + + for (PostAggregator postAggregator : postAggregators) { + if (!outputNames.add(postAggregator.getName())) { + throw new IAE("Duplicate output name[%s]", postAggregator.getName()); + } + } + } + public static class Builder { private DataSource dataSource; diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index 0869b39387ff..880150bd7d13 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -21,7 +21,6 @@ import com.google.common.base.Function; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.MapBasedRow; @@ -37,7 +36,6 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.query.ResourceLimitExceededException; import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.aggregation.PostAggregator; import io.druid.query.dimension.DimensionSpec; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; @@ -48,7 +46,6 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Queue; -import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; public class GroupByQueryHelper @@ -95,20 +92,10 @@ public String apply(DimensionSpec input) final boolean sortResults = query.getContextValue(CTX_KEY_SORT_RESULTS, true); - // All groupBy dimensions are strings, for now, as long as they don't conflict with any non-dimensions. - // This should get cleaned up if/when https://github.com/druid-io/druid/pull/3686 makes name conflicts impossible. - final Set otherNames = Sets.newHashSet(); - for (AggregatorFactory agg : aggs) { - otherNames.add(agg.getName()); - } - for (PostAggregator postAggregator : query.getPostAggregatorSpecs()) { - otherNames.add(postAggregator.getName()); - } + // All groupBy dimensions are strings, for now. final List dimensionSchemas = Lists.newArrayList(); for (DimensionSpec dimension : query.getDimensions()) { - if (!otherNames.contains(dimension.getOutputName())) { - dimensionSchemas.add(new StringDimensionSchema(dimension.getOutputName())); - } + dimensionSchemas.add(new StringDimensionSchema(dimension.getOutputName())); } final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder() diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java index cdf926b7cc5d..9660a8c2bb7a 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV1.java @@ -41,6 +41,7 @@ import io.druid.query.QueryWatcher; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; +import io.druid.query.dimension.DimensionSpec; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryEngine; @@ -135,8 +136,21 @@ public Sequence processSubqueryResult( // multiple columns of the same name using different aggregator types and will fail. Here, we permit multiple // aggregators of the same type referencing the same fieldName (and skip creating identical columns for the // subsequent ones) and return an error if the aggregator types are different. + final Set dimensionNames = Sets.newHashSet(); + for (DimensionSpec dimension : subquery.getDimensions()) { + dimensionNames.add(dimension.getOutputName()); + } for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) { for (final AggregatorFactory transferAgg : aggregatorFactory.getRequiredColumns()) { + if (dimensionNames.contains(transferAgg.getName())) { + // This transferAgg is already represented in the subquery's dimensions. Assume that the outer aggregator + // *probably* wants the dimension and just ignore it. This is a gross workaround for cases like having + // a cardinality aggregator in the outer query. It is necessary because what this block of code is trying to + // do is use aggregators to "transfer" values from the inner results to an incremental index, but aggregators + // can't transfer all kinds of values (strings are a common one). If you don't like it, use groupBy v2, which + // doesn't have this problem. + continue; + } if (Iterables.any(aggs, new Predicate() { @Override diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 6075f114bc16..0958cd3cb9cd 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -35,7 +35,6 @@ import io.druid.granularity.PeriodGranularity; import io.druid.granularity.QueryGranularities; import io.druid.jackson.DefaultObjectMapper; -import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.MergeSequence; import io.druid.java.util.common.guava.Sequence; @@ -379,6 +378,27 @@ public void testGroupBy() TestHelper.assertExpectedObjects(expectedResults, results, ""); } + @Test + public void testGroupByWithOutputNameCollisions() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Duplicate output name[alias]"); + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("alias", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + } + @Test public void testGroupByNoAggregators() { @@ -4965,23 +4985,11 @@ public void testSubqueryWithOuterCardinalityAggregator() .setGranularity(QueryRunnerTestHelper.allGran) .build(); - // v1 strategy would throw an exception for this because it calls AggregatorFactory.getRequiredColumns to get - // aggregator for all fields to build the inner query result incremental index. In this case, quality is a string - // field but getRequiredColumn() returned a Cardinality aggregator for it, which has type hyperUnique. - // The "quality" column is interpreted as a dimension because it appears in the dimension list of the - // MapBasedInputRows from the subquery, but the COMPLEX type from the agg overrides the actual string type. - // COMPLEX is not currently supported as a dimension type, so IAE is thrown. Even if it were, the actual string - // values in the "quality" column could not be interpreted as hyperUniques. - if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) { - expectedException.expect(IAE.class); - GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); - } else { - List expectedResults = Arrays.asList( - GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01", "car", QueryRunnerTestHelper.UNIQUES_9) - ); - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); - TestHelper.assertExpectedObjects(expectedResults, results, ""); - } + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01", "car", QueryRunnerTestHelper.UNIQUES_9) + ); + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); } @Test @@ -5066,43 +5074,29 @@ public void testSubqueryWithOuterJavascriptAggregators() .setGranularity(QueryRunnerTestHelper.dayGran) .build(); - // v1 strategy would throw an exception for this because it calls AggregatorFactory.getRequiredColumns to get - // aggregator for all fields to build the inner query result incremental index. In this case, market is a string - // field but getRequiredColumn() returned a Javascript aggregator for it, which has type float. - // The "market" column is interpreted as a dimension because it appears in the dimension list of the - // MapBasedInputRows from the subquery, but the float type from the agg overrides the actual string type. - // Float is not currently supported as a dimension type, so IAE is thrown. Even if it were, a ParseException - // would occur because the "market" column really contains non-numeric values. - // Additionally, the V1 strategy always uses "combining" aggregator factories (meant for merging) on the subquery, - // which does not work for this particular javascript agg. - if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) { - expectedException.expect(IAE.class); - GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); - } else { - List expectedResults = Arrays.asList( - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "automotive", "js_agg", 139D), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "business", "js_agg", 122D), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "entertainment", "js_agg", 162D), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "health", "js_agg", 124D), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "mezzanine", "js_agg", 2893D), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "news", "js_agg", 125D), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "premium", "js_agg", 2923D), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "technology", "js_agg", 82D), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "travel", "js_agg", 123D), - - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "automotive", "js_agg", 151D), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "business", "js_agg", 116D), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "entertainment", "js_agg", 170D), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "health", "js_agg", 117D), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "mezzanine", "js_agg", 2470D), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "news", "js_agg", 118D), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "premium", "js_agg", 2528D), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "technology", "js_agg", 101D), - GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "travel", "js_agg", 130D) - ); - Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); - TestHelper.assertExpectedObjects(expectedResults, results, ""); - } + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "automotive", "js_agg", 139D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "business", "js_agg", 122D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "entertainment", "js_agg", 162D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "health", "js_agg", 124D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "mezzanine", "js_agg", 2893D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "news", "js_agg", 125D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "premium", "js_agg", 2923D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "technology", "js_agg", 82D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "travel", "js_agg", 123D), + + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "automotive", "js_agg", 151D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "business", "js_agg", 116D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "entertainment", "js_agg", 170D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "health", "js_agg", 117D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "mezzanine", "js_agg", 2470D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "news", "js_agg", 118D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "premium", "js_agg", 2528D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "technology", "js_agg", 101D), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "travel", "js_agg", 130D) + ); + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); } @Test