Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.druid.java.util.common.Pair;

import javax.annotation.Nullable;
import java.util.Comparator;

public class SerializablePair<T1, T2> extends Pair<T1, T2>
{
Expand All @@ -45,4 +46,25 @@ public T2 getRhs()
{
return rhs;
}

/**
 * Creates a comparator over {@link SerializablePair} that orders by the right-hand value using
 * {@code delegate}, placing pairs with a null right-hand value (or null pairs) either first or
 * last. Two null-valued pairs compare as equal regardless of their left-hand values.
 *
 * @param delegate   comparator applied to non-null right-hand values
 * @param nullsFirst if true, null-valued pairs sort before all others; otherwise after
 */
public static <T1, T2> Comparator<SerializablePair<T1, T2>> createNullHandlingComparator(
    Comparator<T2> delegate,
    boolean nullsFirst
)
{
  // Rank assigned to a null-valued pair relative to a non-null one; negated for the other side.
  final int nullRank = nullsFirst ? -1 : 1;
  return (left, right) -> {
    final boolean leftNull = left == null || left.rhs == null;
    final boolean rightNull = right == null || right.rhs == null;
    if (leftNull && rightNull) {
      return 0;
    }
    if (leftNull) {
      return nullRank;
    }
    if (rightNull) {
      return -nullRank;
    }
    return delegate.compare(left.rhs, right.rhs);
  };
}
}
12 changes: 6 additions & 6 deletions docs/querying/aggregations.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ Note that queries with first/last aggregators on a segment created with rollup e

#### `doubleFirst` aggregator

`doubleFirst` computes the metric value with the minimum timestamp or 0 if no row exist
`doubleFirst` computes the metric value with the minimum timestamp, or 0 in default mode, or `null` in SQL-compatible mode if no rows exist

```json
{
Expand All @@ -148,7 +148,7 @@ Note that queries with first/last aggregators on a segment created with rollup e

#### `doubleLast` aggregator

`doubleLast` computes the metric value with the maximum timestamp or 0 if no row exist
`doubleLast` computes the metric value with the maximum timestamp, or 0 in default mode, or `null` in SQL-compatible mode if no rows exist

```json
{
Expand All @@ -160,7 +160,7 @@ Note that queries with first/last aggregators on a segment created with rollup e

#### `floatFirst` aggregator

`floatFirst` computes the metric value with the minimum timestamp or 0 if no row exist
`floatFirst` computes the metric value with the minimum timestamp, or 0 in default mode, or `null` in SQL-compatible mode if no rows exist

```json
{
Expand All @@ -172,7 +172,7 @@ Note that queries with first/last aggregators on a segment created with rollup e

#### `floatLast` aggregator

`floatLast` computes the metric value with the maximum timestamp or 0 if no row exist
`floatLast` computes the metric value with the maximum timestamp, or 0 in default mode, or `null` in SQL-compatible mode if no rows exist

```json
{
Expand All @@ -184,7 +184,7 @@ Note that queries with first/last aggregators on a segment created with rollup e

#### `longFirst` aggregator

`longFirst` computes the metric value with the minimum timestamp or 0 if no row exist
`longFirst` computes the metric value with the minimum timestamp, or 0 in default mode, or `null` in SQL-compatible mode if no rows exist

```json
{
Expand All @@ -196,7 +196,7 @@ Note that queries with first/last aggregators on a segment created with rollup e

#### `longLast` aggregator

`longLast` computes the metric value with the maximum timestamp or 0 if no row exist
`longLast` computes the metric value with the maximum timestamp, or 0 in default mode, or `null` in SQL-compatible mode if no rows exist

```json
{
Expand Down
4 changes: 2 additions & 2 deletions docs/querying/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ Only the COUNT aggregation can accept DISTINCT.
|`STDDEV_POP(expr)`|Computes standard deviation population of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.|
|`STDDEV_SAMP(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.|
|`STDDEV(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.|
|`EARLIEST(expr)`|Returns the earliest non-null value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "earliest" is the value first encountered with the minimum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the first value encountered.|
|`EARLIEST(expr)`|Returns the earliest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "earliest" is the value first encountered with the minimum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the first value encountered.|
|`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|
|`LATEST(expr)`|Returns the latest non-null value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "latest" is the value last encountered with the maximum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.|
|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "latest" is the value last encountered with the maximum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.|
|`LATEST(expr, maxBytesPerString)`|Like `LATEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|

For advice on choosing approximate aggregation functions, check out our [approximate aggregations documentation](aggregations.html#approx).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,42 +20,29 @@
package org.apache.druid.query.aggregation.first;

import org.apache.druid.collections.SerializablePair;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.BaseLongColumnValueSelector;

public class DoubleFirstAggregator implements Aggregator
public class DoubleFirstAggregator extends NumericFirstAggregator<BaseDoubleColumnValueSelector>
{

private final BaseDoubleColumnValueSelector valueSelector;
private final BaseLongColumnValueSelector timeSelector;

protected long firstTime;
protected double firstValue;
double firstValue;

public DoubleFirstAggregator(BaseLongColumnValueSelector timeSelector, BaseDoubleColumnValueSelector valueSelector)
{
this.valueSelector = valueSelector;
this.timeSelector = timeSelector;

firstTime = Long.MAX_VALUE;
super(timeSelector, valueSelector);
firstValue = 0;
}

@Override
public void aggregate()
void setCurrentValue()
{
long time = timeSelector.getLong();
if (time < firstTime) {
firstTime = time;
firstValue = valueSelector.getDouble();
}
firstValue = valueSelector.getDouble();
}

@Override
public Object get()
{
return new SerializablePair<>(firstTime, firstValue);
return new SerializablePair<>(firstTime, rhsNull ? null : firstValue);
}

@Override
Expand All @@ -75,11 +62,5 @@ public long getLong()
{
return (long) firstValue;
}

@Override
public void close()
{

}
}

Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnHolder;

import javax.annotation.Nullable;
Expand All @@ -45,10 +46,34 @@
import java.util.Map;
import java.util.Objects;

public class DoubleFirstAggregatorFactory extends NullableNumericAggregatorFactory<ColumnValueSelector>
public class DoubleFirstAggregatorFactory extends AggregatorFactory
{
private static final Aggregator NIL_AGGREGATOR = new DoubleFirstAggregator(
NilColumnValueSelector.instance(),
NilColumnValueSelector.instance()
)
{
@Override
public void aggregate()
{
// no-op
}
};

private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new DoubleFirstBufferAggregator(
NilColumnValueSelector.instance(),
NilColumnValueSelector.instance()
)
{
@Override
public void aggregate(ByteBuffer buf, int position)
{
// no-op
}
};

public static final Comparator<SerializablePair<Long, Double>> VALUE_COMPARATOR =
Comparator.comparingDouble(o -> o.rhs);
SerializablePair.createNullHandlingComparator(Double::compare, true);

private final String fieldName;
private final String name;
Expand All @@ -69,24 +94,31 @@ public DoubleFirstAggregatorFactory(
}

@Override
protected ColumnValueSelector selector(ColumnSelectorFactory metricFactory)
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return metricFactory.makeColumnValueSelector(fieldName);
}

@Override
protected Aggregator factorize(ColumnSelectorFactory metricFactory, ColumnValueSelector selector)
{
return new DoubleFirstAggregator(metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), selector);
final BaseDoubleColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName);
if (valueSelector instanceof NilColumnValueSelector) {
return NIL_AGGREGATOR;
} else {
return new DoubleFirstAggregator(
metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
valueSelector
);
}
}

@Override
protected BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory, ColumnValueSelector selector)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new DoubleFirstBufferAggregator(
metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
selector
);
final BaseDoubleColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName);
if (valueSelector instanceof NilColumnValueSelector) {
return NIL_BUFFER_AGGREGATOR;
} else {
return new DoubleFirstBufferAggregator(
metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
valueSelector
);
}
}

