Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
Expand All @@ -57,6 +59,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
*/
Expand Down Expand Up @@ -106,6 +109,11 @@ public GroupByQuery(
Preconditions.checkNotNull(this.granularity, "Must specify a granularity");
Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs);

// Verify no duplicate names between dimensions, aggregators, and postAggregators.
// They will all end up in the same namespace in the returned Rows and we can't have them clobbering each other.
// We're not counting __time, even though that name is problematic. See: https://github.com/druid-io/druid/pull/3684
verifyOutputNames(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);

Function<Sequence<Row>, Sequence<Row>> postProcFn =
this.limitSpec.build(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);

Expand Down Expand Up @@ -436,6 +444,32 @@ public GroupByQuery withPostAggregatorSpecs(final List<PostAggregator> postAggre
);
}

/**
 * Checks that every output name used by the query is unique across dimensions, aggregators and
 * post-aggregators taken together.
 *
 * Names are collected into a single set in this order: dimension output names, then aggregator names,
 * then post-aggregator names. The first name that is already present in the set causes an {@code IAE}
 * with the message {@code Duplicate output name[<name>]}. The message does not say which kind of
 * element collided, because the clash may be between different kinds (for example a dimension and an
 * aggregator sharing one name).
 *
 * @param dimensions      dimension specs whose {@code getOutputName()} values are checked
 * @param aggregators     aggregator factories whose {@code getName()} values are checked
 * @param postAggregators post-aggregators whose {@code getName()} values are checked
 *
 * @throws IAE on the first duplicate name encountered
 */
private static void verifyOutputNames(
List<DimensionSpec> dimensions,
List<AggregatorFactory> aggregators,
List<PostAggregator> postAggregators
)
{
// One set shared by all three loops, so collisions are detected across kinds as well as within a kind.
final Set<String> outputNames = Sets.newHashSet();
for (DimensionSpec dimension : dimensions) {
// Set.add returns false when the name was already present.
if (!outputNames.add(dimension.getOutputName())) {
throw new IAE("Duplicate output name[%s]", dimension.getOutputName());
}
}

// Aggregator names are checked against the dimension names gathered above and against each other.
for (AggregatorFactory aggregator : aggregators) {
if (!outputNames.add(aggregator.getName())) {
throw new IAE("Duplicate output name[%s]", aggregator.getName());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change log message to "Duplicate Aggregator name[%s]"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that message would be misleading - the duplicate name might be a dimension and aggregator with the same name, not two aggregators.

}
}

// Post-aggregator names are checked against everything gathered so far.
for (PostAggregator postAggregator : postAggregators) {
if (!outputNames.add(postAggregator.getName())) {
throw new IAE("Duplicate output name[%s]", postAggregator.getName());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicate postagg name ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment to above

}
}
}

