Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.druid.java.util.common.Pair;

import javax.annotation.Nullable;
import java.util.Comparator;

public class SerializablePair<T1, T2> extends Pair<T1, T2>
{
Expand All @@ -45,4 +46,25 @@ public T2 getRhs()
{
return rhs;
}

public static <T1, T2> Comparator<SerializablePair<T1, T2>> createNullHandlingComparator(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like this is missing unit tests? Also javadocs since this is a utility that would be used by many other classes

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is covered by calcite query tests that order by the first/last aggregators; afaik SerializablePair is only used by first/last despite its generic name, to store a timestamp and value.

Comparator<T2> delegate,
boolean nullsFirst
)
{
final int firstIsNull = nullsFirst ? -1 : 1;
final int secondIsNull = nullsFirst ? 1 : -1;
return (o1, o2) -> {
if (o1 == null || o1.rhs == null) {
if (o2 == null || o2.rhs == null) {
return 0;
}
return firstIsNull;
}
if (o2 == null || o2.rhs == null) {
return secondIsNull;
}
return delegate.compare(o1.rhs, o2.rhs);
};
}
}
12 changes: 6 additions & 6 deletions docs/querying/aggregations.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ Note that queries with first/last aggregators on a segment created with rollup e

#### `doubleFirst` aggregator

`doubleFirst` computes the metric value with the minimum timestamp or 0 if no row exist
`doubleFirst` computes the metric value with the minimum timestamp. If no rows exist, it returns 0 in default mode or `null` in SQL-compatible mode.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you referring to this property - https://github.com/apache/druid/blob/master/docs/configuration/index.md#sql-compatible-null-handling ? Would be nice to link to the configuration here.

nit: I'd re-phrase slightly