@Override
Expand Down Expand Up @@ -126,35 +158,54 @@ public AggregatorFactory getCombiningFactory()
return new DoubleFirstAggregatorFactory(name, name)
{
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory, ColumnValueSelector selector)
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
final ColumnValueSelector<SerializablePair<Long, Double>> selector =
metricFactory.makeColumnValueSelector(name);
return new DoubleFirstAggregator(null, null)
{
@Override
public void aggregate()
{
SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.getObject();
SerializablePair<Long, Double> pair = selector.getObject();
if (pair.lhs < firstTime) {
firstTime = pair.lhs;
firstValue = pair.rhs;
if (pair.rhs != null) {
firstValue = pair.rhs;
rhsNull = false;
} else {
rhsNull = true;
}
}
}
};
}

@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory, ColumnValueSelector selector)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
final ColumnValueSelector<SerializablePair<Long, Double>> selector =
metricFactory.makeColumnValueSelector(name);
return new DoubleFirstBufferAggregator(null, null)
{
@Override
public void putValue(ByteBuffer buf, int position)
{
SerializablePair<Long, Double> pair = selector.getObject();
buf.putDouble(position, pair.rhs);
}

@Override
public void aggregate(ByteBuffer buf, int position)
{
SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.getObject();
long firstTime = buf.getLong(position);
if (pair.lhs < firstTime) {
buf.putLong(position, pair.lhs);
buf.putDouble(position + Long.BYTES, pair.rhs);
if (pair.rhs != null) {
updateTimeWithValue(buf, position, pair.lhs);
} else {
updateTimeWithNull(buf, position, pair.lhs);
}
}
}

Expand All @@ -178,6 +229,9 @@ public List<AggregatorFactory> getRequiredColumns()
public Object deserialize(Object object)
{
Map map = (Map) object;
if (map.get("rhs") == null) {
return new SerializablePair<>(((Number) map.get("lhs")).longValue(), null);
}
return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).doubleValue());
}

Expand Down Expand Up @@ -221,16 +275,15 @@ public byte[] getCacheKey()
@Override
public String getTypeName()
{
if (storeDoubleAsFloat) {
return "float";
}
return "double";
// if we don't pretend to be a primitive, group by v1 gets sad and doesn't work because no complex type serde
return storeDoubleAsFloat ? "float" : "double";
}

@Override
public int getMaxIntermediateSize()
{
return Long.BYTES + Double.BYTES;
// timestamp, is null, value
return Long.BYTES + Byte.BYTES + Double.BYTES;
}

@Override
Expand Down
Loading