Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions docs/content/querying/aggregations.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,17 @@ instead of the cardinality aggregator if you do not care about the individual va
"type": "cardinality",
"name": "<output_name>",
"fields": [ <dimension1>, <dimension2>, ... ],
"byRow": <false | true> # (optional, defaults to false)
"byRow": <false | true> # (optional, defaults to false),
"round": <false | true> # (optional, defaults to false)
}
```

Each individual element of the "fields" list can be a String or [DimensionSpec](../querying/dimensionspecs.html). A String dimension in the fields list is equivalent to a DefaultDimensionSpec (no transformations).

The HyperLogLog algorithm generates decimal estimates with some error. "round" can be set to true to round off estimated
values to whole numbers. Note that even with rounding, the cardinality is still an estimate. The "round" field only
affects query-time behavior, and is ignored at ingestion time.

#### Cardinality by value

When setting `byRow` to `false` (the default) it computes the cardinality of the set composed of the union of all dimension values for all the given dimensions.
Expand Down Expand Up @@ -315,12 +320,17 @@ Uses [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf) to
"type" : "hyperUnique",
"name" : <output_name>,
"fieldName" : <metric_name>,
"isInputHyperUnique" : false
"isInputHyperUnique" : false,
"round" : false
}
```

isInputHyperUnique can be set to true to index pre-computed HLL (Base64 encoded output from druid-hll is expected).
The isInputHyperUnique field only affects ingestion-time behavior, and is ignored at query time.
"isInputHyperUnique" can be set to true to index pre-computed HLL (Base64 encoded output from druid-hll is expected).
The "isInputHyperUnique" field only affects ingestion-time behavior, and is ignored at query-time.

The HyperLogLog algorithm generates decimal estimates with some error. "round" can be set to true to round off estimated
values to whole numbers. Note that even with rounding, the cardinality is still an estimate. The "round" field only
affects query-time behavior, and is ignored at ingestion time.

For more approximate aggregators, please see [theta sketches](../development/extensions-core/datasketches-aggregators.html).

Expand Down
10 changes: 9 additions & 1 deletion docs/content/querying/post-aggregations.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ JavaScript-based functionality is disabled by default. Please refer to the Druid
The hyperUniqueCardinality post aggregator is used to wrap a hyperUnique object such that it can be used in post aggregations.

```json
{ "type" : "hyperUniqueCardinality", "name": <output name>, "fieldName" : <the name field value of the hyperUnique aggregator>}
{
"type" : "hyperUniqueCardinality",
"name": <output name>,
"fieldName" : <the name field value of the hyperUnique aggregator>
}
```