computes the metric value with the minimum timestamp. If no row exists, it will return 0 or `null` if [SQL compatible mode](../configuration/index.md#sql-compatible-null-handling) is enabled


```json
{
Expand All @@ -148,7 +148,7 @@ Note that queries with first/last aggregators on a segment created with rollup e

#### `doubleLast` aggregator

`doubleLast` computes the metric value with the maximum timestamp or 0 if no row exist
`doubleLast` computes the metric value with the maximum timestamp. If no rows exist, it returns 0 in default mode or `null` in SQL-compatible mode.

```json
{
Expand All @@ -160,7 +160,7 @@ Note that queries with first/last aggregators on a segment created with rollup e

#### `floatFirst` aggregator

`floatFirst` computes the metric value with the minimum timestamp or 0 if no row exist
`floatFirst` computes the metric value with the minimum timestamp. If no rows exist, it returns 0 in default mode or `null` in SQL-compatible mode.

```json
{
Expand All @@ -172,7 +172,7 @@ Note that queries with first/last aggregators on a segment created with rollup e

#### `floatLast` aggregator

`floatLast` computes the metric value with the maximum timestamp or 0 if no row exist
`floatLast` computes the metric value with the maximum timestamp. If no rows exist, it returns 0 in default mode or `null` in SQL-compatible mode.

```json
{
Expand All @@ -184,7 +184,7 @@ Note that queries with first/last aggregators on a segment created with rollup e

#### `longFirst` aggregator

`longFirst` computes the metric value with the minimum timestamp or 0 if no row exist
`longFirst` computes the metric value with the minimum timestamp. If no rows exist, it returns 0 in default mode or `null` in SQL-compatible mode.

```json
{
Expand All @@ -196,7 +196,7 @@ Note that queries with first/last aggregators on a segment created with rollup e

#### `longLast` aggregator

`longLast` computes the metric value with the maximum timestamp or 0 if no row exist
`longLast` computes the metric value with the maximum timestamp. If no rows exist, it returns 0 in default mode or `null` in SQL-compatible mode.

```json
{
Expand Down
4 changes: 2 additions & 2 deletions docs/querying/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ Only the COUNT aggregation can accept DISTINCT.
|`STDDEV_POP(expr)`|Computes standard deviation population of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.|
|`STDDEV_SAMP(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.|
|`STDDEV(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.html) documentation for additional details.|
|`EARLIEST(expr)`|Returns the earliest non-null value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "earliest" is the value first encountered with the minimum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the first value encountered.|
|`EARLIEST(expr)`|Returns the earliest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "earliest" is the value first encountered with the minimum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the first value encountered.|
|`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|
|`LATEST(expr)`|Returns the latest non-null value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "latest" is the value last encountered with the maximum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.|
|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "latest" is the value last encountered with the maximum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.|
|`LATEST(expr, maxBytesPerString)`|Like `LATEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|
|`ANY_VALUE(expr)`|Returns any value of `expr`, which must be numeric. If `druid.generic.useDefaultValueForNull=true` this can return the default value for null and does not prefer "non-null" values over the default value for null. If `druid.generic.useDefaultValueForNull=false`, then this will return any non-null value of `expr`|
|`ANY_VALUE(expr, maxBytesPerString)`|Like `ANY_VALUE(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,42 +20,29 @@
package org.apache.druid.query.aggregation.first;

import org.apache.druid.collections.SerializablePair;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.BaseLongColumnValueSelector;

public class DoubleFirstAggregator implements Aggregator
public class DoubleFirstAggregator extends NumericFirstAggregator<BaseDoubleColumnValueSelector>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice abstraction! 🎉

note to self: can the get call be abstracted into the base class?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not without boxing the primitive

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I guess that is going to happen anyway in making the pair.. so I guess maybe the on heap version of get could be shared, but not really possible for the buffer aggregator.

{

private final BaseDoubleColumnValueSelector valueSelector;
private final BaseLongColumnValueSelector timeSelector;

protected long firstTime;
protected double firstValue;
double firstValue;

public DoubleFirstAggregator(BaseLongColumnValueSelector timeSelector, BaseDoubleColumnValueSelector valueSelector)
{
this.valueSelector = valueSelector;
this.timeSelector = timeSelector;

firstTime = Long.MAX_VALUE;
super(timeSelector, valueSelector);
firstValue = 0;
}

@Override
public void aggregate()
void setCurrentValue()
{
long time = timeSelector.getLong();
if (time < firstTime) {
firstTime = time;
firstValue = valueSelector.getDouble();
}
firstValue = valueSelector.getDouble();
}

@Override
public Object get()
{
return new SerializablePair<>(firstTime, firstValue);
return new SerializablePair<>(firstTime, rhsNull ? null : firstValue);
}

@Override
Expand All @@ -75,11 +62,5 @@ public long getLong()
{
return (long) firstValue;
}

@Override
public void close()
{

}
}

Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnHolder;

import javax.annotation.Nullable;
Expand All @@ -45,10 +46,34 @@
import java.util.Map;
import java.util.Objects;

public class DoubleFirstAggregatorFactory extends NullableNumericAggregatorFactory<ColumnValueSelector>
public class DoubleFirstAggregatorFactory extends AggregatorFactory
{
private static final Aggregator NIL_AGGREGATOR = new DoubleFirstAggregator(
NilColumnValueSelector.instance(),
NilColumnValueSelector.instance()
)
{
@Override
public void aggregate()
{
// no-op
}
};

private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new DoubleFirstBufferAggregator(
NilColumnValueSelector.instance(),
NilColumnValueSelector.instance()
)
{
@Override
public void aggregate(ByteBuffer buf, int position)
{
// no-op
}
};

public static final Comparator<SerializablePair<Long, Double>> VALUE_COMPARATOR =
Comparator.comparingDouble(o -> o.rhs);
SerializablePair.createNullHandlingComparator(Double::compare, true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It took me a long time to try and figure out what the comparator was used for. I got wrapped up in the fact that the aggregator was meant compare timestamps, that I didn't realize this was for ordering. I think a javadoc on #AggregatorFactory#getComparator would have cleared up my confusion pretty quickly


private final String fieldName;
private final String name;
Expand All @@ -69,24 +94,31 @@ public DoubleFirstAggregatorFactory(
}

@Override
protected ColumnValueSelector selector(ColumnSelectorFactory metricFactory)
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return metricFactory.makeColumnValueSelector(fieldName);
}

@Override
protected Aggregator factorize(ColumnSelectorFactory metricFactory, ColumnValueSelector selector)
{
return new DoubleFirstAggregator(metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), selector);
final BaseDoubleColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName);
if (valueSelector instanceof NilColumnValueSelector) {
return NIL_AGGREGATOR;
} else {
return new DoubleFirstAggregator(
metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
valueSelector
);
}
}

@Override
protected BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory, ColumnValueSelector selector)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new DoubleFirstBufferAggregator(
metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
selector
);
final BaseDoubleColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName);
if (valueSelector instanceof NilColumnValueSelector) {
return NIL_BUFFER_AGGREGATOR;
} else {
return new DoubleFirstBufferAggregator(
metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
valueSelector
);
}
}

@Override
Expand Down Expand Up @@ -126,35 +158,54 @@ public AggregatorFactory getCombiningFactory()
return new DoubleFirstAggregatorFactory(name, name)
{
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory, ColumnValueSelector selector)
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
final ColumnValueSelector<SerializablePair<Long, Double>> selector =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on javadocs in makeColumnValueSelector this selector can be NilColumnValueSelector

in which case selector.getObject() on line 170 would return null and line 171 would throw an NPE?
similar comment for the factorizeBuffered method

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is for combining, the selector will be of the serialized pair complex objects, which will not be null.

metricFactory.makeColumnValueSelector(name);
return new DoubleFirstAggregator(null, null)
{
@Override
public void aggregate()
{
SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.getObject();
SerializablePair<Long, Double> pair = selector.getObject();
if (pair.lhs < firstTime) {
firstTime = pair.lhs;
firstValue = pair.rhs;
if (pair.rhs != null) {
firstValue = pair.rhs;
rhsNull = false;
} else {
rhsNull = true;
}
}
}
};
}

@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory, ColumnValueSelector selector)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
final ColumnValueSelector<SerializablePair<Long, Double>> selector =
metricFactory.makeColumnValueSelector(name);
return new DoubleFirstBufferAggregator(null, null)
{
@Override
public void putValue(ByteBuffer buf, int position)
{
SerializablePair<Long, Double> pair = selector.getObject();
buf.putDouble(position, pair.rhs);
}

@Override
public void aggregate(ByteBuffer buf, int position)
{
SerializablePair<Long, Double> pair = (SerializablePair<Long, Double>) selector.getObject();
long firstTime = buf.getLong(position);
if (pair.lhs < firstTime) {
buf.putLong(position, pair.lhs);
buf.putDouble(position + Long.BYTES, pair.rhs);
if (pair.rhs != null) {
updateTimeWithValue(buf, position, pair.lhs);
} else {
updateTimeWithNull(buf, position, pair.lhs);
}
}
}

Expand All @@ -178,6 +229,9 @@ public List<AggregatorFactory> getRequiredColumns()
public Object deserialize(Object object)
{
Map map = (Map) object;
if (map.get("rhs") == null) {
return new SerializablePair<>(((Number) map.get("lhs")).longValue(), null);
}
return new SerializablePair<>(((Number) map.get("lhs")).longValue(), ((Number) map.get("rhs")).doubleValue());
}

Expand Down Expand Up @@ -221,16 +275,15 @@ public byte[] getCacheKey()
@Override
public String getTypeName()
{
if (storeDoubleAsFloat) {
return "float";
}
return "double";
// if we don't pretend to be a primitive, group by v1 gets sad and doesn't work because no complex type serde
return storeDoubleAsFloat ? "float" : "double";
}

@Override
public int getMaxIntermediateSize()
{
return Long.BYTES + Double.BYTES;
// timestamp, is null, value
return Long.BYTES + Byte.BYTES + Double.BYTES;
}

@Override
Expand Down
Loading