From 676c3f8f3d7c5e39f2f748524f31049d93efda55 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 25 Aug 2017 11:16:51 -0700 Subject: [PATCH 1/4] Add "round" option to cardinality and hyperUnique aggregators. Also turn it on by default in SQL, to make math on distinct counts work more as expected. --- docs/content/querying/aggregations.md | 18 +++- docs/content/querying/post-aggregations.md | 10 +- .../io/druid/hll/HyperLogLogCollector.java | 5 + .../indexer/DetermineHashedPartitionsJob.java | 2 +- .../druid/indexing/common/task/IndexTask.java | 2 +- .../CardinalityAggregatorFactory.java | 99 ++++++++----------- .../HyperUniqueFinalizingPostAggregator.java | 26 ++++- .../HyperUniquesAggregatorFactory.java | 59 +++++++---- .../io/druid/query/QueryRunnerTestHelper.java | 6 ++ .../CardinalityAggregatorTest.java | 26 ++++- ...perUniqueFinalizingPostAggregatorTest.java | 35 +++++++ .../HyperUniquesAggregatorFactoryTest.java | 55 ++++++++--- .../druid/query/topn/TopNQueryRunnerTest.java | 50 ++++++++++ .../ApproxCountDistinctSqlAggregator.java | 4 +- .../druid/sql/calcite/CalciteQueryTest.java | 39 +++++--- 15 files changed, 316 insertions(+), 120 deletions(-) diff --git a/docs/content/querying/aggregations.md b/docs/content/querying/aggregations.md index 0cac388273b8..420daf97afbb 100644 --- a/docs/content/querying/aggregations.md +++ b/docs/content/querying/aggregations.md @@ -227,12 +227,17 @@ instead of the cardinality aggregator if you do not care about the individual va "type": "cardinality", "name": "", "fields": [ , , ... ], - "byRow": # (optional, defaults to false) + "byRow": # (optional, defaults to false), + "round": # (optional, defaults to false) } ``` Each individual element of the "fields" list can be a String or [DimensionSpec](../querying/dimensionspecs.html). A String dimension in the fields list is equivalent to a DefaultDimensionSpec (no transformations). +The HyperLogLog algorithm generates decimal estimates with some error. "round" can be set to true to round off estimated +values to whole numbers. Note that even with rounding, the cardinality is still an estimate. The "round" field only +affects query-time behavior, and is ignored at ingestion-time. + #### Cardinality by value When setting `byRow` to `false` (the default) it computes the cardinality of the set composed of the union of all dimension values for all the given dimensions. @@ -315,12 +320,17 @@ Uses [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf) to "type" : "hyperUnique", "name" : , "fieldName" : , - "isInputHyperUnique" : false + "isInputHyperUnique" : false, + "round" : false } ``` -isInputHyperUnique can be set to true to index pre-computed HLL (Base64 encoded output from druid-hll is expected). -The isInputHyperUnique field only affects ingestion-time behavior, and is ignored at query time. +"isInputHyperUnique" can be set to true to index pre-computed HLL (Base64 encoded output from druid-hll is expected). +The "isInputHyperUnique" field only affects ingestion-time behavior, and is ignored at query-time. + +The HyperLogLog algorithm generates decimal estimates with some error. "round" can be set to true to round off estimated +values to whole numbers. Note that even with rounding, the cardinality is still an estimate. The "round" field only +affects query-time behavior, and is ignored at ingestion-time. For more approximate aggregators, please see [theta sketches](../development/extensions-core/datasketches-aggregators.html). diff --git a/docs/content/querying/post-aggregations.md b/docs/content/querying/post-aggregations.md index 799aa046eb42..c574bbd6e43e 100644 --- a/docs/content/querying/post-aggregations.md +++ b/docs/content/querying/post-aggregations.md @@ -107,7 +107,11 @@ JavaScript-based functionality is disabled by default. Please refer to the Druid The hyperUniqueCardinality post aggregator is used to wrap a hyperUnique object such that it can be used in post aggregations. ```json -{ "type" : "hyperUniqueCardinality", "name": , "fieldName" : } +{ + "type" : "hyperUniqueCardinality", + "name": , + "fieldName" : +} ``` It can be used in a sample calculation as so: @@ -128,6 +132,10 @@ It can be used in a sample calculation as so: }] ``` +This post-aggregator will inherit the rounding behavior of the aggregator it references. Note that this inheritance +is only effective if you directly reference an aggregator. Going through another post-aggregator, for example, will +cause the user-specified rounding behavior to get lost and default to "no rounding". + ## Example Usage In this example, let’s calculate a simple percentage using post aggregators. Let’s imagine our data set has a metric called "total". diff --git a/hll/src/main/java/io/druid/hll/HyperLogLogCollector.java b/hll/src/main/java/io/druid/hll/HyperLogLogCollector.java index 51f6770ece50..1c30f2227c99 100644 --- a/hll/src/main/java/io/druid/hll/HyperLogLogCollector.java +++ b/hll/src/main/java/io/druid/hll/HyperLogLogCollector.java @@ -502,6 +502,11 @@ public byte[] toByteArray() return theBytes; } + public long estimateCardinalityRound() + { + return Math.round(estimateCardinality()); + } + public double estimateCardinality() { if (estimatedCardinality == null) { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java index 7d0e2a9cb461..820e50343457 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java @@ -344,7 +344,7 @@ protected void reduce( } ).writeValue( out, - new Double(aggregate.estimateCardinality()).longValue() + aggregate.estimateCardinalityRound() ); } finally { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index bc68c7b4ed92..3aa0e5c2444b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -377,7 +377,7 @@ private static ShardSpecs createShardSpecsFromInput( final int numShards; if (determineNumPartitions) { - final long numRows = new Double(collector.estimateCardinality()).longValue(); + final long numRows = collector.estimateCardinalityRound(); numShards = (int) Math.ceil((double) numRows / tuningConfig.getTargetPartitionSize()); log.info("Estimated [%,d] rows of data for interval [%s], creating [%,d] shards", numRows, interval, numShards); } else { diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java index 177815391add..8b3a6c4678f5 100644 --- a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java @@ -39,6 +39,7 @@ import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy; import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategyFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import io.druid.query.cache.CacheKeyBuilder; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.segment.ColumnSelectorFactory; @@ -46,10 +47,11 @@ import org.apache.commons.codec.binary.Base64; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; public class CardinalityAggregatorFactory extends AggregatorFactory { @@ -87,28 +89,21 @@ public DimensionSpec apply(String input) ); } - public static Object estimateCardinality(Object object) - { - if (object == null) { - return 0; - } - - return ((HyperLogLogCollector) object).estimateCardinality(); - } - private static final CardinalityAggregatorColumnSelectorStrategyFactory STRATEGY_FACTORY = new CardinalityAggregatorColumnSelectorStrategyFactory(); private final String name; private final List fields; private final boolean byRow; + private final boolean round; @JsonCreator public CardinalityAggregatorFactory( @JsonProperty("name") String name, @Deprecated @JsonProperty("fieldNames") final List fieldNames, @JsonProperty("fields") final List fields, - @JsonProperty("byRow") final boolean byRow + @JsonProperty("byRow") final boolean byRow, + @JsonProperty("round") final boolean round ) { this.name = name; @@ -123,6 +118,7 @@ public CardinalityAggregatorFactory( this.fields = fields; } this.byRow = byRow; + this.round = round; } public CardinalityAggregatorFactory( @@ -131,7 +127,7 @@ public CardinalityAggregatorFactory( final boolean byRow ) { - this(name, null, fields, byRow); + this(name, null, fields, byRow, false); } @Override @@ -201,7 +197,7 @@ public AggregateCombiner makeAggregateCombiner() @Override public AggregatorFactory getCombiningFactory() { - return new HyperUniquesAggregatorFactory(name, name); + return new HyperUniquesAggregatorFactory(name, name, false, round); } @Override @@ -213,17 +209,16 @@ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws Aggre @Override public List getRequiredColumns() { - return Lists.transform( - fields, - new Function() - { - @Override - public AggregatorFactory apply(DimensionSpec input) - { - return new CardinalityAggregatorFactory(input.getOutputName(), Collections.singletonList(input), byRow); - } - } - ); + return fields.stream() + .map( + input -> new CardinalityAggregatorFactory( + input.getOutputName(), + null, + Collections.singletonList(input), + byRow, + round + ) + ).collect(Collectors.toList()); } @Override @@ -249,7 +244,7 @@ public Object deserialize(Object object) public Object finalizeComputation(Object object) { - return estimateCardinality(object); + return HyperUniquesAggregatorFactory.estimateCardinality(object, round); } @Override @@ -277,25 +272,20 @@ public boolean isByRow() return byRow; } + @JsonProperty + public boolean isRound() + { + return round; + } + @Override public byte[] getCacheKey() { - List dimSpecKeys = new ArrayList<>(); - int dimSpecKeysLength = fields.size(); - for (DimensionSpec dimSpec : fields) { - byte[] dimSpecKey = dimSpec.getCacheKey(); - dimSpecKeysLength += dimSpecKey.length; - dimSpecKeys.add(dimSpec.getCacheKey()); - } - - ByteBuffer retBuf = ByteBuffer.allocate(2 + dimSpecKeysLength); - retBuf.put(AggregatorUtil.CARD_CACHE_TYPE_ID); - for (byte[] dimSpecKey : dimSpecKeys) { - retBuf.put(dimSpecKey); - retBuf.put(AggregatorUtil.STRING_SEPARATOR); - } - retBuf.put((byte) (byRow ? 1 : 0)); - return retBuf.array(); + return new CacheKeyBuilder(AggregatorUtil.CARD_CACHE_TYPE_ID) + .appendCacheables(fields) + .appendBoolean(byRow) + .appendBoolean(round) + .build(); } @Override @@ -311,7 +301,7 @@ public int getMaxIntermediateSize() } @Override - public boolean equals(Object o) + public boolean equals(final Object o) { if (this == o) { return true; @@ -319,26 +309,17 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - - CardinalityAggregatorFactory that = (CardinalityAggregatorFactory) o; - - if (isByRow() != that.isByRow()) { - return false; - } - if (!getName().equals(that.getName())) { - return false; - } - return getFields().equals(that.getFields()); - + final CardinalityAggregatorFactory that = (CardinalityAggregatorFactory) o; + return byRow == that.byRow && + round == that.round && + Objects.equals(name, that.name) && + Objects.equals(fields, that.fields); } @Override public int hashCode() { - int result = getName().hashCode(); - result = 31 * result + getFields().hashCode(); - result = 31 * result + (isByRow() ? 1 : 0); - return result; + return Objects.hash(name, fields, byRow, round); } @Override @@ -346,7 +327,9 @@ public String toString() { return "CardinalityAggregatorFactory{" + "name='" + name + '\'' + - ", fields='" + fields + '\'' + + ", fields=" + fields + + ", byRow=" + byRow + + ", round=" + round + '}'; } } diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregator.java index ab1b405a4de0..f49c79c3c1df 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregator.java @@ -48,18 +48,28 @@ public int compare(Double lhs, Double rhs) private final String name; private final String fieldName; + private final AggregatorFactory aggregatorFactory; @JsonCreator public HyperUniqueFinalizingPostAggregator( @JsonProperty("name") String name, @JsonProperty("fieldName") String fieldName ) + { + this(name, fieldName, null); + } + + private HyperUniqueFinalizingPostAggregator( + String name, + String fieldName, + AggregatorFactory aggregatorFactory + ) { this.fieldName = Preconditions.checkNotNull(fieldName, "fieldName is null"); //Note that, in general, name shouldn't be null, we are defaulting //to fieldName here just to be backward compatible with 0.7.x this.name = name == null ? fieldName : name; - + this.aggregatorFactory = aggregatorFactory; } @Override @@ -77,7 +87,16 @@ public Comparator getComparator() @Override public Object compute(Map combinedAggregators) { - return HyperUniquesAggregatorFactory.estimateCardinality(combinedAggregators.get(fieldName)); + final Object collector = combinedAggregators.get(fieldName); + + if (aggregatorFactory == null) { + // This didn't come directly from an aggregator. Maybe it came through a FieldAccessPostAggregator or + // something like that. Hope it's a HyperLogLogCollector, and estimate it without rounding. + return HyperUniquesAggregatorFactory.estimateCardinality(collector, false); + } else { + // Delegate to the aggregator factory to get the user-specified rounding behavior. + return aggregatorFactory.finalizeComputation(collector); + } } @Override @@ -90,7 +109,8 @@ public String getName() @Override public HyperUniqueFinalizingPostAggregator decorate(Map aggregators) { - return this; + final AggregatorFactory theAggregatorFactory = aggregators != null ? aggregators.get(fieldName) : null; + return new HyperUniqueFinalizingPostAggregator(name, fieldName, theAggregatorFactory); } @JsonProperty("fieldName") diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java index fd0850ce352b..e6ded9de10f4 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java @@ -34,6 +34,7 @@ import io.druid.query.aggregation.NoopAggregator; import io.druid.query.aggregation.NoopBufferAggregator; import io.druid.query.aggregation.cardinality.HyperLogLogCollectorAggregateCombiner; +import io.druid.query.cache.CacheKeyBuilder; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ObjectColumnSelector; import org.apache.commons.codec.binary.Base64; @@ -49,29 +50,39 @@ */ public class HyperUniquesAggregatorFactory extends AggregatorFactory { - public static Object estimateCardinality(Object object) + public static Object estimateCardinality(Object object, boolean round) { if (object == null) { return 0; } - return ((HyperLogLogCollector) object).estimateCardinality(); + final HyperLogLogCollector collector = (HyperLogLogCollector) object; + + // Avoid ternary, it causes estimateCardinalityRound to be cast to double. + if (round) { + return collector.estimateCardinalityRound(); + } else { + return collector.estimateCardinality(); + } } private final String name; private final String fieldName; private final boolean isInputHyperUnique; + private final boolean round; @JsonCreator public HyperUniquesAggregatorFactory( @JsonProperty("name") String name, @JsonProperty("fieldName") String fieldName, - @JsonProperty("isInputHyperUnique") Boolean isInputHyperUnique + @JsonProperty("isInputHyperUnique") boolean isInputHyperUnique, + @JsonProperty("round") boolean round ) { this.name = name; this.fieldName = fieldName; - this.isInputHyperUnique = (isInputHyperUnique == null) ? false : isInputHyperUnique; + this.isInputHyperUnique = isInputHyperUnique; + this.round = round; } public HyperUniquesAggregatorFactory( @@ -79,7 +90,7 @@ public HyperUniquesAggregatorFactory( String fieldName ) { - this(name, fieldName, false); + this(name, fieldName, false, false); } @Override @@ -147,7 +158,7 @@ public AggregateCombiner makeAggregateCombiner() @Override public AggregatorFactory getCombiningFactory() { - return new HyperUniquesAggregatorFactory(name, name, false); + return new HyperUniquesAggregatorFactory(name, name, false, round); } @Override @@ -166,7 +177,8 @@ public List getRequiredColumns() return Arrays.asList(new HyperUniquesAggregatorFactory( fieldName, fieldName, - isInputHyperUnique + isInputHyperUnique, + round )); } @@ -192,7 +204,7 @@ public Object deserialize(Object object) @Override public Object finalizeComputation(Object object) { - return estimateCardinality(object); + return estimateCardinality(object, round); } @Override @@ -220,15 +232,19 @@ public boolean getIsInputHyperUnique() return isInputHyperUnique; } + @JsonProperty + public boolean isRound() + { + return round; + } + @Override public byte[] getCacheKey() { - byte[] fieldNameBytes = StringUtils.toUtf8(fieldName); - - return ByteBuffer.allocate(1 + fieldNameBytes.length) - .put(AggregatorUtil.HYPER_UNIQUE_CACHE_TYPE_ID) - .put(fieldNameBytes) - .array(); + return new CacheKeyBuilder(AggregatorUtil.HYPER_UNIQUE_CACHE_TYPE_ID) + .appendString(fieldName) + .appendBoolean(round) + .build(); } @Override @@ -254,11 +270,12 @@ public String toString() "name='" + name + '\'' + ", fieldName='" + fieldName + '\'' + ", isInputHyperUnique=" + isInputHyperUnique + + ", round=" + round + '}'; } @Override - public boolean equals(Object o) + public boolean equals(final Object o) { if (this == o) { return true; @@ -266,16 +283,16 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - - HyperUniquesAggregatorFactory that = (HyperUniquesAggregatorFactory) o; - - return Objects.equals(fieldName, that.fieldName) && Objects.equals(name, that.name) && - Objects.equals(isInputHyperUnique, that.isInputHyperUnique); + final HyperUniquesAggregatorFactory that = (HyperUniquesAggregatorFactory) o; + return isInputHyperUnique == that.isInputHyperUnique && + round == that.round && + Objects.equals(name, that.name) && + Objects.equals(fieldName, that.fieldName); } @Override public int hashCode() { - return Objects.hash(name, fieldName, isInputHyperUnique); + return Objects.hash(name, fieldName, isInputHyperUnique, round); } } diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index 4cb45b848c60..f9fff791b1a8 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -158,6 +158,12 @@ public TableDataSource apply(@Nullable String input) "uniques", "quality_uniques" ); + public static final HyperUniquesAggregatorFactory qualityUniquesRounded = new HyperUniquesAggregatorFactory( + "uniques", + "quality_uniques", + false, + true + ); public static final CardinalityAggregatorFactory qualityCardinality = new CardinalityAggregatorFactory( "cardinality", Arrays.asList(new DefaultDimensionSpec("quality", "quality")), diff --git a/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java index c7b3f73665dc..362f14285a2a 100644 --- a/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java @@ -286,6 +286,7 @@ private static void bufferAggregate( List> dimInfoList; List selectorList; CardinalityAggregatorFactory rowAggregatorFactory; + CardinalityAggregatorFactory rowAggregatorFactoryRounded; CardinalityAggregatorFactory valueAggregatorFactory; final TestDimensionSelector dim1; final TestDimensionSelector dim2; @@ -335,6 +336,17 @@ public CardinalityAggregatorTest() true ); + rowAggregatorFactoryRounded = new CardinalityAggregatorFactory( + "billy", + null, + Lists.newArrayList( + dimSpec1, + dimSpec2 + ), + true, + true + ); + valueAggregatorFactory = new CardinalityAggregatorFactory( "billy", Lists.newArrayList( @@ -403,6 +415,7 @@ public void testAggregateRows() throws Exception aggregate(selectorList, agg); } Assert.assertEquals(9.0, (Double) rowAggregatorFactory.finalizeComputation(agg.get()), 0.05); + Assert.assertEquals(9L, rowAggregatorFactoryRounded.finalizeComputation(agg.get())); } @Test @@ -418,6 +431,7 @@ public void testAggregateValues() throws Exception aggregate(selectorList, agg); } Assert.assertEquals(7.0, (Double) valueAggregatorFactory.finalizeComputation(agg.get()), 0.05); + Assert.assertEquals(7L, rowAggregatorFactoryRounded.finalizeComputation(agg.get())); } @Test @@ -439,6 +453,7 @@ public void testBufferAggregateRows() throws Exception bufferAggregate(selectorList, agg, buf, pos); } Assert.assertEquals(9.0, (Double) rowAggregatorFactory.finalizeComputation(agg.get(buf, pos)), 0.05); + Assert.assertEquals(9L, rowAggregatorFactoryRounded.finalizeComputation(agg.get(buf, pos))); } @Test @@ -460,6 +475,7 @@ public void testBufferAggregateValues() throws Exception bufferAggregate(selectorList, agg, buf, pos); } Assert.assertEquals(7.0, (Double) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)), 0.05); + Assert.assertEquals(7L, rowAggregatorFactoryRounded.finalizeComputation(agg.get(buf, pos))); } @Test @@ -606,11 +622,13 @@ public void testSerde() throws Exception { CardinalityAggregatorFactory factory = new CardinalityAggregatorFactory( "billy", + null, ImmutableList.of( new DefaultDimensionSpec("b", "b"), new DefaultDimensionSpec("a", "a"), new DefaultDimensionSpec("c", "c") ), + true, true ); ObjectMapper objectMapper = new DefaultObjectMapper(); @@ -619,7 +637,13 @@ public void testSerde() throws Exception objectMapper.readValue(objectMapper.writeValueAsString(factory), AggregatorFactory.class) ); - String fieldNamesOnly = "{\"type\":\"cardinality\",\"name\":\"billy\",\"fields\":[\"b\",\"a\",\"c\"],\"byRow\":true}"; + String fieldNamesOnly = "{" + + "\"type\":\"cardinality\"," + + "\"name\":\"billy\"," + + "\"fields\":[\"b\",\"a\",\"c\"]," + + "\"byRow\":true," + + "\"round\":true" + + "}"; Assert.assertEquals( factory, objectMapper.readValue(fieldNamesOnly, AggregatorFactory.class) diff --git a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregatorTest.java index c16b51a03ec2..b543f73e9cf3 100644 --- a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregatorTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregatorTest.java @@ -23,9 +23,13 @@ import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import io.druid.hll.HyperLogLogCollector; +import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; +import io.druid.query.dimension.DefaultDimensionSpec; +import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Test; +import java.util.Collections; import java.util.Random; /** @@ -52,4 +56,35 @@ public void testCompute() throws Exception Assert.assertTrue(cardinality == 99.37233005831612); } + + @Test + public void testComputeRounded() throws Exception + { + Random random = new Random(0L); + HyperUniqueFinalizingPostAggregator postAggregator = new HyperUniqueFinalizingPostAggregator( + "uniques", "uniques" + ).decorate( + ImmutableMap.of( + "uniques", + new CardinalityAggregatorFactory( + "uniques", + null, + Collections.singletonList(DefaultDimensionSpec.of("dummy")), + false, + true + ) + ) + ); + HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector(); + + for (int i = 0; i < 100; ++i) { + byte[] hashedVal = fn.hashLong(random.nextLong()).asBytes(); + collector.add(hashedVal); + } + + Object cardinality = postAggregator.compute(ImmutableMap.of("uniques", collector)); + + Assert.assertThat(cardinality, CoreMatchers.instanceOf(Long.class)); + Assert.assertEquals(99L, cardinality); + } } diff --git a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactoryTest.java b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactoryTest.java index b9f7be621ec0..7a54e87a5251 100644 --- a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactoryTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactoryTest.java @@ -19,11 +19,14 @@ package io.druid.query.aggregation.hyperloglog; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import io.druid.hll.HLLCV0; import io.druid.hll.HyperLogLogCollector; import io.druid.java.util.common.StringUtils; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.TestHelper; import org.junit.Assert; import org.junit.Test; @@ -83,8 +86,8 @@ public void testCompare2() throws Exception } Assert.assertEquals( - Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()), - comparator.compare(collector1, collector2) + Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()), + comparator.compare(collector1, collector2) ); } @@ -102,8 +105,8 @@ public void testCompare2() throws Exception } Assert.assertEquals( - Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()), - comparator.compare(collector1, collector2) + Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()), + comparator.compare(collector1, collector2) ); } @@ -121,8 +124,8 @@ public void testCompare2() throws Exception } Assert.assertEquals( - Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()), - comparator.compare(collector1, collector2) + Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()), + comparator.compare(collector1, collector2) ); } } @@ -150,20 +153,42 @@ public void testCompareToShouldBehaveConsistentlyWithEstimatedCardinalitiesEvenI } // when - final int orderedByCardinality = Double.compare(leftCollector.estimateCardinality(), - rightCollector.estimateCardinality()); + final int orderedByCardinality = Double.compare( + leftCollector.estimateCardinality(), + rightCollector.estimateCardinality() + ); final int orderedByComparator = comparator.compare(leftCollector, rightCollector); // then, assert hyperloglog comparator behaves consistently with estimated cardinalities Assert.assertEquals( - StringUtils.format("orderedByComparator=%d, orderedByCardinality=%d,\n" + - "Left={cardinality=%f, hll=%s},\n" + - "Right={cardinality=%f, hll=%s},\n", orderedByComparator, orderedByCardinality, - leftCollector.estimateCardinality(), leftCollector, - rightCollector.estimateCardinality(), rightCollector), - orderedByCardinality, - orderedByComparator + StringUtils.format("orderedByComparator=%d, orderedByCardinality=%d,\n" + + "Left={cardinality=%f, hll=%s},\n" + + "Right={cardinality=%f, hll=%s},\n", orderedByComparator, orderedByCardinality, + leftCollector.estimateCardinality(), leftCollector, + rightCollector.estimateCardinality(), rightCollector + ), + orderedByCardinality, + orderedByComparator ); } } + + @Test + public void testSerde() throws Exception + { + final HyperUniquesAggregatorFactory factory = new HyperUniquesAggregatorFactory( + "foo", + "bar", + true, + true + ); + + final ObjectMapper jsonMapper = TestHelper.getJsonMapper(); + final AggregatorFactory factory2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(factory), + AggregatorFactory.class + ); + + Assert.assertEquals(factory, factory2); + } } diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java index 13c83489629e..2bb6dff7b82a 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java @@ -712,6 +712,56 @@ public void testTopNOverHyperUniqueExpression() assertExpectedResults(expectedResults, query); } + @Test + public void testTopNOverHyperUniqueExpressionRounded() + { + TopNQuery query = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.allGran) + .dimension(QueryRunnerTestHelper.marketDimension) + .metric(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric) + .threshold(3) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .aggregators( + Arrays.asList(QueryRunnerTestHelper.qualityUniquesRounded) + ) + .postAggregators( + Collections.singletonList(new ExpressionPostAggregator( + QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, + "uniques + 1", + null, + TestExprMacroTable.INSTANCE + )) + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result<>( + DateTimes.of("2011-01-12T00:00:00.000Z"), + new TopNResultValue( + Arrays.>asList( + ImmutableMap.builder() + .put("market", "spot") + .put(QueryRunnerTestHelper.uniqueMetric, 9L) + .put(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, 10L) + .build(), + ImmutableMap.builder() + .put("market", "total_market") + .put(QueryRunnerTestHelper.uniqueMetric, 2L) + .put(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, 3L) + .build(), + ImmutableMap.builder() + .put("market", "upfront") + .put(QueryRunnerTestHelper.uniqueMetric, 2L) + .put(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, 3L) + .build() + ) + ) + ) + ); + assertExpectedResults(expectedResults, query); + } + @Test public void testTopNOverFirstLastAggregator() { diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/ApproxCountDistinctSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/ApproxCountDistinctSqlAggregator.java index 824aad70de91..bc0d8a7cefd1 100644 --- a/sql/src/main/java/io/druid/sql/calcite/aggregation/ApproxCountDistinctSqlAggregator.java +++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/ApproxCountDistinctSqlAggregator.java @@ -91,7 +91,7 @@ public Aggregation toDruidAggregation( final AggregatorFactory aggregatorFactory; if (input.isDirectColumnAccess() && rowSignature.getColumnType(input.getDirectColumn()) == ValueType.COMPLEX) { - aggregatorFactory = new HyperUniquesAggregatorFactory(name, input.getDirectColumn()); + aggregatorFactory = new HyperUniquesAggregatorFactory(name, input.getDirectColumn(), false, true); } else { final SqlTypeName sqlTypeName = rexNode.getType().getSqlTypeName(); final ValueType inputType = Calcites.getValueTypeForSqlTypeName(sqlTypeName); @@ -113,7 +113,7 @@ public Aggregation toDruidAggregation( virtualColumns.add(virtualColumn); } - aggregatorFactory = new CardinalityAggregatorFactory(name, ImmutableList.of(dimensionSpec), false); + aggregatorFactory = new CardinalityAggregatorFactory(name, null, ImmutableList.of(dimensionSpec), false, true); } return Aggregation.create(virtualColumns, aggregatorFactory).filter(filter); diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java index 18dcf54ef5de..5df03a3bfe3d 100644 --- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java @@ -2979,9 +2979,10 @@ public void testCountDistinct() throws Exception "a1", null, DIMS(new DefaultDimensionSpec("dim2", null)), - false + false, + true ), - new HyperUniquesAggregatorFactory("a2", "unique_dim1") + new HyperUniquesAggregatorFactory("a2", "unique_dim1", false, true) ) ) .context(TIMESERIES_CONTEXT_DEFAULT) @@ -3050,7 +3051,8 @@ public void testApproxCountDistinctWhenHllDisabled() throws Exception "a0", null, DIMS(new DefaultDimensionSpec("dim2", null)), - false + false, + true ) ) ) @@ -3136,19 +3138,22 @@ public void testApproxCountDistinct() throws Exception "a1", null, DIMS(new DefaultDimensionSpec("dim2", "dim2")), - false + false, + true ), new FilteredAggregatorFactory( new CardinalityAggregatorFactory( "a2", null, DIMS(new DefaultDimensionSpec("dim2", "dim2")), - false + false, + true ), NOT(SELECTOR("dim2", "", null)) ), new CardinalityAggregatorFactory( "a3", + null, DIMS( new ExtractionDimensionSpec( "dim2", @@ -3157,14 +3162,17 @@ public void testApproxCountDistinct() throws Exception new SubstringDimExtractionFn(0, 1) ) ), - false + false, + true ), new CardinalityAggregatorFactory( "a4", + null, DIMS(new DefaultDimensionSpec("a4:v", "a4:v", ValueType.STRING)), - false + false, + true ), - new HyperUniquesAggregatorFactory("a5", "unique_dim1") + new HyperUniquesAggregatorFactory("a5", "unique_dim1", false, true) ) ) .context(TIMESERIES_CONTEXT_DEFAULT) @@ -3562,7 +3570,7 @@ public void testCompareExactAndApproximateCountDistinctUsingSubquery() throws Ex .build() ), ImmutableList.of( - new Object[]{5L, 5L, -0.12226936f} + new Object[]{5L, 5L, 0.0f} ) ); } @@ -3683,7 +3691,8 @@ public void testCountDistinctArithmetic() throws Exception "a1", null, DIMS(new DefaultDimensionSpec("dim2", null)), - false + false, + true ) ) ) @@ -3697,7 +3706,7 @@ public void testCountDistinctArithmetic() throws Exception .build() ), ImmutableList.of( - new Object[]{6L, 3L, 3.0021994f, 1L, 4L, 4.9985347f} + new Object[]{6L, 3L, 3.0f, 1L, 4L, 5.0f} ) ); } @@ -3717,6 +3726,7 @@ public void testCountDistinctOfSubstring() throws Exception AGGS( new CardinalityAggregatorFactory( "a0", + null, DIMS( new ExtractionDimensionSpec( "dim1", @@ -3724,7 +3734,8 @@ public void testCountDistinctOfSubstring() throws Exception new SubstringDimExtractionFn(0, 1) ) ), - false + false, + true ) ) ) @@ -4408,8 +4419,10 @@ public void testCountDistinctOfLookup() throws Exception .aggregators(AGGS( new CardinalityAggregatorFactory( "a0", + null, ImmutableList.of(new ExtractionDimensionSpec("dim1", null, extractionFn)), - false + false, + true ) )) .context(TIMESERIES_CONTEXT_DEFAULT) From 21deb480dc94e1907101cf8bed96fe23725ddc71 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 25 Aug 2017 13:10:54 -0700 Subject: [PATCH 2/4] Fix some compile errors. --- .../indexer/BatchDeltaIngestionTest.java | 2 +- .../indexer/IndexGeneratorCombinerTest.java | 21 ++++++++++++++++--- .../hadoop/DatasourceRecordReaderTest.java | 2 +- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index 829a52ea8f90..86cfc9ca59e6 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -425,7 +425,7 @@ private void verifyRows(List> expectedRows, List actualRows) Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum")); Assert.assertEquals( (Double) expected.get("unique_hosts"), - (Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts")), + (Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts"), false), 0.001 ); } From 8fc74da022792aeef8d66f2559b4b9fe95cf81fe Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 25 Aug 2017 14:24:53 -0700 Subject: [PATCH 3/4] Fix test. --- .../test/java/io/druid/sql/calcite/CalciteQueryTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java index 5df03a3bfe3d..9b24b7e7fd84 100644 --- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java @@ -3557,8 +3557,10 @@ public void testCompareExactAndApproximateCountDistinctUsingSubquery() throws Ex new CountAggregatorFactory("a0"), new CardinalityAggregatorFactory( "a1", + null, DIMS(new DefaultDimensionSpec("d0", null)), - false + false, + true ) )) .setPostAggregatorSpecs( @@ -3706,7 +3708,7 @@ public void testCountDistinctArithmetic() throws Exception .build() ), ImmutableList.of( - new Object[]{6L, 3L, 3.0f, 1L, 4L, 5.0f} + new Object[]{6L, 3L, 3.0f, 2L, 5L, 5.0f} ) ); } From c0b4afc94e4a0ae18e540f6d40c124d985b509c2 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 25 Aug 2017 18:21:52 -0700 Subject: [PATCH 4/4] Formatting. --- .../CardinalityAggregatorFactory.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java index 8b3a6c4678f5..cf82c4d612a3 100644 --- a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java @@ -211,14 +211,16 @@ public List getRequiredColumns() { return fields.stream() .map( - input -> new CardinalityAggregatorFactory( - input.getOutputName(), - null, - Collections.singletonList(input), - byRow, - round - ) - ).collect(Collectors.toList()); + field -> + new CardinalityAggregatorFactory( + field.getOutputName(), + null, + Collections.singletonList(field), + byRow, + round + ) + ) + .collect(Collectors.toList()); } @Override