It can be used in a sample calculation as so:
Expand All @@ -128,6 +132,10 @@ It can be used in a sample calculation as so:
}]
```

This post-aggregator will inherit the rounding behavior of the aggregator it references. Note that this inheritance
is only effective if you directly reference an aggregator. Referencing the aggregator indirectly through another
post-aggregator, for example, will cause the user-specified rounding behavior to be lost, defaulting to "no rounding".

## Example Usage

In this example, let’s calculate a simple percentage using post aggregators. Let’s imagine our data set has a metric called "total".
Expand Down
5 changes: 5 additions & 0 deletions hll/src/main/java/io/druid/hll/HyperLogLogCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,11 @@ public byte[] toByteArray()
return theBytes;
}

/**
 * Estimates the cardinality of this collector, rounded to the nearest whole number.
 *
 * Convenience wrapper around {@link #estimateCardinality()} for callers that want an
 * integral count rather than the raw decimal HLL estimate. Note the result is still an
 * estimate; rounding does not make it exact.
 *
 * @return the rounded cardinality estimate
 */
public long estimateCardinalityRound()
{
  final double rawEstimate = estimateCardinality();
  return Math.round(rawEstimate);
}

public double estimateCardinality()
{
if (estimatedCardinality == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ protected void reduce(
}
).writeValue(
out,
new Double(aggregate.estimateCardinality()).longValue()
aggregate.estimateCardinalityRound()
);
}
finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ private void verifyRows(List<ImmutableMap<String, Object>> expectedRows, List<In
Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum"));
Assert.assertEquals(
(Double) expected.get("unique_hosts"),
(Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts")),
(Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts"), false),
0.001
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,14 @@ public void testMultipleRowsMerged() throws Exception
Assert.assertEquals(ImmutableList.of(), capturedRow.getDimension("host"));
Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow.getDimension("keywords"));
Assert.assertEquals(15, capturedRow.getLongMetric("visited_sum"));
Assert.assertEquals(2.0, (Double) HyperUniquesAggregatorFactory.estimateCardinality(capturedRow.getRaw("unique_hosts")), 0.001);
Assert.assertEquals(
2.0,
(Double) HyperUniquesAggregatorFactory.estimateCardinality(
capturedRow.getRaw("unique_hosts"),
false
),
0.001
);
}

@Test
Expand Down Expand Up @@ -250,13 +257,21 @@ public void testMultipleRowsNotMerged() throws Exception
Assert.assertEquals(Collections.singletonList("host1"), capturedRow1.getDimension("host"));
Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow1.getDimension("keywords"));
Assert.assertEquals(10, capturedRow1.getLongMetric("visited_sum"));
Assert.assertEquals(1.0, (Double) HyperUniquesAggregatorFactory.estimateCardinality(capturedRow1.getRaw("unique_hosts")), 0.001);
Assert.assertEquals(
1.0,
(Double) HyperUniquesAggregatorFactory.estimateCardinality(capturedRow1.getRaw("unique_hosts"), false),
0.001
);

InputRow capturedRow2 = InputRowSerde.fromBytes(captureVal2.getValue().getBytes(), aggregators);
Assert.assertEquals(Arrays.asList("host", "keywords"), capturedRow2.getDimensions());
Assert.assertEquals(Collections.singletonList("host2"), capturedRow2.getDimension("host"));
Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow2.getDimension("keywords"));
Assert.assertEquals(5, capturedRow2.getLongMetric("visited_sum"));
Assert.assertEquals(1.0, (Double) HyperUniquesAggregatorFactory.estimateCardinality(capturedRow2.getRaw("unique_hosts")), 0.001);
Assert.assertEquals(
1.0,
(Double) HyperUniquesAggregatorFactory.estimateCardinality(capturedRow2.getRaw("unique_hosts"), false),
0.001
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private void verifyRows(List<InputRow> actualRows)
Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum"));
Assert.assertEquals(
(Double) expected.get("unique_hosts"),
(Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts")),
(Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts"), false),
0.001
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ private static ShardSpecs createShardSpecsFromInput(

final int numShards;
if (determineNumPartitions) {
final long numRows = new Double(collector.estimateCardinality()).longValue();
final long numRows = collector.estimateCardinalityRound();
numShards = (int) Math.ceil((double) numRows / tuningConfig.getTargetPartitionSize());
log.info("Estimated [%,d] rows of data for interval [%s], creating [%,d] shards", numRows, interval, numShards);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,19 @@
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategyFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.cache.CacheKeyBuilder;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionHandlerUtils;
import org.apache.commons.codec.binary.Base64;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class CardinalityAggregatorFactory extends AggregatorFactory
{
Expand Down Expand Up @@ -87,28 +89,21 @@ public DimensionSpec apply(String input)
);
}

public static Object estimateCardinality(Object object)
{
if (object == null) {
return 0;
}

return ((HyperLogLogCollector) object).estimateCardinality();
}

private static final CardinalityAggregatorColumnSelectorStrategyFactory STRATEGY_FACTORY =
new CardinalityAggregatorColumnSelectorStrategyFactory();

private final String name;
private final List<DimensionSpec> fields;
private final boolean byRow;
private final boolean round;

@JsonCreator
public CardinalityAggregatorFactory(
@JsonProperty("name") String name,
@Deprecated @JsonProperty("fieldNames") final List<String> fieldNames,
@JsonProperty("fields") final List<DimensionSpec> fields,
@JsonProperty("byRow") final boolean byRow
@JsonProperty("byRow") final boolean byRow,
@JsonProperty("round") final boolean round
)
{
this.name = name;
Expand All @@ -123,6 +118,7 @@ public CardinalityAggregatorFactory(
this.fields = fields;
}
this.byRow = byRow;
this.round = round;
}

public CardinalityAggregatorFactory(
Expand All @@ -131,7 +127,7 @@ public CardinalityAggregatorFactory(
final boolean byRow
)
{
this(name, null, fields, byRow);
this(name, null, fields, byRow, false);
}

@Override
Expand Down Expand Up @@ -201,7 +197,7 @@ public AggregateCombiner makeAggregateCombiner()
@Override
public AggregatorFactory getCombiningFactory()
{
return new HyperUniquesAggregatorFactory(name, name);
return new HyperUniquesAggregatorFactory(name, name, false, round);
}

@Override
Expand All @@ -213,17 +209,18 @@ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws Aggre
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Lists.transform(
fields,
new Function<DimensionSpec, AggregatorFactory>()
{
@Override
public AggregatorFactory apply(DimensionSpec input)
{
return new CardinalityAggregatorFactory(input.getOutputName(), Collections.singletonList(input), byRow);
}
}
);
return fields.stream()
.map(
field ->
new CardinalityAggregatorFactory(
field.getOutputName(),
null,
Collections.singletonList(field),
byRow,
round
)
)
.collect(Collectors.toList());
}

@Override
Expand All @@ -249,7 +246,7 @@ public Object deserialize(Object object)

public Object finalizeComputation(Object object)
{
return estimateCardinality(object);
return HyperUniquesAggregatorFactory.estimateCardinality(object, round);
}

@Override
Expand Down Expand Up @@ -277,25 +274,20 @@ public boolean isByRow()
return byRow;
}

@JsonProperty
public boolean isRound()
{
return round;
}

@Override
public byte[] getCacheKey()
{
List<byte[]> dimSpecKeys = new ArrayList<>();
int dimSpecKeysLength = fields.size();
for (DimensionSpec dimSpec : fields) {
byte[] dimSpecKey = dimSpec.getCacheKey();
dimSpecKeysLength += dimSpecKey.length;
dimSpecKeys.add(dimSpec.getCacheKey());
}

ByteBuffer retBuf = ByteBuffer.allocate(2 + dimSpecKeysLength);
retBuf.put(AggregatorUtil.CARD_CACHE_TYPE_ID);
for (byte[] dimSpecKey : dimSpecKeys) {
retBuf.put(dimSpecKey);
retBuf.put(AggregatorUtil.STRING_SEPARATOR);
}
retBuf.put((byte) (byRow ? 1 : 0));
return retBuf.array();
return new CacheKeyBuilder(AggregatorUtil.CARD_CACHE_TYPE_ID)
.appendCacheables(fields)
.appendBoolean(byRow)
.appendBoolean(round)
.build();
}

@Override
Expand All @@ -311,42 +303,35 @@ public int getMaxIntermediateSize()
}

@Override
public boolean equals(Object o)
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

CardinalityAggregatorFactory that = (CardinalityAggregatorFactory) o;

if (isByRow() != that.isByRow()) {
return false;
}
if (!getName().equals(that.getName())) {
return false;
}
return getFields().equals(that.getFields());

final CardinalityAggregatorFactory that = (CardinalityAggregatorFactory) o;
return byRow == that.byRow &&
round == that.round &&
Objects.equals(name, that.name) &&
Objects.equals(fields, that.fields);
}

@Override
public int hashCode()
{
int result = getName().hashCode();
result = 31 * result + getFields().hashCode();
result = 31 * result + (isByRow() ? 1 : 0);
return result;
return Objects.hash(name, fields, byRow, round);
}

@Override
public String toString()
{
return "CardinalityAggregatorFactory{" +
"name='" + name + '\'' +
", fields='" + fields + '\'' +
", fields=" + fields +
", byRow=" + byRow +
", round=" + round +
'}';
}
}
Loading