Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -144,6 +145,7 @@ private Function<Sequence<Row>, Sequence<Row>> makePostProcessingFn()
postProcessingFn,
(Sequence<Row> input) -> {
havingSpec.setRowSignature(GroupByQueryHelper.rowSignatureFor(GroupByQuery.this));
havingSpec.setAggregators(getAggregatorsMap(aggregatorSpecs));
return Sequences.filter(input, havingSpec::eval);
}
);
Expand Down Expand Up @@ -696,6 +698,13 @@ private static void verifyOutputNames(
}
}

private static Map<String, AggregatorFactory> getAggregatorsMap(List<AggregatorFactory> aggregatorSpecs)
{
Map<String, AggregatorFactory> map = new HashMap<>(aggregatorSpecs.size());
aggregatorSpecs.stream().forEach(v -> map.put(v.getName(), v));
return map;
}

public static class Builder
{
private DataSource dataSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.druid.data.input.Row;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.column.ValueType;

import java.util.List;
Expand Down Expand Up @@ -55,6 +56,14 @@ public void setRowSignature(Map<String, ValueType> rowSignature)
}
}

@Override
public void setAggregators(Map<String, AggregatorFactory> aggregators)
{
for (HavingSpec havingSpec : havingSpecs) {
havingSpec.setAggregators(aggregators);
}
}

@Override
public boolean eval(Row row)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package io.druid.query.groupby.having;

import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.column.ValueType;

import java.util.Map;
Expand All @@ -30,4 +31,10 @@ public void setRowSignature(Map<String, ValueType> rowSignature)
{
// Do nothing.
}

