Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/querying/aggregations.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ Note that queries with first/last aggregators on a segment created with rollup e

(Double/Float/Long/String) ANY aggregator cannot be used in ingestion spec, and should only be specified as part of queries.

If `druid.generic.useDefaultValueForNull=true`, the aggregation can return the default value for null and does not prefer "non-null" values over the default value for null. If `druid.generic.useDefaultValueForNull=false`, then the aggregation will return any non-null value.
Returns any value, including null. This aggregator can simplify and optimize performance by returning the first encountered value (including null).

#### `doubleAny` aggregator

Expand Down
2 changes: 1 addition & 1 deletion docs/querying/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ Only the COUNT aggregation can accept DISTINCT.
|`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|
|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "latest" is the value last encountered with the maximum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.|
|`LATEST(expr, maxBytesPerString)`|Like `LATEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|
|`ANY_VALUE(expr)`|Returns any value of `expr`, which must be numeric. If `druid.generic.useDefaultValueForNull=true` this can return the default value for null and does not prefer "non-null" values over the default value for null. If `druid.generic.useDefaultValueForNull=false`, then this will return any non-null value of `expr`|
|`ANY_VALUE(expr)`|Returns any value of `expr`, including null. `expr` must be numeric. This aggregator can simplify and optimize performance by returning the first encountered value (including null).|
|`ANY_VALUE(expr, maxBytesPerString)`|Like `ANY_VALUE(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,44 +19,31 @@

package org.apache.druid.query.aggregation.any;

import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;

/**
* This Aggregator is created by the {@link DoubleAnyAggregatorFactory} which extends from
* {@link NullableNumericAggregatorFactory}. If null needs to be handle, then {@link NullableNumericAggregatorFactory}
* will wrap this aggregator in {@link NullableNumericAggregator} and can handle all null in that class.
* Hence, no null will ever be pass into this aggregator from the valueSelector.
*/
public class DoubleAnyAggregator implements Aggregator
{
private final BaseDoubleColumnValueSelector valueSelector;
import javax.annotation.Nullable;

public class DoubleAnyAggregator extends NumericAnyAggregator<BaseDoubleColumnValueSelector>
{
private double foundValue;
private boolean isFound;

public DoubleAnyAggregator(BaseDoubleColumnValueSelector valueSelector)
{
this.valueSelector = valueSelector;
super(valueSelector);
this.foundValue = 0;
this.isFound = false;
}

@Override
public void aggregate()
void setFoundValue()
{
if (!isFound) {
foundValue = valueSelector.getDouble();
isFound = true;
}
foundValue = valueSelector.getDouble();
}

@Override
@Nullable
public Object get()
{
return foundValue;
return isNull ? null : foundValue;
}

@Override
Expand All @@ -76,10 +63,4 @@ public double getDouble()
{
return foundValue;
}

@Override
public void close()
{
// no-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,103 +19,212 @@

package org.apache.druid.query.aggregation.any;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.math.expr.ExprMacroTable;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.SimpleDoubleAggregatorFactory;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnHolder;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;

public class DoubleAnyAggregatorFactory extends SimpleDoubleAggregatorFactory
public class DoubleAnyAggregatorFactory extends AggregatorFactory
{
private static final Comparator<Double> VALUE_COMPARATOR = Comparator.nullsFirst(Double::compare);

private static final Aggregator NIL_AGGREGATOR = new DoubleAnyAggregator(
NilColumnValueSelector.instance()
)
{
@Override
public void aggregate()
{
// no-op
}
};

private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new DoubleAnyBufferAggregator(
NilColumnValueSelector.instance()
)
{
@Override
public void aggregate(ByteBuffer buf, int position)
{
// no-op
}
};

private final String fieldName;
private final String name;
private final boolean storeDoubleAsFloat;

@JsonCreator
public DoubleAnyAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") final String fieldName,
@JsonProperty("expression") @Nullable String expression,
@JacksonInject ExprMacroTable macroTable
@JsonProperty("fieldName") final String fieldName
)
{
super(macroTable, name, fieldName, expression);
}
Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");

public DoubleAnyAggregatorFactory(String name, String fieldName)
{
this(name, fieldName, null, ExprMacroTable.nil());
this.name = name;
this.fieldName = fieldName;
this.storeDoubleAsFloat = ColumnHolder.storeDoubleAsFloat();
}

@Override
protected double nullValue()
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
return Double.NaN;
final BaseDoubleColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName);
if (valueSelector instanceof NilColumnValueSelector) {
return NIL_AGGREGATOR;
} else {
return new DoubleAnyAggregator(
valueSelector
);
}
}

@Override
protected Aggregator buildAggregator(BaseDoubleColumnValueSelector selector)
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
return new DoubleAnyAggregator(selector);
final BaseDoubleColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName);
if (valueSelector instanceof NilColumnValueSelector) {
return NIL_BUFFER_AGGREGATOR;
} else {
return new DoubleAnyBufferAggregator(
valueSelector
);
}
}

@Override
protected BufferAggregator buildBufferAggregator(BaseDoubleColumnValueSelector selector)
public Comparator getComparator()
{
return new DoubleAnyBufferAggregator(selector);
return VALUE_COMPARATOR;
}

@Override
@Nullable
public Object combine(@Nullable Object lhs, @Nullable Object rhs)
{
if (lhs != null) {
return lhs;
} else {
return rhs;
}
return lhs;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think from like a .. user satisfaction perspective, it might still be nice to prefer non-null values since it is still legitimate.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be consistent with the new policy of not discriminating null values #equality
Decided not to make assumption in preferring non-null value.

}

/**
 * Not supported: ANY aggregators are query-time only, so rollup at ingestion
 * time cannot combine intermediate values.
 *
 * @throws UOE always
 */
@Override
public AggregateCombiner makeAggregateCombiner()
{
throw new UOE("DoubleAnyAggregatorFactory is not supported during ingestion for rollup");
}

@Override
public AggregatorFactory getCombiningFactory()
{
return new DoubleAnyAggregatorFactory(name, name, null, macroTable);
return new DoubleAnyAggregatorFactory(name, name);
}

@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Collections.singletonList(new DoubleAnyAggregatorFactory(fieldName, fieldName, expression, macroTable));
return Collections.singletonList(new DoubleAnyAggregatorFactory(fieldName, fieldName));
}

/**
 * Deserializes an intermediate aggregation value. JSON cannot encode
 * NaN/Infinity as numbers, so such doubles may arrive as strings and are
 * parsed back to doubles here; all other objects pass through untouched.
 */
@Override
public Object deserialize(Object object)
{
  return object instanceof String ? Double.parseDouble((String) object) : object;
}

/**
 * No post-processing is needed for ANY aggregation; the stored value is
 * already the final result.
 */
@Override
@Nullable
public Object finalizeComputation(@Nullable Object object)
{
return object;
}

/**
 * Returns the output name of this aggregator, serialized as the "name"
 * JSON property.
 */
@Override
@JsonProperty
public String getName()
{
return name;
}

/**
 * Returns the input column this aggregator reads, serialized as the
 * "fieldName" JSON property.
 */
@JsonProperty
public String getFieldName()
{
return fieldName;
}

/**
 * Lists the input columns required to compute this aggregation: only the
 * single source field.
 */
@Override
public List<String> requiredFields()
{
return Collections.singletonList(fieldName);
}

/**
 * Builds the query cache key from the aggregator type id and the input field.
 * NOTE(review): the previous body also appended {@code expression}, a field
 * this factory no longer declares (the constructor accepts only
 * name/fieldName and every other method has dropped it), so that reference
 * would not compile; it is removed here.
 */
@Override
public byte[] getCacheKey()
{
  return new CacheKeyBuilder(AggregatorUtil.DOUBLE_ANY_CACHE_TYPE_ID)
      .appendString(fieldName)
      .build();
}

/**
 * Reports the stored column type, honoring the runtime flag that forces
 * double columns to be persisted as floats.
 */
@Override
public String getTypeName()
{
  if (storeDoubleAsFloat) {
    return "float";
  }
  return "double";
}

/**
 * Size of one intermediate aggregation slot: 8 bytes for the double value
 * plus 1 byte for the found/null flag.
 */
@Override
public int getMaxIntermediateSize()
{
return Double.BYTES + Byte.BYTES;
}

/**
 * Two factories are equal exactly when they share both the output name and
 * the input field name.
 */
@Override
public boolean equals(Object o)
{
  if (this == o) {
    return true;
  }
  if (o == null || getClass() != o.getClass()) {
    return false;
  }
  final DoubleAnyAggregatorFactory other = (DoubleAnyAggregatorFactory) o;
  return Objects.equals(fieldName, other.fieldName) && Objects.equals(name, other.name);
}

/**
 * Hash is consistent with equals: derived from fieldName and name only.
 */
@Override
public int hashCode()
{
return Objects.hash(fieldName, name);
}

@Override
public String toString()
{
return "DoubleAnyAggregatorFactory{" +
"fieldName='" + fieldName + '\'' +
", expression='" + expression + '\'' +
", name='" + name + '\'' +
"name='" + name + '\'' +
", fieldName='" + fieldName + '\'' +
'}';
}
}
Loading