From acddd100b99c9854ffe2d17b961a8b6ee1747add Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Fri, 29 Sep 2017 11:07:36 -0500 Subject: [PATCH 1/4] greater-than/less-than/equal-to havingSpec to call AggregatorFactory.finalizeComputation(..) --- .../io/druid/query/groupby/GroupByQuery.java | 10 +++++++++ .../query/groupby/having/AndHavingSpec.java | 9 ++++++++ .../query/groupby/having/BaseHavingSpec.java | 7 +++++++ .../groupby/having/EqualToHavingSpec.java | 13 +++++++++++- .../groupby/having/GreaterThanHavingSpec.java | 13 +++++++++++- .../query/groupby/having/HavingSpec.java | 3 +++ .../having/HavingSpecMetricComparator.java | 21 ++++++++++++++++++- .../groupby/having/LessThanHavingSpec.java | 13 +++++++++++- .../query/groupby/having/NotHavingSpec.java | 7 +++++++ .../query/groupby/having/OrHavingSpec.java | 9 ++++++++ 10 files changed, 101 insertions(+), 4 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index 81776df032cb..97eb952a41ca 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -70,6 +70,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -147,6 +148,7 @@ private Function, Sequence> makePostProcessingFn() postProcessingFn, (Sequence input) -> { havingSpec.setRowSignature(GroupByQueryHelper.rowSignatureFor(GroupByQuery.this)); + havingSpec.setAggregators(getAggregatorsMap(aggregatorSpecs)); return Sequences.filter(input, havingSpec::eval); } ); @@ -220,6 +222,7 @@ private GroupByQuery( public Sequence apply(Sequence input) { GroupByQuery.this.havingSpec.setRowSignature(GroupByQueryHelper.rowSignatureFor(GroupByQuery.this)); + GroupByQuery.this.havingSpec.setAggregators(getAggregatorsMap(GroupByQuery.this.aggregatorSpecs)); return Sequences.filter( input, new Predicate() @@ -735,6 +738,13 @@ private static void verifyOutputNames( } } + private static Map getAggregatorsMap(List aggregatorSpecs) + { + Map map = new HashMap<>(aggregatorSpecs.size()); + aggregatorSpecs.stream().forEach(v -> map.put(v.getName(), v)); + return map; + } + public static class Builder { private DataSource dataSource; diff --git a/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java index 4a5f962bcdca..2436c91613f7 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.column.ValueType; import java.util.List; @@ -55,6 +56,14 @@ public void setRowSignature(Map rowSignature) } } + @Override + public void setAggregators(Map aggregators) + { + for (HavingSpec havingSpec : havingSpecs) { + havingSpec.setAggregators(aggregators); + } + } + @Override public boolean eval(Row row) { diff --git a/processing/src/main/java/io/druid/query/groupby/having/BaseHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/BaseHavingSpec.java index 5700e89dea25..be28eec2e4c5 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/BaseHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/BaseHavingSpec.java @@ -19,6 +19,7 @@ package io.druid.query.groupby.having; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.column.ValueType; import java.util.Map; @@ -30,4 +31,10 @@ public void setRowSignature(Map rowSignature) { // Do nothing. } + + @Override + public void setAggregators(Map aggregators) + { + + } } diff --git a/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java index 89698285b802..2e382c4dd9cd 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java @@ -22,6 +22,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; + +import java.util.Map; /** * The "=" operator in a "having" clause. This is similar to SQL's "having aggregation = value", @@ -32,6 +35,8 @@ public class EqualToHavingSpec extends BaseHavingSpec private final String aggregationName; private final Number value; + private volatile Map aggregators; + @JsonCreator public EqualToHavingSpec( @JsonProperty("aggregation") String aggName, @@ -54,10 +59,16 @@ public String getAggregationName() return aggregationName; } + @Override + public void setAggregators(Map aggregators) + { + this.aggregators = aggregators; + } + @Override public boolean eval(Row row) { - return HavingSpecMetricComparator.compare(row, aggregationName, value) == 0; + return HavingSpecMetricComparator.compare(row, aggregationName, value, aggregators) == 0; } /** diff --git a/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java index d75333b74fae..1c7ec437eecd 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java @@ -22,6 +22,9 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; + +import java.util.Map; /** * The ">" operator in a "having" clause. This is similar to SQL's "having aggregation > value", @@ -32,6 +35,8 @@ public class GreaterThanHavingSpec extends BaseHavingSpec private final String aggregationName; private final Number value; + private volatile Map aggregators; + @JsonCreator public GreaterThanHavingSpec( @JsonProperty("aggregation") String aggName, @@ -54,10 +59,16 @@ public Number getValue() return value; } + @Override + public void setAggregators(Map aggregators) + { + this.aggregators = aggregators; + } + @Override public boolean eval(Row row) { - return HavingSpecMetricComparator.compare(row, aggregationName, value) > 0; + return HavingSpecMetricComparator.compare(row, aggregationName, value, aggregators) > 0; } /** diff --git a/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java index d3bc31c9989b..a2431c65f600 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.column.ValueType; import java.util.Map; @@ -58,6 +59,8 @@ public interface HavingSpec */ void setRowSignature(Map rowSignature); + void setAggregators(Map aggregators); + /** * Evaluates if a given row satisfies the having spec. * diff --git a/processing/src/main/java/io/druid/query/groupby/having/HavingSpecMetricComparator.java b/processing/src/main/java/io/druid/query/groupby/having/HavingSpecMetricComparator.java index 324962478447..2966490a0374 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/HavingSpecMetricComparator.java +++ b/processing/src/main/java/io/druid/query/groupby/having/HavingSpecMetricComparator.java @@ -19,10 +19,14 @@ package io.druid.query.groupby.having; +import com.google.common.primitives.Doubles; import com.google.common.primitives.Floats; +import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; +import java.util.Map; import java.util.regex.Pattern; /** @@ -31,13 +35,28 @@ class HavingSpecMetricComparator { static final Pattern LONG_PAT = Pattern.compile("[-|+]?\\d+"); - static int compare(Row row, String aggregationName, Number value) + static int compare(Row row, String aggregationName, Number value, Map aggregators) { + Object metricValueObj = row.getRaw(aggregationName); + if (metricValueObj != null) { + if (aggregators != null && aggregators.containsKey(aggregationName)) { + metricValueObj = aggregators.get(aggregationName).finalizeComputation(metricValueObj); + } + if (metricValueObj instanceof Long) { long l = ((Long) metricValueObj).longValue(); return Longs.compare(l, value.longValue()); + } else if (metricValueObj instanceof Float) { + float l = ((Float) metricValueObj).floatValue(); + return Floats.compare(l, value.floatValue()); + } else if (metricValueObj instanceof Double) { + double l = ((Double) metricValueObj).longValue(); + return Doubles.compare(l, value.doubleValue()); + } else if (metricValueObj instanceof Integer) { + int l = ((Integer) metricValueObj).intValue(); + return Ints.compare(l, value.intValue()); } else if (metricValueObj instanceof String) { String metricValueStr = (String) metricValueObj; if (LONG_PAT.matcher(metricValueStr).matches()) { diff --git a/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java index a60d5adc123e..37d6268b866d 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java @@ -21,6 +21,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; + +import java.util.Map; /** * The "<" operator in a "having" clause. This is similar to SQL's "having aggregation < value", @@ -31,6 +34,8 @@ public class LessThanHavingSpec extends BaseHavingSpec private final String aggregationName; private final Number value; + private volatile Map aggregators; + public LessThanHavingSpec( @JsonProperty("aggregation") String aggName, @JsonProperty("value") Number value @@ -52,10 +57,16 @@ public Number getValue() return value; } + @Override + public void setAggregators(Map aggregators) + { + this.aggregators = aggregators; + } + @Override public boolean eval(Row row) { - return HavingSpecMetricComparator.compare(row, aggregationName, value) < 0; + return HavingSpecMetricComparator.compare(row, aggregationName, value, aggregators) < 0; } /** diff --git a/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java index b049fd668cec..c36a35b28815 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.column.ValueType; import java.util.Map; @@ -51,6 +52,12 @@ public void setRowSignature(Map rowSignature) havingSpec.setRowSignature(rowSignature); } + @Override + public void setAggregators(Map aggregators) + { + havingSpec.setAggregators(aggregators); + } + @Override public boolean eval(Row row) { diff --git a/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java index 67eb4edd3daa..66ebbf8dedc4 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import io.druid.data.input.Row; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.column.ValueType; import java.util.List; @@ -55,6 +56,14 @@ public void setRowSignature(Map rowSignature) } } + @Override + public void setAggregators(Map aggregators) + { + for (HavingSpec havingSpec : havingSpecs) { + havingSpec.setAggregators(aggregators); + } + } + @Override public boolean eval(Row row) { From ae78c10501620b7e2c3f1367ae16fcdbe857b589 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 4 Oct 2017 09:38:54 -0500 Subject: [PATCH 2/4] fix the unit test and expect having to work on hyperUnique agg --- .../java/io/druid/query/groupby/GroupByQueryRunnerTest.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 07a9e902eb82..cb5e71db03b9 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -47,7 +47,6 @@ import io.druid.java.util.common.guava.MergeSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; -import io.druid.java.util.common.parsers.ParseException; import io.druid.js.JavaScriptConfig; import io.druid.query.BySegmentResultValue; import io.druid.query.BySegmentResultValueClass; @@ -3319,10 +3318,6 @@ public void testGroupByWithHavingOnHyperUnique() ) ); - // havingSpec equalTo/greaterThan/lessThan do not work on complex aggregators, even if they could be finalized. - // See also: https://github.com/druid-io/druid/issues/2507 - expectedException.expect(ParseException.class); - expectedException.expectMessage("Unknown type[class io.druid.hll.HLLCV1]"); Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); TestHelper.assertExpectedObjects(expectedResults, results, "order-limit"); } From 6a2da1e635bd4d217546dc61cc76e24840e1e867 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 16 Oct 2017 10:42:51 -0500 Subject: [PATCH 3/4] test fix --- .../java/io/druid/query/groupby/GroupByQueryRunnerTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index dcb64932e9f5..fc9054889fc3 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -3318,10 +3318,6 @@ public void testGroupByWithHavingOnHyperUnique() ) ); - // havingSpec equalTo/greaterThan/lessThan do not work on complex aggregators, even if they could be finalized. - // See also: https://github.com/druid-io/druid/issues/2507 - expectedException.expect(ISE.class); - expectedException.expectMessage("Unknown type of metric value"); Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); TestHelper.assertExpectedObjects(expectedResults, results, "order-limit"); } From 07073f718b935dcdbe8b26041a83830684f906f7 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 16 Oct 2017 10:53:22 -0500 Subject: [PATCH 4/4] fix style errors --- .../src/main/java/io/druid/query/groupby/GroupByQuery.java | 2 -- .../druid/query/groupby/having/HavingSpecMetricComparator.java | 1 - 2 files changed, 3 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index aafa19d20926..eed69f34384b 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -25,7 +25,6 @@ import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; @@ -55,7 +54,6 @@ import io.druid.query.groupby.orderby.LimitSpec; import io.druid.query.groupby.orderby.NoopLimitSpec; import io.druid.query.groupby.orderby.OrderByColumnSpec; -import io.druid.query.groupby.strategy.GroupByStrategyV2; import io.druid.query.ordering.StringComparator; import io.druid.query.ordering.StringComparators; import io.druid.query.spec.LegacySegmentSpec; diff --git a/processing/src/main/java/io/druid/query/groupby/having/HavingSpecMetricComparator.java b/processing/src/main/java/io/druid/query/groupby/having/HavingSpecMetricComparator.java index 5b29f71b6ac7..ff08202bf7cb 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/HavingSpecMetricComparator.java +++ b/processing/src/main/java/io/druid/query/groupby/having/HavingSpecMetricComparator.java @@ -25,7 +25,6 @@ import com.google.common.primitives.Longs; import io.druid.data.input.Row; import io.druid.query.aggregation.AggregatorFactory; -import io.druid.data.input.Row; import io.druid.java.util.common.ISE; import java.util.Map;