@Override
public void setAggregators(Map<String, AggregatorFactory> aggregators)
{

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.data.input.Row;
import io.druid.query.aggregation.AggregatorFactory;

import java.util.Map;

/**
* The "=" operator in a "having" clause. This is similar to SQL's "having aggregation = value",
Expand All @@ -32,6 +35,8 @@ public class EqualToHavingSpec extends BaseHavingSpec
private final String aggregationName;
private final Number value;

private volatile Map<String, AggregatorFactory> aggregators;

@JsonCreator
public EqualToHavingSpec(
@JsonProperty("aggregation") String aggName,
Expand All @@ -54,10 +59,16 @@ public String getAggregationName()
return aggregationName;
}

@Override
public void setAggregators(Map<String, AggregatorFactory> aggregators)
{
this.aggregators = aggregators;
}

@Override
public boolean eval(Row row)
{
return HavingSpecMetricComparator.compare(row, aggregationName, value) == 0;
return HavingSpecMetricComparator.compare(row, aggregationName, value, aggregators) == 0;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.data.input.Row;
import io.druid.query.aggregation.AggregatorFactory;

import java.util.Map;

/**
* The "&gt;" operator in a "having" clause. This is similar to SQL's "having aggregation &gt; value",
Expand All @@ -32,6 +35,8 @@ public class GreaterThanHavingSpec extends BaseHavingSpec
private final String aggregationName;
private final Number value;

private volatile Map<String, AggregatorFactory> aggregators;

@JsonCreator
public GreaterThanHavingSpec(
@JsonProperty("aggregation") String aggName,
Expand All @@ -54,10 +59,16 @@ public Number getValue()
return value;
}

@Override
public void setAggregators(Map<String, AggregatorFactory> aggregators)
{
this.aggregators = aggregators;
}

@Override
public boolean eval(Row row)
{
return HavingSpecMetricComparator.compare(row, aggregationName, value) > 0;
return HavingSpecMetricComparator.compare(row, aggregationName, value, aggregators) > 0;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.data.input.Row;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.column.ValueType;

import java.util.Map;
Expand Down Expand Up @@ -58,6 +59,8 @@ public interface HavingSpec
*/
void setRowSignature(Map<String, ValueType> rowSignature);

void setAggregators(Map<String, AggregatorFactory> aggregators);

/**
* Evaluates if a given row satisfies the having spec.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@

package io.druid.query.groupby.having;

import com.google.common.primitives.Doubles;
import com.google.common.primitives.Floats;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import io.druid.data.input.Row;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.java.util.common.ISE;

import java.util.Map;
import java.util.regex.Pattern;

/**
Expand All @@ -30,16 +36,28 @@ class HavingSpecMetricComparator
{
static final Pattern LONG_PAT = Pattern.compile("[-|+]?\\d+");

static int compare(Row row, String aggregationName, Number value)
static int compare(Row row, String aggregationName, Number value, Map<String, AggregatorFactory> aggregators)
{

Object metricValueObj = row.getRaw(aggregationName);

if (metricValueObj != null) {
if (aggregators != null && aggregators.containsKey(aggregationName)) {
metricValueObj = aggregators.get(aggregationName).finalizeComputation(metricValueObj);
}

if (metricValueObj instanceof Long) {
return Long.compare((Long) metricValueObj, value.longValue());
long l = ((Long) metricValueObj).longValue();
return Longs.compare(l, value.longValue());
} else if (metricValueObj instanceof Float) {
return Float.compare((Float) metricValueObj, value.floatValue());
float l = ((Float) metricValueObj).floatValue();
return Floats.compare(l, value.floatValue());
} else if (metricValueObj instanceof Double) {
return Double.compare((Double) metricValueObj, value.doubleValue());
double l = ((Double) metricValueObj).longValue();
return Doubles.compare(l, value.doubleValue());
} else if (metricValueObj instanceof Integer) {
int l = ((Integer) metricValueObj).intValue();
return Ints.compare(l, value.intValue());
} else if (metricValueObj instanceof String) {
String metricValueStr = (String) metricValueObj;
if (LONG_PAT.matcher(metricValueStr).matches()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.data.input.Row;
import io.druid.query.aggregation.AggregatorFactory;

import java.util.Map;

/**
* The "&lt;" operator in a "having" clause. This is similar to SQL's "having aggregation &lt; value",
Expand All @@ -31,6 +34,8 @@ public class LessThanHavingSpec extends BaseHavingSpec
private final String aggregationName;
private final Number value;

private volatile Map<String, AggregatorFactory> aggregators;

public LessThanHavingSpec(
@JsonProperty("aggregation") String aggName,
@JsonProperty("value") Number value
Expand All @@ -52,10 +57,16 @@ public Number getValue()
return value;
}

@Override
public void setAggregators(Map<String, AggregatorFactory> aggregators)
{
this.aggregators = aggregators;
}

@Override
public boolean eval(Row row)
{
return HavingSpecMetricComparator.compare(row, aggregationName, value) < 0;
return HavingSpecMetricComparator.compare(row, aggregationName, value, aggregators) < 0;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.data.input.Row;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.column.ValueType;

import java.util.Map;
Expand Down Expand Up @@ -51,6 +52,12 @@ public void setRowSignature(Map<String, ValueType> rowSignature)
havingSpec.setRowSignature(rowSignature);
}

@Override
public void setAggregators(Map<String, AggregatorFactory> aggregators)
{
havingSpec.setAggregators(aggregators);
}

@Override
public boolean eval(Row row)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.druid.data.input.Row;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.column.ValueType;

import java.util.List;
Expand Down Expand Up @@ -55,6 +56,14 @@ public void setRowSignature(Map<String, ValueType> rowSignature)
}
}

@Override
public void setAggregators(Map<String, AggregatorFactory> aggregators)
{
for (HavingSpec havingSpec : havingSpecs) {
havingSpec.setAggregators(aggregators);
}
}

@Override
public boolean eval(Row row)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3318,10 +3318,6 @@ public void testGroupByWithHavingOnHyperUnique()
)
);

// havingSpec equalTo/greaterThan/lessThan do not work on complex aggregators, even if they could be finalized.
// See also: https://github.com/druid-io/druid/issues/2507
expectedException.expect(ISE.class);
expectedException.expectMessage("Unknown type of metric value");
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
}
Expand Down