Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/content/querying/aggregations.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ Computes the sum of values as 64-bit floating point value. Similar to `longSum`
{ "type" : "longMax", "name" : <output_name>, "fieldName" : <metric_name> }
```

### Average aggregator

Computes the average of values as 64-bit floating point value

```json
{ "type" : "avg", "name" : <output_name>, "fieldName" : <metric_name>, "inputType" : <inputType> }
```

`name` – output name for the averaged value
`fieldName` – name of the metric column to average over
`inputType` is one of `float`/`long`/`avg`, by default it is `float`

If `fieldName` column is pre-averaged in ingestion phase, the `inputType` should be `avg` in query

### First / Last aggregator

First and Last aggregator cannot be used in ingestion spec, and should only be specified as part of queries.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import io.druid.query.aggregation.LongMinAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.avg.AvgAggregatorFactory;
import io.druid.query.aggregation.avg.AvgSerde;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
Expand Down Expand Up @@ -71,6 +73,10 @@ public AggregatorsModule()
ComplexMetrics.registerSerde("preComputedHyperUnique", new PreComputedHyperUniquesSerde(HyperLogLogHash.getDefault()));
}

if (ComplexMetrics.getSerdeForType("avg") == null) {
ComplexMetrics.registerSerde("avg", new AvgSerde());
}

setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);
}
Expand All @@ -84,6 +90,7 @@ public AggregatorsModule()
@JsonSubTypes.Type(name = "doubleMin", value = DoubleMinAggregatorFactory.class),
@JsonSubTypes.Type(name = "longMax", value = LongMaxAggregatorFactory.class),
@JsonSubTypes.Type(name = "longMin", value = LongMinAggregatorFactory.class),
@JsonSubTypes.Type(name = "avg", value = AvgAggregatorFactory.class),
@JsonSubTypes.Type(name = "javascript", value = JavaScriptAggregatorFactory.class),
@JsonSubTypes.Type(name = "histogram", value = HistogramAggregatorFactory.class),
@JsonSubTypes.Type(name = "hyperUnique", value = HyperUniquesAggregatorFactory.class),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.avg;

import io.druid.query.aggregation.Aggregator;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;

/**
*/
public abstract class AvgAggregator implements Aggregator
{
protected final AvgAggregatorCollector holder = new AvgAggregatorCollector();

@Override
public void reset()
{
holder.reset();
}

@Override
public Object get()
{
return holder;
}

@Override
public void close()
{
}

@Override
public float getFloat()
{
return (float) holder.compute();
}

@Override
public long getLong()
{
return (long) holder.compute();
}

public static final class FloatAvgAggregator extends AvgAggregator
{
private final FloatColumnSelector selector;

public FloatAvgAggregator(FloatColumnSelector selector)
{
this.selector = selector;
}

@Override
public void aggregate()
{
holder.add(selector.get());
}
}

public static final class LongAvgAggregator extends AvgAggregator
{
private final LongColumnSelector selector;

public LongAvgAggregator(LongColumnSelector selector)
{
this.selector = selector;
}

@Override
public void aggregate()
{
holder.add(selector.get());
}
}

public static final class ObjectAvgAggregator extends AvgAggregator
{
private final ObjectColumnSelector selector;

public ObjectAvgAggregator(ObjectColumnSelector selector)
{
this.selector = selector;
}

@Override
public void aggregate()
{
AvgAggregatorCollector.combineValues(holder, selector.get());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation.avg;

import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Longs;

import java.nio.ByteBuffer;
import java.util.Comparator;

public class AvgAggregatorCollector
{
public static AvgAggregatorCollector from(ByteBuffer buffer)
{
return new AvgAggregatorCollector(buffer.getLong(), buffer.getDouble());
}

public static final Comparator<AvgAggregatorCollector> COMPARATOR = new Comparator<AvgAggregatorCollector>()
{
@Override
public int compare(AvgAggregatorCollector o1, AvgAggregatorCollector o2)
{
int compare = Longs.compare(o1.count, o2.count);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comparator should compare the computed averages from o1 and o2 instead of comparing the count/sum separately, otherwise anything sorting by an AvgAggregator could get results in an incorrect order, e.g.:

AvgAgg(count=5, sum=100) should be less than AvgAgg(count=1, sum=500000)

Can you add a test for this behavior where an AvgAggregator determines the sorting order?

if (compare != 0) {
return compare;
}
return Doubles.compare(o1.sum, o2.sum);
}
};

static Object combineValues(Object lhs, Object rhs)
{
final AvgAggregatorCollector holder1 = (AvgAggregatorCollector) lhs;
final AvgAggregatorCollector holder2 = (AvgAggregatorCollector) rhs;

if (holder2.count == 0) {
return holder1;
}
if (holder1.count == 0) {
holder1.count = holder2.count;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this just return holder2 instead of copying its values into holder1?

holder1.sum = holder2.sum;
return holder1;
}

holder1.count += holder2.count;
holder1.sum += holder2.sum;

return holder1;
}

static int getMaxIntermediateSize()
{
return Longs.BYTES + Doubles.BYTES;
}

long count; // number of elements
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's have these use getters/setters

double sum; // sum of elements

public AvgAggregatorCollector()
{
this(0, 0);
}

public void reset()
{
count = 0;
sum = 0;
}

public AvgAggregatorCollector(long count, double sum)
{
this.count = count;
this.sum = sum;
}

public AvgAggregatorCollector add(float v)
{
count++;
sum += v;
return this;
}

public AvgAggregatorCollector add(long v)
{
count++;
sum += v;
return this;
}

public double compute()
{
if (count == 0) {
throw new IllegalStateException("should not be empty holder");
Copy link
Copy Markdown
Contributor

@jon-wei jon-wei Mar 2, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should return NaN or 0 instead of throwing the exception, it's possible for an aggregator to not receive any values (suppose the AvgAggregator is wrapped in a FilteredAggregator that doesn't match any rows in a particular segment)

Can you also add a test for this case?

}
return sum / count;
}

@JsonValue
public byte[] toByteArray()
{
final ByteBuffer buffer = toByteBuffer();
buffer.flip();
byte[] theBytes = new byte[buffer.remaining()];
buffer.get(theBytes);

return theBytes;
}

public ByteBuffer toByteBuffer()
{
return ByteBuffer.allocate(Longs.BYTES + Doubles.BYTES)
.putLong(count)
.putDouble(sum);
}

@VisibleForTesting
boolean equalsWithEpsilon(AvgAggregatorCollector o, double epsilon)
{
if (this == o) {
return true;
}

if (count != o.count) {
return false;
}
if (Math.abs(sum - o.sum) > epsilon) {
return false;
}

return true;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

AvgAggregatorCollector that = (AvgAggregatorCollector) o;

if (count != that.count) {
return false;
}
if (Double.compare(that.sum, sum) != 0) {
return false;
}

return true;
}

@Override
public int hashCode()
{
int result;
long temp;
result = (int) (count ^ (count >>> 32));
temp = Double.doubleToLongBits(sum);
result = 31 * result + (int) (temp ^ (temp >>> 32));
return result;
}

@Override
public String toString()
{
return "AvgAggregatorCollector{" +
"count=" + count +
", sum=" + sum +
'}';
}
}
Loading