diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java index 373f7a30cd4c..9d13f0598062 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java @@ -25,6 +25,7 @@ import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.primitives.Doubles; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import java.util.Comparator; @@ -95,6 +96,12 @@ public String getName() return name; } + @Override + public PostAggregator decorate(Map aggregators) + { + return this; + } + @JsonProperty public PostAggregator getField() { diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java index 40849ce37a40..1aa490b5eefb 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java @@ -25,6 +25,7 @@ import com.yahoo.sketches.Util; import io.druid.java.util.common.IAE; import io.druid.java.util.common.logger.Logger; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import java.util.Comparator; @@ -95,6 +96,12 @@ public String getName() return name; } + @Override + public PostAggregator decorate(Map aggregators) + { + return this; + } + @JsonProperty public String getFunc() { diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/BucketsPostAggregator.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/BucketsPostAggregator.java index f117f507f145..56372d357b6e 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/BucketsPostAggregator.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/BucketsPostAggregator.java @@ -25,6 +25,8 @@ import com.google.common.collect.Sets; import io.druid.java.util.common.IAE; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.PostAggregator; import java.util.Map; import java.util.Set; @@ -67,6 +69,12 @@ public Object compute(Map values) return ah.toHistogram(bucketSize, offset); } + @Override + public PostAggregator decorate(Map aggregators) + { + return this; + } + @JsonProperty public float getBucketSize() { diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/CustomBucketsPostAggregator.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/CustomBucketsPostAggregator.java index aa958601a2ad..f312369ff2ae 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/CustomBucketsPostAggregator.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/CustomBucketsPostAggregator.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.collect.Sets; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.PostAggregator; import java.util.Arrays; import java.util.Map; @@ -59,6 +61,12 @@ public Object compute(Map values) return ah.toHistogram(breaks); } + @Override + public PostAggregator decorate(Map aggregators) + { + return this; + } + @JsonProperty public float[] getBreaks() { diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/EqualBucketsPostAggregator.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/EqualBucketsPostAggregator.java index 60e9de60e81a..0afe3b7383a0 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/EqualBucketsPostAggregator.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/EqualBucketsPostAggregator.java @@ -25,6 +25,8 @@ import com.google.common.collect.Sets; import io.druid.java.util.common.IAE; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.PostAggregator; import java.util.Map; import java.util.Set; @@ -63,6 +65,12 @@ public Object compute(Map values) return ah.toHistogram(numBuckets); } + @Override + public PostAggregator decorate(Map aggregators) + { + return this; + } + @JsonProperty public int getNumBuckets() { diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/MaxPostAggregator.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/MaxPostAggregator.java index 120f4bb9f088..a0bdb6623469 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/MaxPostAggregator.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/MaxPostAggregator.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.collect.Sets; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.PostAggregator; import java.util.Comparator; import java.util.Map; @@ -71,6 +73,12 @@ public Object compute(Map values) return ah.getMax(); } + @Override + public PostAggregator decorate(Map aggregators) + { + return this; + } + @Override public String toString() { diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/MinPostAggregator.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/MinPostAggregator.java index fbf030c6a65a..35f090895030 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/MinPostAggregator.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/MinPostAggregator.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.collect.Sets; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.PostAggregator; import java.util.Comparator; import java.util.Map; @@ -71,6 +73,12 @@ public Object compute(Map values) return ah.getMin(); } + @Override + public PostAggregator decorate(Map aggregators) + { + return this; + } + @Override public String toString() { diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/QuantilePostAggregator.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/QuantilePostAggregator.java index 7eefe04ff59f..1406379b77de 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/QuantilePostAggregator.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/QuantilePostAggregator.java @@ -25,6 +25,8 @@ import com.google.common.collect.Sets; import io.druid.java.util.common.IAE; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.PostAggregator; import java.util.Comparator; import java.util.Map; @@ -80,6 +82,12 @@ public Object compute(Map values) return ah.getQuantiles(new float[]{this.getProbability()})[0]; } + @Override + public PostAggregator decorate(Map aggregators) + { + return this; + } + @JsonProperty public float getProbability() { diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/QuantilesPostAggregator.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/QuantilesPostAggregator.java index 7999a45f341a..1387d97aa0ef 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/QuantilesPostAggregator.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/QuantilesPostAggregator.java @@ -25,6 +25,8 @@ import com.google.common.collect.Sets; import io.druid.java.util.common.IAE; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.PostAggregator; import java.util.Arrays; import java.util.Comparator; @@ -75,6 +77,12 @@ public Object compute(Map values) return new Quantiles(this.getProbabilities(), ah.getQuantiles(this.getProbabilities()), ah.getMin(), ah.getMax()); } + @Override + public PostAggregator decorate(Map aggregators) + { + return this; + } + @JsonProperty public float[] getProbabilities() { diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/StandardDeviationPostAggregator.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/StandardDeviationPostAggregator.java index 2bdcb0da9c91..e4ea6838ecde 100644 --- a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/StandardDeviationPostAggregator.java +++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/StandardDeviationPostAggregator.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.post.ArithmeticPostAggregator; @@ -80,6 +81,12 @@ public String getName() return name; } + @Override + public PostAggregator decorate(Map aggregators) + { + return this; + } + @JsonProperty("fieldName") public String getFieldName() { diff --git a/processing/src/main/java/io/druid/jackson/AggregatorsModule.java b/processing/src/main/java/io/druid/jackson/AggregatorsModule.java index 57fc798a6f40..fad8ba7eb524 100644 --- a/processing/src/main/java/io/druid/jackson/AggregatorsModule.java +++ b/processing/src/main/java/io/druid/jackson/AggregatorsModule.java @@ -48,6 +48,7 @@ import io.druid.query.aggregation.post.DoubleGreatestPostAggregator; import io.druid.query.aggregation.post.DoubleLeastPostAggregator; import io.druid.query.aggregation.post.FieldAccessPostAggregator; +import io.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator; import io.druid.query.aggregation.post.JavaScriptPostAggregator; import io.druid.query.aggregation.post.ExpressionPostAggregator; import io.druid.query.aggregation.post.LongGreatestPostAggregator; @@ -98,6 +99,7 @@ public static interface AggregatorFactoryMixin @JsonSubTypes.Type(name = "expression", value = ExpressionPostAggregator.class), @JsonSubTypes.Type(name = "arithmetic", value = ArithmeticPostAggregator.class), @JsonSubTypes.Type(name = "fieldAccess", value = FieldAccessPostAggregator.class), + @JsonSubTypes.Type(name = "finalizingFieldAccess", value = FinalizingFieldAccessPostAggregator.class), @JsonSubTypes.Type(name = "constant", value = ConstantPostAggregator.class), @JsonSubTypes.Type(name = "javascript", value = JavaScriptPostAggregator.class), @JsonSubTypes.Type(name = "hyperUniqueCardinality", value = HyperUniqueFinalizingPostAggregator.class), diff --git a/processing/src/main/java/io/druid/query/Queries.java b/processing/src/main/java/io/druid/query/Queries.java index 26afcc1bfc8c..d62050965b42 100644 --- a/processing/src/main/java/io/druid/query/Queries.java +++ b/processing/src/main/java/io/druid/query/Queries.java @@ -20,33 +20,49 @@ package io.druid.query; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import java.util.List; +import java.util.Map; import java.util.Set; /** */ public class Queries { - public static void verifyAggregations( + public static List decoratePostAggregators(List postAggs, + Map aggFactories) + { + List decorated = Lists.newArrayListWithExpectedSize(postAggs.size()); + for (PostAggregator aggregator : postAggs) { + decorated.add(aggregator.decorate(aggFactories)); + } + return decorated; + } + + public static List prepareAggregations( List aggFactories, List postAggs ) { Preconditions.checkNotNull(aggFactories, "aggregations cannot be null"); - final Set aggNames = Sets.newHashSet(); + final Map aggsFactoryMap = Maps.newHashMap(); for (AggregatorFactory aggFactory : aggFactories) { - Preconditions.checkArgument(aggNames.add(aggFactory.getName()), "[%s] already defined", aggFactory.getName()); + Preconditions.checkArgument(!aggsFactoryMap.containsKey(aggFactory.getName()), + "[%s] already defined", aggFactory.getName()); + aggsFactoryMap.put(aggFactory.getName(), aggFactory); } if (postAggs != null && !postAggs.isEmpty()) { - final Set combinedAggNames = Sets.newHashSet(aggNames); + final Set combinedAggNames = Sets.newHashSet(aggsFactoryMap.keySet()); - for (PostAggregator postAgg : postAggs) { + List decorated = Lists.newArrayListWithExpectedSize(postAggs.size()); + for (final PostAggregator postAgg : postAggs) { final Set dependencies = postAgg.getDependentFields(); final Set missing = Sets.difference(dependencies, combinedAggNames); @@ -54,8 +70,14 @@ public static void verifyAggregations( missing.isEmpty(), "Missing fields [%s] for postAggregator [%s]", missing, postAgg.getName() ); - Preconditions.checkArgument(combinedAggNames.add(postAgg.getName()), "[%s] already defined", postAgg.getName()); + Preconditions.checkArgument(combinedAggNames.add(postAgg.getName()), + "[%s] already defined", postAgg.getName()); + + decorated.add(postAgg.decorate(aggsFactoryMap)); } + return decorated; } + + return postAggs; } } diff --git a/processing/src/main/java/io/druid/query/aggregation/PostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/PostAggregator.java index d3037e9ec2e9..e4af91bdd034 100644 --- a/processing/src/main/java/io/druid/query/aggregation/PostAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/PostAggregator.java @@ -35,4 +35,13 @@ public interface PostAggregator public Object compute(Map combinedAggregators); public String getName(); + + /** + * Returns a richer post aggregator which are built from the given aggregators with their names and some accessible + * environmental variables such as ones in the object scope. + * + * @param aggregators A map of aggregator factories with their names. + * + */ + public PostAggregator decorate(Map aggregators); } diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregator.java index e84bb5ab5fcf..0bbbae1f7c97 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregator.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import java.util.Comparator; @@ -84,6 +85,12 @@ public String getName() return name; } + @Override + public HyperUniqueFinalizingPostAggregator decorate(Map aggregators) + { + return this; + } + @JsonProperty("fieldName") public String getFieldName() { diff --git a/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java index 989118441cb5..25ab4456989d 100644 --- a/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/post/ArithmeticPostAggregator.java @@ -25,6 +25,8 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import io.druid.java.util.common.IAE; +import io.druid.query.Queries; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import java.util.Comparator; @@ -52,6 +54,7 @@ public int compare(Object o, Object o1) private final Ops op; private final Comparator comparator; private final String ordering; + private Map aggFactoryMap; public ArithmeticPostAggregator( String name, @@ -123,6 +126,12 @@ public String getName() return name; } + @Override + public ArithmeticPostAggregator decorate(Map aggregators) + { + return new ArithmeticPostAggregator(name, fnName, Queries.decoratePostAggregators(fields, aggregators), ordering); + } + @JsonProperty("fn") public String getFnName() { diff --git a/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java index d0a482c9bce7..1f6323a7b966 100644 --- a/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/post/ConstantPostAggregator.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import java.util.Comparator; @@ -79,6 +80,12 @@ public String getName() return name; } + @Override + public ConstantPostAggregator decorate(Map aggregators) + { + return this; + } + @JsonProperty("value") public Number getConstantValue() { diff --git a/processing/src/main/java/io/druid/query/aggregation/post/DoubleGreatestPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/DoubleGreatestPostAggregator.java index 8a238847d163..7d4d0cb1e3c4 100644 --- a/processing/src/main/java/io/druid/query/aggregation/post/DoubleGreatestPostAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/post/DoubleGreatestPostAggregator.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; +import io.druid.query.Queries; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import java.util.Comparator; @@ -98,6 +100,12 @@ public String getName() return name; } + @Override + public DoubleGreatestPostAggregator decorate(Map aggregators) + { + return new DoubleGreatestPostAggregator(name, Queries.decoratePostAggregators(fields, aggregators)); + } + @JsonProperty public List getFields() { diff --git a/processing/src/main/java/io/druid/query/aggregation/post/DoubleLeastPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/DoubleLeastPostAggregator.java index 88e4f765fed0..a49cc643135d 100644 --- a/processing/src/main/java/io/druid/query/aggregation/post/DoubleLeastPostAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/post/DoubleLeastPostAggregator.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; +import io.druid.query.Queries; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import java.util.Comparator; @@ -98,6 +100,12 @@ public String getName() return name; } + @Override + public DoubleLeastPostAggregator decorate(Map aggregators) + { + return new DoubleLeastPostAggregator(name, Queries.decoratePostAggregators(fields, aggregators)); + } + @JsonProperty public List getFields() { diff --git a/processing/src/main/java/io/druid/query/aggregation/post/ExpressionPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/ExpressionPostAggregator.java index b917869cd525..5fd394fd7694 100644 --- a/processing/src/main/java/io/druid/query/aggregation/post/ExpressionPostAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/post/ExpressionPostAggregator.java @@ -25,6 +25,7 @@ import com.google.common.collect.Sets; import io.druid.math.expr.Expr; import io.druid.math.expr.Parser; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import java.util.Comparator; @@ -105,6 +106,12 @@ public String getName() return name; } + @Override + public ExpressionPostAggregator decorate(Map aggregators) + { + return this; + } + @JsonProperty("expression") public String getExpression() { diff --git a/processing/src/main/java/io/druid/query/aggregation/post/FieldAccessPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/FieldAccessPostAggregator.java index 64b369366498..138b9b82ac7e 100644 --- a/processing/src/main/java/io/druid/query/aggregation/post/FieldAccessPostAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/post/FieldAccessPostAggregator.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Sets; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import java.util.Comparator; @@ -70,6 +71,12 @@ public String getName() return name; } + @Override + public FieldAccessPostAggregator decorate(Map aggregators) + { + return this; + } + @JsonProperty public String getFieldName() { diff --git a/processing/src/main/java/io/druid/query/aggregation/post/FinalizingFieldAccessPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/FinalizingFieldAccessPostAggregator.java new file mode 100644 index 000000000000..8c2fcd63f140 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/post/FinalizingFieldAccessPostAggregator.java @@ -0,0 +1,156 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.post; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.PostAggregator; + +import java.util.Comparator; +import java.util.Map; +import java.util.Set; + +public class FinalizingFieldAccessPostAggregator implements PostAggregator +{ + private final String name; + private final String fieldName; + + @JsonCreator + public FinalizingFieldAccessPostAggregator( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName + ) + { + this.name = name; + this.fieldName = fieldName; + } + + @Override + public Set getDependentFields() + { + return Sets.newHashSet(fieldName); + } + + @Override + public Comparator getComparator() + { + throw new UnsupportedOperationException(); + } + + @Override + public Object compute(Map combinedAggregators) + { + throw new UnsupportedOperationException("No decorated"); + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @Override + public FinalizingFieldAccessPostAggregator decorate(final Map aggregators) + { + return new FinalizingFieldAccessPostAggregator(name, fieldName) { + + @Override + public Comparator getComparator() + { + if (aggregators != null && aggregators.containsKey(fieldName)) { + return aggregators.get(fieldName).getComparator(); + } else { + return Ordering.natural().nullsFirst(); + } + } + + @Override + public Object compute(Map combinedAggregators) + { + if (aggregators != null && aggregators.containsKey(fieldName)) { + return aggregators.get(fieldName).finalizeComputation( + combinedAggregators.get(fieldName) + ); + } else { + return combinedAggregators.get(fieldName); + } + } + }; + } + + @JsonProperty + public String getFieldName() + { + return fieldName; + } + + @Override + public String toString() + { + return "FinalizingFieldAccessPostAggregator{" + + "name='" + name + '\'' + + ", fieldName='" + fieldName + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FinalizingFieldAccessPostAggregator that = (FinalizingFieldAccessPostAggregator)o; + + if (fieldName != null ? !fieldName.equals(that.fieldName) : that.fieldName != null) { + return false; + } + if (name != null ? !name.equals(that.name) : that.name != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = name != null ? name.hashCode() : 0; + result = 31 * result + (fieldName != null ? fieldName.hashCode() : 0); + return result; + } + + @VisibleForTesting + static FinalizingFieldAccessPostAggregator buildDecorated(String name, + String fieldName, + Map aggregators) + { + FinalizingFieldAccessPostAggregator ret = new FinalizingFieldAccessPostAggregator(name, fieldName); + return ret.decorate(aggregators); + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/post/JavaScriptPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/JavaScriptPostAggregator.java index 866bb835b653..e775e527cd8d 100644 --- a/processing/src/main/java/io/druid/query/aggregation/post/JavaScriptPostAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/post/JavaScriptPostAggregator.java @@ -26,6 +26,7 @@ import com.google.common.collect.Sets; import io.druid.java.util.common.ISE; import io.druid.js.JavaScriptConfig; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import org.mozilla.javascript.Context; import org.mozilla.javascript.ContextFactory; @@ -143,6 +144,12 @@ public String getName() return name; } + @Override + public JavaScriptPostAggregator decorate(Map aggregators) + { + return this; + } + @JsonProperty public List getFieldNames() { diff --git a/processing/src/main/java/io/druid/query/aggregation/post/LongGreatestPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/LongGreatestPostAggregator.java index 8f9e5cdf31be..b906911c4feb 100644 --- a/processing/src/main/java/io/druid/query/aggregation/post/LongGreatestPostAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/post/LongGreatestPostAggregator.java @@ -24,6 +24,8 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.google.common.primitives.Longs; +import io.druid.query.Queries; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import java.util.Comparator; @@ -99,6 +101,12 @@ public String getName() return name; } + @Override + public LongGreatestPostAggregator decorate(Map aggregators) + { + return new LongGreatestPostAggregator(name, Queries.decoratePostAggregators(fields, aggregators)); + } + @JsonProperty public List getFields() { diff --git a/processing/src/main/java/io/druid/query/aggregation/post/LongLeastPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/post/LongLeastPostAggregator.java index 80f4325857cd..30932a706daa 100644 --- a/processing/src/main/java/io/druid/query/aggregation/post/LongLeastPostAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/post/LongLeastPostAggregator.java @@ -24,6 +24,8 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.google.common.primitives.Longs; +import io.druid.query.Queries; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import java.util.Comparator; @@ -99,6 +101,12 @@ public String getName() return name; } + @Override + public LongLeastPostAggregator decorate(Map aggregators) + { + return new LongLeastPostAggregator(name, Queries.decoratePostAggregators(fields, aggregators)); + } + @JsonProperty public List getFields() { diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index c5a67d0a1781..742f578fc919 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -120,12 +120,15 @@ public GroupByQuery( Preconditions.checkArgument(spec != null, "dimensions has null DimensionSpec"); } this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.of() : aggregatorSpecs; - this.postAggregatorSpecs = postAggregatorSpecs == null ? ImmutableList.of() : postAggregatorSpecs; + this.postAggregatorSpecs = Queries.prepareAggregations( + this.aggregatorSpecs, + postAggregatorSpecs == null ? ImmutableList.of() : postAggregatorSpecs + ); this.havingSpec = havingSpec; this.limitSpec = (limitSpec == null) ? new NoopLimitSpec() : limitSpec; Preconditions.checkNotNull(this.granularity, "Must specify a granularity"); - Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs); + // Verify no duplicate names between dimensions, aggregators, and postAggregators. // They will all end up in the same namespace in the returned Rows and we can't have them clobbering each other. diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java index 964bf38fb9d1..dbf29dcfaf49 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java @@ -63,9 +63,11 @@ public TimeseriesQuery( this.dimFilter = dimFilter; this.granularity = granularity; this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.of() : aggregatorSpecs; - this.postAggregatorSpecs = postAggregatorSpecs == null ? ImmutableList.of() : postAggregatorSpecs; - - Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs); + this.postAggregatorSpecs = Queries.prepareAggregations(this.aggregatorSpecs, + postAggregatorSpecs == null + ? ImmutableList.of() + : postAggregatorSpecs + ); } @Override @@ -176,15 +178,15 @@ public TimeseriesQuery withDimFilter(DimFilter dimFilter) public String toString() { return "TimeseriesQuery{" + - "dataSource='" + getDataSource() + '\'' + - ", querySegmentSpec=" + getQuerySegmentSpec() + - ", descending=" + isDescending() + - ", dimFilter=" + dimFilter + - ", granularity='" + granularity + '\'' + - ", aggregatorSpecs=" + aggregatorSpecs + - ", postAggregatorSpecs=" + postAggregatorSpecs + - ", context=" + getContext() + - '}'; + "dataSource='" + getDataSource() + '\'' + + ", querySegmentSpec=" + getQuerySegmentSpec() + + ", descending=" + isDescending() + + ", dimFilter=" + dimFilter + + ", granularity='" + granularity + '\'' + + ", aggregatorSpecs=" + aggregatorSpecs + + ", postAggregatorSpecs=" + postAggregatorSpecs + + ", context=" + getContext() + + '}'; } @Override @@ -211,7 +213,9 @@ public boolean equals(Object o) if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) { return false; } - if (postAggregatorSpecs != null ? !postAggregatorSpecs.equals(that.postAggregatorSpecs) : that.postAggregatorSpecs != null) { + if (postAggregatorSpecs != null + ? !postAggregatorSpecs.equals(that.postAggregatorSpecs) + : that.postAggregatorSpecs != null) { return false; } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQuery.java b/processing/src/main/java/io/druid/query/topn/TopNQuery.java index 2221e65d4863..9a75517852c5 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQuery.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQuery.java @@ -74,15 +74,17 @@ public TopNQuery( this.dimFilter = dimFilter; this.granularity = granularity; this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.of() : aggregatorSpecs; - this.postAggregatorSpecs = postAggregatorSpecs == null ? ImmutableList.of() : postAggregatorSpecs; + this.postAggregatorSpecs = Queries.prepareAggregations(this.aggregatorSpecs, + postAggregatorSpecs == null + ? ImmutableList.of() + : postAggregatorSpecs + ); Preconditions.checkNotNull(dimensionSpec, "dimensionSpec can't be null"); Preconditions.checkNotNull(topNMetricSpec, "must specify a metric"); Preconditions.checkArgument(threshold != 0, "Threshold cannot be equal to 0."); topNMetricSpec.verifyPreconditions(this.aggregatorSpecs, this.postAggregatorSpecs); - - Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs); } @Override @@ -316,7 +318,9 @@ public boolean equals(Object o) if (threshold != topNQuery.threshold) { return false; } - if (aggregatorSpecs != null ? !aggregatorSpecs.equals(topNQuery.aggregatorSpecs) : topNQuery.aggregatorSpecs != null) { + if (aggregatorSpecs != null + ? !aggregatorSpecs.equals(topNQuery.aggregatorSpecs) + : topNQuery.aggregatorSpecs != null) { return false; } if (dimFilter != null ? !dimFilter.equals(topNQuery.dimFilter) : topNQuery.dimFilter != null) { @@ -328,7 +332,9 @@ public boolean equals(Object o) if (granularity != null ? !granularity.equals(topNQuery.granularity) : topNQuery.granularity != null) { return false; } - if (postAggregatorSpecs != null ? !postAggregatorSpecs.equals(topNQuery.postAggregatorSpecs) : topNQuery.postAggregatorSpecs != null) { + if (postAggregatorSpecs != null + ? !postAggregatorSpecs.equals(topNQuery.postAggregatorSpecs) + : topNQuery.postAggregatorSpecs != null) { return false; } if (topNMetricSpec != null ? !topNMetricSpec.equals(topNQuery.topNMetricSpec) : topNQuery.topNMetricSpec != null) { diff --git a/processing/src/test/java/io/druid/query/QueriesTest.java b/processing/src/test/java/io/druid/query/QueriesTest.java index 1cc8aceaeef8..ba4ddb516091 100644 --- a/processing/src/test/java/io/druid/query/QueriesTest.java +++ b/processing/src/test/java/io/druid/query/QueriesTest.java @@ -59,7 +59,7 @@ public void testVerifyAggregations() throws Exception boolean exceptionOccured = false; try { - Queries.verifyAggregations(aggFactories, postAggs); + Queries.prepareAggregations(aggFactories, postAggs); } catch (IllegalArgumentException e) { exceptionOccured = true; @@ -91,7 +91,7 @@ public void testVerifyAggregationsMissingVal() throws Exception boolean exceptionOccured = false; try { - Queries.verifyAggregations(aggFactories, postAggs); + Queries.prepareAggregations(aggFactories, postAggs); } catch (IllegalArgumentException e) { exceptionOccured = true; @@ -145,7 +145,7 @@ public void testVerifyAggregationsMultiLevel() throws Exception boolean exceptionOccured = false; try { - Queries.verifyAggregations(aggFactories, postAggs); + Queries.prepareAggregations(aggFactories, postAggs); } catch (IllegalArgumentException e) { exceptionOccured = true; @@ -199,7 +199,7 @@ public void testVerifyAggregationsMultiLevelMissingVal() throws Exception boolean exceptionOccured = false; try { - Queries.verifyAggregations(aggFactories, postAggs); + Queries.prepareAggregations(aggFactories, postAggs); } catch (IllegalArgumentException e) { exceptionOccured = true; diff --git a/processing/src/test/java/io/druid/query/aggregation/post/FinalizingFieldAccessPostAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/post/FinalizingFieldAccessPostAggregatorTest.java new file mode 100644 index 000000000000..d32170d6a338 --- /dev/null +++ b/processing/src/test/java/io/druid/query/aggregation/post/FinalizingFieldAccessPostAggregatorTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.post; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; +import io.druid.data.input.MapBasedRow; +import io.druid.granularity.QueryGranularities; +import io.druid.jackson.AggregatorsModule; +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Sequences; +import io.druid.query.aggregation.AggregationTestHelper; +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregator; +import io.druid.query.aggregation.PostAggregator; +import io.druid.query.groupby.GroupByQueryRunnerTest; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class FinalizingFieldAccessPostAggregatorTest +{ + @Rule + public final TemporaryFolder tempFoler = new TemporaryFolder(); + + @Test(expected = UnsupportedOperationException.class) + public void testComputeWithoutFinalizing() + { + String aggName = "rows"; + Aggregator agg = new CountAggregator(); + agg.aggregate(); + agg.aggregate(); + agg.aggregate(); + + Map metricValues = Maps.newHashMap(); + metricValues.put(aggName, agg.get()); + + FinalizingFieldAccessPostAggregator postAgg = new FinalizingFieldAccessPostAggregator("final_rows", aggName); + Assert.assertEquals(new Long(3L), postAgg.compute(metricValues)); + } + + @Test + public void testComputedWithFinalizing() + { + String aggName = "biily"; + AggregatorFactory aggFactory = EasyMock.createMock(AggregatorFactory.class); + EasyMock.expect(aggFactory.finalizeComputation("test")) + .andReturn(new Long(3L)) + .times(1); + EasyMock.replay(aggFactory); + + FinalizingFieldAccessPostAggregator postAgg = FinalizingFieldAccessPostAggregator.buildDecorated( + "final_billy", aggName, ImmutableMap.of(aggName, aggFactory) + ); + + Map metricValues = Maps.newHashMap(); + metricValues.put(aggName, "test"); + + Assert.assertEquals(new Long(3L), postAgg.compute(metricValues)); + EasyMock.verify(aggFactory); + } + + @Test + public void testComputedInArithmeticPostAggregator() + { + String aggName = "billy"; + AggregatorFactory aggFactory = EasyMock.createMock(AggregatorFactory.class); + EasyMock.expect(aggFactory.finalizeComputation("test")) + .andReturn(new Long(3L)) + .times(1); + EasyMock.replay(aggFactory); + + FinalizingFieldAccessPostAggregator postAgg = FinalizingFieldAccessPostAggregator.buildDecorated( + "final_billy", aggName, ImmutableMap.of(aggName, aggFactory) + ); + + Map metricValues = Maps.newHashMap(); + metricValues.put(aggName, "test"); + + List postAggsList = Lists.newArrayList( + new ConstantPostAggregator("roku", 6), postAgg); + + ArithmeticPostAggregator arithmeticPostAggregator = new ArithmeticPostAggregator("add", "+", postAggsList); + + Assert.assertEquals(new Double(9.0f), arithmeticPostAggregator.compute(metricValues)); + EasyMock.verify(); + } + + @Test + public void testComparatorsWithFinalizing() throws Exception + { + String aggName = "billy"; + AggregatorFactory aggFactory = EasyMock.createMock(AggregatorFactory.class); + EasyMock.expect(aggFactory.finalizeComputation("test_val1")) + .andReturn(new Long(10L)) + .times(1); + EasyMock.expect(aggFactory.finalizeComputation("test_val2")) + .andReturn(new Long(21)) + .times(1); + EasyMock.expect(aggFactory.finalizeComputation("test_val3")) + .andReturn(new Long(3)) + .times(1); + EasyMock.expect(aggFactory.finalizeComputation("test_val4")) + .andReturn(null) + .times(1); + EasyMock.expect(aggFactory.getComparator()) + .andReturn(Ordering.natural().nullsLast()) + .times(1); + EasyMock.replay(aggFactory); + + FinalizingFieldAccessPostAggregator postAgg = FinalizingFieldAccessPostAggregator.buildDecorated( + "final_billy", aggName, ImmutableMap.of(aggName, aggFactory) + ); + + List computedValues = Lists.newArrayList(); + computedValues.add(postAgg.compute(ImmutableMap.of(aggName, (Object)"test_val1"))); + computedValues.add(postAgg.compute(ImmutableMap.of(aggName, (Object)"test_val2"))); + computedValues.add(postAgg.compute(ImmutableMap.of(aggName, (Object)"test_val3"))); + computedValues.add(postAgg.compute(ImmutableMap.of(aggName, (Object)"test_val4"))); + + Collections.sort(computedValues, postAgg.getComparator()); + Assert.assertArrayEquals(new Object[]{3L, 10L, 21L, null}, computedValues.toArray(new Object[]{})); + EasyMock.verify(); + } + + @Test + public void testComparatorsWithFinalizingAndComparatorNull() throws Exception + { + String aggName = "billy"; + AggregatorFactory aggFactory = EasyMock.createMock(AggregatorFactory.class); + EasyMock.expect(aggFactory.getComparator()) + .andReturn(null) + .times(1); + EasyMock.replay(aggFactory); + + FinalizingFieldAccessPostAggregator postAgg = FinalizingFieldAccessPostAggregator.buildDecorated( + "final_billy", "joe", ImmutableMap.of(aggName, aggFactory)); + + List computedValues = Lists.newArrayList(); + Map forNull = Maps.newHashMap(); + forNull.put("joe", (Object)null); // guava does not allow the value to be null. + computedValues.add(postAgg.compute(ImmutableMap.of("joe", (Object)"test_val1"))); + computedValues.add(postAgg.compute(ImmutableMap.of("joe", (Object)"test_val2"))); + computedValues.add(postAgg.compute(forNull)); + computedValues.add(postAgg.compute(ImmutableMap.of("joe", (Object)"test_val4"))); + Collections.sort(computedValues, postAgg.getComparator()); + + Assert.assertArrayEquals(new Object[]{null, "test_val1", "test_val2", "test_val4"}, computedValues.toArray(new Object[]{})); + + EasyMock.verify(); + } + + @Test + public void testIngestAndQueryWithArithmeticPostAggregator() throws Exception + { + AggregationTestHelper helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( + Lists.newArrayList(new AggregatorsModule()), + GroupByQueryRunnerTest.testConfigs().get(0), + tempFoler + ); + + String metricSpec = "[{\"type\": \"hyperUnique\", \"name\": \"hll_market\", \"fieldName\": \"market\"}," + + "{\"type\": \"hyperUnique\", \"name\": \"hll_quality\", \"fieldName\": \"quality\"}]"; + + String parseSpec = "{" + + "\"type\" : \"string\"," + + "\"parseSpec\" : {" + + " \"format\" : \"tsv\"," + + " \"timestampSpec\" : {" + + " \"column\" : \"timestamp\"," + + " \"format\" : \"auto\"" + + "}," + + " \"dimensionsSpec\" : {" + + " \"dimensions\": []," + + " \"dimensionExclusions\" : []," + + " \"spatialDimensions\" : []" + + " }," + + " \"columns\": [\"timestamp\", \"market\", \"quality\", \"placement\", \"placementish\", \"index\"]" + + " }" + + "}"; + + String query = "{" + + "\"queryType\": \"groupBy\"," + + "\"dataSource\": \"test_datasource\"," + + "\"granularity\": \"ALL\"," + + "\"dimensions\": []," + + "\"aggregations\": [" + + " { \"type\": \"hyperUnique\", \"name\": \"hll_market\", \"fieldName\": \"hll_market\" }," + + " { \"type\": \"hyperUnique\", \"name\": \"hll_quality\", \"fieldName\": \"hll_quality\" }" + + "]," + + "\"postAggregations\": [" + + " { \"type\": \"arithmetic\", \"name\": \"uniq_add\", \"fn\": \"+\", \"fields\":[" + + " { \"type\": \"finalizingFieldAccess\", \"name\": \"uniq_market\", \"fieldName\": \"hll_market\" }," + + " { \"type\": \"finalizingFieldAccess\", \"name\": \"uniq_quality\", \"fieldName\": \"hll_quality\" }]" + + " }" + + "]," + + "\"intervals\": [ \"1970/2050\" ]" + + "}"; + + Sequence seq = helper.createIndexAndRunQueryOnSegment( + new File(this.getClass().getClassLoader().getResource("druid.sample.tsv").getFile()), + parseSpec, + metricSpec, + 0, + QueryGranularities.NONE, + 50000, + query + ); + + MapBasedRow row = (MapBasedRow) Sequences.toList(seq, Lists.newArrayList()).get(0); + Assert.assertEquals(3.0, row.getFloatMetric("hll_market"), 0.1); + Assert.assertEquals(9.0, row.getFloatMetric("hll_quality"), 0.1); + Assert.assertEquals(12.0, row.getFloatMetric("uniq_add"), 0.1); + } +}