diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index f017cad0d2a8..eed69f34384b 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -68,6 +68,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -144,6 +145,7 @@ private Function, Sequence> makePostProcessingFn() postProcessingFn, (Sequence input) -> { havingSpec.setRowSignature(GroupByQueryHelper.rowSignatureFor(GroupByQuery.this)); + havingSpec.setAggregators(getAggregatorsMap(aggregatorSpecs)); return Sequences.filter(input, havingSpec::eval); } ); @@ -696,6 +698,13 @@ private static void verifyOutputNames( } } + private static Map getAggregatorsMap(List aggregatorSpecs) + { + Map map = new HashMap<>(aggregatorSpecs.size()); + aggregatorSpecs.stream().forEach(v -> map.put(v.getName(), v)); + return map; + } + public static class Builder { private DataSource dataSource; diff --git a/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java index 4a5f962bcdca..2436c91613f7 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.column.ValueType; import java.util.List; @@ -55,6 +56,14 @@ public void setRowSignature(Map rowSignature) } } + @Override + public void setAggregators(Map aggregators) + { + for (HavingSpec havingSpec : havingSpecs) { + havingSpec.setAggregators(aggregators); + } + } + @Override public boolean eval(Row row) { diff --git a/processing/src/main/java/io/druid/query/groupby/having/BaseHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/BaseHavingSpec.java index 5700e89dea25..be28eec2e4c5 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/BaseHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/BaseHavingSpec.java @@ -19,6 +19,7 @@ package io.druid.query.groupby.having; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.column.ValueType; import java.util.Map; @@ -30,4 +31,10 @@ public void setRowSignature(Map rowSignature) { // Do nothing. } + + @Override + public void setAggregators(Map aggregators) + { + + } } diff --git a/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java index 89698285b802..2e382c4dd9cd 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java @@ -22,6 +22,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; + +import java.util.Map; /** * The "=" operator in a "having" clause. This is similar to SQL's "having aggregation = value", @@ -32,6 +35,8 @@ public class EqualToHavingSpec extends BaseHavingSpec private final String aggregationName; private final Number value; + private volatile Map aggregators; + @JsonCreator public EqualToHavingSpec( @JsonProperty("aggregation") String aggName, @@ -54,10 +59,16 @@ public String getAggregationName() return aggregationName; } + @Override + public void setAggregators(Map aggregators) + { + this.aggregators = aggregators; + } + @Override public boolean eval(Row row) { - return HavingSpecMetricComparator.compare(row, aggregationName, value) == 0; + return HavingSpecMetricComparator.compare(row, aggregationName, value, aggregators) == 0; } /** diff --git a/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java index d75333b74fae..1c7ec437eecd 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java @@ -22,6 +22,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; + +import java.util.Map; /** * The ">" operator in a "having" clause. This is similar to SQL's "having aggregation > value", @@ -32,6 +35,8 @@ public class GreaterThanHavingSpec extends BaseHavingSpec private final String aggregationName; private final Number value; + private volatile Map aggregators; + @JsonCreator public GreaterThanHavingSpec( @JsonProperty("aggregation") String aggName, @@ -54,10 +59,16 @@ public Number getValue() return value; } + @Override + public void setAggregators(Map aggregators) + { + this.aggregators = aggregators; + } + @Override public boolean eval(Row row) { - return HavingSpecMetricComparator.compare(row, aggregationName, value) > 0; + return HavingSpecMetricComparator.compare(row, aggregationName, value, aggregators) > 0; } /** diff --git a/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java index d3bc31c9989b..a2431c65f600 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.column.ValueType; import java.util.Map; @@ -58,6 +59,8 @@ public interface HavingSpec */ void setRowSignature(Map rowSignature); + void setAggregators(Map aggregators); + /** * Evaluates if a given row satisfies the having spec. * diff --git a/processing/src/main/java/io/druid/query/groupby/having/HavingSpecMetricComparator.java b/processing/src/main/java/io/druid/query/groupby/having/HavingSpecMetricComparator.java index cae152a1d091..ff08202bf7cb 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/HavingSpecMetricComparator.java +++ b/processing/src/main/java/io/druid/query/groupby/having/HavingSpecMetricComparator.java @@ -19,9 +19,15 @@ package io.druid.query.groupby.having; +import com.google.common.primitives.Doubles; +import com.google.common.primitives.Floats; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.java.util.common.ISE; +import java.util.Map; import java.util.regex.Pattern; /** @@ -30,16 +36,28 @@ class HavingSpecMetricComparator { static final Pattern LONG_PAT = Pattern.compile("[-|+]?\\d+"); - static int compare(Row row, String aggregationName, Number value) + static int compare(Row row, String aggregationName, Number value, Map aggregators) { + Object metricValueObj = row.getRaw(aggregationName); + if (metricValueObj != null) { + if (aggregators != null && aggregators.containsKey(aggregationName)) { + metricValueObj = aggregators.get(aggregationName).finalizeComputation(metricValueObj); + } + if (metricValueObj instanceof Long) { - return Long.compare((Long) metricValueObj, value.longValue()); + long l = ((Long) metricValueObj).longValue(); + return Longs.compare(l, value.longValue()); } else if (metricValueObj instanceof Float) { - return Float.compare((Float) metricValueObj, value.floatValue()); + float l = ((Float) metricValueObj).floatValue(); + return Floats.compare(l, value.floatValue()); } else if (metricValueObj instanceof Double) { - return Double.compare((Double) metricValueObj, value.doubleValue()); + double l = ((Double) metricValueObj).longValue(); + return Doubles.compare(l, value.doubleValue()); + } else if (metricValueObj instanceof Integer) { + int l = ((Integer) metricValueObj).intValue(); + return Ints.compare(l, value.intValue()); } else if (metricValueObj instanceof String) { String metricValueStr = (String) metricValueObj; if (LONG_PAT.matcher(metricValueStr).matches()) { diff --git a/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java index a60d5adc123e..37d6268b866d 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java @@ -21,6 +21,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; + +import java.util.Map; /** * The "<" operator in a "having" clause. This is similar to SQL's "having aggregation < value", @@ -31,6 +34,8 @@ public class LessThanHavingSpec extends BaseHavingSpec private final String aggregationName; private final Number value; + private volatile Map aggregators; + public LessThanHavingSpec( @JsonProperty("aggregation") String aggName, @JsonProperty("value") Number value @@ -52,10 +57,16 @@ public Number getValue() return value; } + @Override + public void setAggregators(Map aggregators) + { + this.aggregators = aggregators; + } + @Override public boolean eval(Row row) { - return HavingSpecMetricComparator.compare(row, aggregationName, value) < 0; + return HavingSpecMetricComparator.compare(row, aggregationName, value, aggregators) < 0; } /** diff --git a/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java index b049fd668cec..c36a35b28815 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.column.ValueType; import java.util.Map; @@ -51,6 +52,12 @@ public void setRowSignature(Map rowSignature) havingSpec.setRowSignature(rowSignature); } + @Override + public void setAggregators(Map aggregators) + { + havingSpec.setAggregators(aggregators); + } + @Override public boolean eval(Row row) { diff --git a/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java index 67eb4edd3daa..66ebbf8dedc4 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.column.ValueType; import java.util.List; @@ -55,6 +56,14 @@ public void setRowSignature(Map rowSignature) } } + @Override + public void setAggregators(Map aggregators) + { + for (HavingSpec havingSpec : havingSpecs) { + havingSpec.setAggregators(aggregators); + } + } + @Override public boolean eval(Row row) { diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index dcb64932e9f5..fc9054889fc3 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -3318,10 +3318,6 @@ public void testGroupByWithHavingOnHyperUnique() ) ); - // havingSpec equalTo/greaterThan/lessThan do not work on complex aggregators, even if they could be finalized. - // See also: https://github.com/druid-io/druid/issues/2507 - expectedException.expect(ISE.class); - expectedException.expectMessage("Unknown type of metric value"); Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); TestHelper.assertExpectedObjects(expectedResults, results, "order-limit"); }