public static class Builder
{
private DataSource dataSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.druid.collections.StupidPool;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.MapBasedRow;
Expand All @@ -37,7 +36,6 @@
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.ResourceLimitExceededException;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
Expand All @@ -48,7 +46,6 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

public class GroupByQueryHelper
Expand Down Expand Up @@ -95,20 +92,10 @@ public String apply(DimensionSpec input)

final boolean sortResults = query.getContextValue(CTX_KEY_SORT_RESULTS, true);

// All groupBy dimensions are strings, for now, as long as they don't conflict with any non-dimensions.
// This should get cleaned up if/when https://github.com/druid-io/druid/pull/3686 makes name conflicts impossible.
final Set<String> otherNames = Sets.newHashSet();
for (AggregatorFactory agg : aggs) {
otherNames.add(agg.getName());
}
for (PostAggregator postAggregator : query.getPostAggregatorSpecs()) {
otherNames.add(postAggregator.getName());
}
// All groupBy dimensions are strings, for now.
final List<DimensionSchema> dimensionSchemas = Lists.newArrayList();
for (DimensionSpec dimension : query.getDimensions()) {
if (!otherNames.contains(dimension.getOutputName())) {
dimensionSchemas.add(new StringDimensionSchema(dimension.getOutputName()));
}
dimensionSchemas.add(new StringDimensionSchema(dimension.getOutputName()));
}

final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.druid.query.QueryWatcher;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryEngine;
Expand Down Expand Up @@ -135,8 +136,21 @@ public Sequence<Row> processSubqueryResult(
// multiple columns of the same name using different aggregator types and will fail. Here, we permit multiple
// aggregators of the same type referencing the same fieldName (and skip creating identical columns for the
// subsequent ones) and return an error if the aggregator types are different.
final Set<String> dimensionNames = Sets.newHashSet();
for (DimensionSpec dimension : subquery.getDimensions()) {
dimensionNames.add(dimension.getOutputName());
}
for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) {
for (final AggregatorFactory transferAgg : aggregatorFactory.getRequiredColumns()) {
if (dimensionNames.contains(transferAgg.getName())) {
// This transferAgg is already represented in the subquery's dimensions. Assume that the outer aggregator
// *probably* wants the dimension and just ignore it. This is a gross workaround for cases like having
// a cardinality aggregator in the outer query. It is necessary because what this block of code is trying to
// do is use aggregators to "transfer" values from the inner results to an incremental index, but aggregators
// can't transfer all kinds of values (strings are a common one). If you don't like it, use groupBy v2, which
// doesn't have this problem.
continue;
}
if (Iterables.any(aggs, new Predicate<AggregatorFactory>()
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.druid.granularity.PeriodGranularity;
import io.druid.granularity.QueryGranularities;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.MergeSequence;
import io.druid.java.util.common.guava.Sequence;
Expand Down Expand Up @@ -379,6 +378,27 @@ public void testGroupBy()
TestHelper.assertExpectedObjects(expectedResults, results, "");
}

/**
 * Building a groupBy query in which a dimension and an aggregator are both given the name "alias"
 * is expected to fail with an IllegalArgumentException reporting the duplicate output name.
 */
@Test
public void testGroupByWithOutputNameCollisions()
{
  // Register the expected failure before anything else, so the exception raised while building is matched.
  expectedException.expect(IllegalArgumentException.class);
  expectedException.expectMessage("Duplicate output name[alias]");

  // The dimension is created with "alias" as its second constructor argument, and the long-sum
  // aggregator below is also named "alias".
  final List<DimensionSpec> collidingDimensions =
      Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias"));

  // The built query is never used: the call chain is expected to throw before returning.
  GroupByQuery
      .builder()
      .setDataSource(QueryRunnerTestHelper.dataSource)
      .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
      .setDimensions(collidingDimensions)
      .setAggregatorSpecs(
          Arrays.asList(
              QueryRunnerTestHelper.rowsCount,
              new LongSumAggregatorFactory("alias", "index")
          )
      )
      .setGranularity(QueryRunnerTestHelper.dayGran)
      .build();
}

@Test
public void testGroupByNoAggregators()
{
Expand Down Expand Up @@ -4965,23 +4985,11 @@ public void testSubqueryWithOuterCardinalityAggregator()
.setGranularity(QueryRunnerTestHelper.allGran)
.build();

// v1 strategy would throw an exception for this because it calls AggregatorFactory.getRequiredColumns to get
// aggregator for all fields to build the inner query result incremental index. In this case, quality is a string
// field but getRequiredColumn() returned a Cardinality aggregator for it, which has type hyperUnique.
// The "quality" column is interpreted as a dimension because it appears in the dimension list of the
// MapBasedInputRows from the subquery, but the COMPLEX type from the agg overrides the actual string type.
// COMPLEX is not currently supported as a dimension type, so IAE is thrown. Even if it were, the actual string
// values in the "quality" column could not be interpreted as hyperUniques.
if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) {
expectedException.expect(IAE.class);
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
} else {
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01", "car", QueryRunnerTestHelper.UNIQUES_9)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01", "car", QueryRunnerTestHelper.UNIQUES_9)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}

@Test
Expand Down Expand Up @@ -5066,43 +5074,29 @@ public void testSubqueryWithOuterJavascriptAggregators()
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();

// v1 strategy would throw an exception for this because it calls AggregatorFactory.getRequiredColumns to get
// aggregator for all fields to build the inner query result incremental index. In this case, market is a string
// field but getRequiredColumn() returned a Javascript aggregator for it, which has type float.
// The "market" column is interpreted as a dimension because it appears in the dimension list of the
// MapBasedInputRows from the subquery, but the float type from the agg overrides the actual string type.
// Float is not currently supported as a dimension type, so IAE is thrown. Even if it were, a ParseException
// would occur because the "market" column really contains non-numeric values.
// Additionally, the V1 strategy always uses "combining" aggregator factories (meant for merging) on the subquery,
// which does not work for this particular javascript agg.
if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) {
expectedException.expect(IAE.class);
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
} else {
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "automotive", "js_agg", 139D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "business", "js_agg", 122D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "entertainment", "js_agg", 162D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "health", "js_agg", 124D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "mezzanine", "js_agg", 2893D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "news", "js_agg", 125D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "premium", "js_agg", 2923D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "technology", "js_agg", 82D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "travel", "js_agg", 123D),

GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "automotive", "js_agg", 151D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "business", "js_agg", 116D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "entertainment", "js_agg", 170D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "health", "js_agg", 117D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "mezzanine", "js_agg", 2470D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "news", "js_agg", 118D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "premium", "js_agg", 2528D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "technology", "js_agg", 101D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "travel", "js_agg", 130D)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "automotive", "js_agg", 139D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "business", "js_agg", 122D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "entertainment", "js_agg", 162D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "health", "js_agg", 124D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "mezzanine", "js_agg", 2893D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "news", "js_agg", 125D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "premium", "js_agg", 2923D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "technology", "js_agg", 82D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "quality", "travel", "js_agg", 123D),

GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "automotive", "js_agg", 151D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "business", "js_agg", 116D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "entertainment", "js_agg", 170D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "health", "js_agg", 117D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "mezzanine", "js_agg", 2470D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "news", "js_agg", 118D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "premium", "js_agg", 2528D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "technology", "js_agg", 101D),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "quality", "travel", "js_agg", 130D)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}

@Test
Expand Down