Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 17 additions & 34 deletions processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.druid.query.ordering.StringComparators;
import io.druid.query.spec.LegacySegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.segment.DimensionHandlerUtils;
import io.druid.segment.VirtualColumn;
import io.druid.segment.VirtualColumns;
import io.druid.segment.column.Column;
Expand Down Expand Up @@ -377,7 +378,7 @@ private Ordering<Row> getRowOrderingForPushDown(
final List<String> orderedFieldNames = new ArrayList<>();
final Set<Integer> dimsInOrderBy = new HashSet<>();
final List<Boolean> needsReverseList = new ArrayList<>();
final List<Boolean> isNumericField = new ArrayList<>();
final List<ValueType> dimensionTypes = new ArrayList<>();
final List<StringComparator> comparators = new ArrayList<>();

for (OrderByColumnSpec orderSpec : limitSpec.getColumns()) {
Expand All @@ -389,7 +390,7 @@ private Ordering<Row> getRowOrderingForPushDown(
dimsInOrderBy.add(dimIndex);
needsReverseList.add(needsReverse);
final ValueType type = dimensions.get(dimIndex).getOutputType();
isNumericField.add(ValueType.isNumeric(type));
dimensionTypes.add(type);
comparators.add(orderSpec.getDimensionComparator());
}
}
Expand All @@ -399,7 +400,7 @@ private Ordering<Row> getRowOrderingForPushDown(
orderedFieldNames.add(dimensions.get(i).getOutputName());
needsReverseList.add(false);
final ValueType type = dimensions.get(i).getOutputType();
isNumericField.add(ValueType.isNumeric(type));
dimensionTypes.add(type);
comparators.add(StringComparators.LEXICOGRAPHIC);
}
}
Expand All @@ -416,7 +417,7 @@ public int compare(Row lhs, Row rhs)
return compareDimsForLimitPushDown(
orderedFieldNames,
needsReverseList,
isNumericField,
dimensionTypes,
comparators,
lhs,
rhs
Expand All @@ -434,7 +435,7 @@ public int compare(Row lhs, Row rhs)
final int cmp = compareDimsForLimitPushDown(
orderedFieldNames,
needsReverseList,
isNumericField,
dimensionTypes,
comparators,
lhs,
rhs
Expand Down Expand Up @@ -463,7 +464,7 @@ public int compare(Row lhs, Row rhs)
return compareDimsForLimitPushDown(
orderedFieldNames,
needsReverseList,
isNumericField,
dimensionTypes,
comparators,
lhs,
rhs
Expand Down Expand Up @@ -530,28 +531,12 @@ private Comparator<Row> getTimeComparator(boolean granular)
private static int compareDims(List<DimensionSpec> dimensions, Row lhs, Row rhs)
{
for (DimensionSpec dimension : dimensions) {
final int dimCompare;
if (dimension.getOutputType() == ValueType.LONG) {
dimCompare = Long.compare(
((Number) lhs.getRaw(dimension.getOutputName())).longValue(),
((Number) rhs.getRaw(dimension.getOutputName())).longValue()
);
} else if (dimension.getOutputType() == ValueType.FLOAT) {
dimCompare = Float.compare(
((Number) lhs.getRaw(dimension.getOutputName())).floatValue(),
((Number) rhs.getRaw(dimension.getOutputName())).floatValue()
);
} else if (dimension.getOutputType() == ValueType.DOUBLE) {
dimCompare = Double.compare(
((Number) lhs.getRaw(dimension.getOutputName())).doubleValue(),
((Number) rhs.getRaw(dimension.getOutputName())).doubleValue()
);
} else {
dimCompare = ((Ordering) Comparators.naturalNullsFirst()).compare(
lhs.getRaw(dimension.getOutputName()),
rhs.getRaw(dimension.getOutputName())
);
}
//noinspection unchecked
final int dimCompare = DimensionHandlerUtils.compareObjectsAsType(
lhs.getRaw(dimension.getOutputName()),
rhs.getRaw(dimension.getOutputName()),
dimension.getOutputType()
);
if (dimCompare != 0) {
return dimCompare;
}
Expand All @@ -563,7 +548,7 @@ private static int compareDims(List<DimensionSpec> dimensions, Row lhs, Row rhs)
private static int compareDimsForLimitPushDown(
final List<String> fields,
final List<Boolean> needsReverseList,
final List<Boolean> isNumericField,
final List<ValueType> dimensionTypes,
final List<StringComparator> comparators,
Row lhs,
Row rhs
Expand All @@ -572,17 +557,15 @@ private static int compareDimsForLimitPushDown(
for (int i = 0; i < fields.size(); i++) {
final String fieldName = fields.get(i);
final StringComparator comparator = comparators.get(i);
final ValueType dimensionType = dimensionTypes.get(i);

final int dimCompare;
final Object lhsObj = lhs.getRaw(fieldName);
final Object rhsObj = rhs.getRaw(fieldName);

if (isNumericField.get(i)) {
if (ValueType.isNumeric(dimensionType)) {
if (comparator.equals(StringComparators.NUMERIC)) {
dimCompare = ((Ordering) Comparators.naturalNullsFirst()).compare(
lhsObj,
rhsObj
);
dimCompare = DimensionHandlerUtils.compareObjectsAsType(lhsObj, rhsObj, dimensionType);
} else {
dimCompare = comparator.compare(String.valueOf(lhsObj), String.valueOf(rhsObj));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import io.druid.query.groupby.resource.GroupByQueryResource;
import io.druid.query.groupby.strategy.GroupByStrategy;
import io.druid.query.groupby.strategy.GroupByStrategySelector;
import io.druid.segment.DimensionHandlerUtils;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -451,8 +452,13 @@ public Row apply(Object input)
Map<String, Object> event = Maps.newLinkedHashMap();
Iterator<DimensionSpec> dimsIter = dims.iterator();
while (dimsIter.hasNext() && results.hasNext()) {
final DimensionSpec factory = dimsIter.next();
event.put(factory.getOutputName(), results.next());
final DimensionSpec dimensionSpec = dimsIter.next();

// Must convert generic Jackson-deserialized type into the proper type.
event.put(
dimensionSpec.getOutputName(),
DimensionHandlerUtils.convertObjectToType(results.next(), dimensionSpec.getOutputType())
);
}

Iterator<AggregatorFactory> aggsIter = aggs.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,28 +682,7 @@ private static void convertRowTypesToOutputTypes(List<DimensionSpec> dimensionSp
final ValueType outputType = dimSpec.getOutputType();
rowMap.compute(
dimSpec.getOutputName(),
(dimName, baseVal) -> {
switch (outputType) {
case STRING:
baseVal = baseVal == null ? "" : baseVal.toString();
break;
case LONG:
baseVal = DimensionHandlerUtils.convertObjectToLong(baseVal);
baseVal = baseVal == null ? 0L : baseVal;
break;
case FLOAT:
baseVal = DimensionHandlerUtils.convertObjectToFloat(baseVal);
baseVal = baseVal == null ? 0.f : baseVal;
break;
case DOUBLE:
baseVal = DimensionHandlerUtils.convertObjectToDouble(baseVal);
baseVal = baseVal == null ? 0.d : baseVal;
break;
default:
throw new IAE("Unsupported type: " + outputType);
}
return baseVal;
}
(dimName, baseVal) -> DimensionHandlerUtils.convertObjectToTypeNonNull(baseVal, outputType)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,38 +602,10 @@ private static Function<Comparable, Comparable>[] makeValueConvertFunctions(
{
final Function<Comparable, Comparable>[] functions = new Function[valueTypes.size()];
for (int i = 0; i < functions.length; i++) {
ValueType type = valueTypes.get(i);
// Subquery post-aggs aren't added to the rowSignature (see rowSignatureFor() in GroupByQueryHelper) because
// their types aren't known, so default to String handling.
type = type == null ? ValueType.STRING : type;
switch (type) {
case STRING:
functions[i] = input -> input == null ? "" : input.toString();
break;

case LONG:
functions[i] = input -> {
final Long val = DimensionHandlerUtils.convertObjectToLong(input);
return val == null ? 0L : val;
};
break;

case FLOAT:
functions[i] = input -> {
final Float val = DimensionHandlerUtils.convertObjectToFloat(input);
return val == null ? 0.f : val;
};
break;

case DOUBLE:
functions[i] = input -> {
Double val = DimensionHandlerUtils.convertObjectToDouble(input);
return val == null ? 0.0 : val;
};
break;
default:
throw new IAE("invalid type: [%s]", type);
}
final ValueType type = valueTypes.get(i) == null ? ValueType.STRING : valueTypes.get(i);
functions[i] = input -> DimensionHandlerUtils.convertObjectToTypeNonNull(input, type);
}
return functions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package io.druid.query.topn;

import com.google.common.base.Function;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.topn.types.TopNColumnSelectorStrategy;
Expand Down Expand Up @@ -110,14 +109,8 @@ protected void updateResults(
)
{
final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
final boolean needsResultTypeConversion = needsResultTypeConversion(params);
final Function<Object, Object> valueTransformer = TopNMapFn.getValueTransformer(
query.getDimensionSpec().getOutputType()
);

selectorPlus.getColumnSelectorStrategy().updateDimExtractionResults(
aggregatesStore,
needsResultTypeConversion ? valueTransformer : null,
resultBuilder
);
}
Expand All @@ -136,11 +129,4 @@ protected void closeAggregators(Map<Comparable, Aggregator[]> valueMap)
public void cleanup(TopNParams params)
{
}

private boolean needsResultTypeConversion(TopNParams params)
{
ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
TopNColumnSelectorStrategy strategy = selectorPlus.getColumnSelectorStrategy();
return query.getDimensionSpec().getOutputType() != strategy.getValueType();
}
}
20 changes: 10 additions & 10 deletions processing/src/main/java/io/druid/query/topn/DimValHolder.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,19 @@
public class DimValHolder
{
private final Object topNMetricVal;
private final Comparable dimName;
private final Comparable dimValue;
private final Object dimValIndex;
private final Map<String, Object> metricValues;

public DimValHolder(
Object topNMetricVal,
Comparable dimName,
Comparable dimValue,
Object dimValIndex,
Map<String, Object> metricValues
)
{
this.topNMetricVal = topNMetricVal;
this.dimName = dimName;
this.dimValue = dimValue;
this.dimValIndex = dimValIndex;
this.metricValues = metricValues;
}
Expand All @@ -48,9 +48,9 @@ public Object getTopNMetricVal()
return topNMetricVal;
}

public Comparable getDimName()
public Comparable getDimValue()
{
return dimName;
return dimValue;
}

public Object getDimValIndex()
Expand All @@ -66,14 +66,14 @@ public Map<String, Object> getMetricValues()
public static class Builder
{
private Object topNMetricVal;
private Comparable dimName;
private Comparable dimValue;
private Object dimValIndex;
private Map<String, Object> metricValues;

public Builder()
{
topNMetricVal = null;
dimName = null;
dimValue = null;
dimValIndex = null;
metricValues = null;
}
Expand All @@ -84,9 +84,9 @@ public Builder withTopNMetricVal(Object topNMetricVal)
return this;
}

public Builder withDimName(Comparable dimName)
public Builder withDimValue(Comparable dimValue)
{
this.dimName = dimName;
this.dimValue = dimValue;
return this;
}

Expand All @@ -104,7 +104,7 @@ public Builder withMetricValues(Map<String, Object> metricValues)

public DimValHolder build()
{
return new DimValHolder(topNMetricVal, dimName, dimValIndex, metricValues);
return new DimValHolder(topNMetricVal, dimValue, dimValIndex, metricValues);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package io.druid.query.topn;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import io.druid.collections.NonBlockingPool;
import io.druid.collections.ResourceHolder;
Expand All @@ -37,7 +36,6 @@
import io.druid.segment.DimensionSelector;
import io.druid.segment.FilteredOffset;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.Offset;
import io.druid.segment.historical.HistoricalColumnSelector;
Expand Down Expand Up @@ -736,10 +734,6 @@ protected void updateResults(
final int[] aggregatorSizes = params.getAggregatorSizes();
final DimensionSelector dimSelector = params.getDimSelector();

final ValueType outType = query.getDimensionSpec().getOutputType();
final boolean needsResultConversion = outType != ValueType.STRING;
final Function<Object, Object> valueTransformer = TopNMapFn.getValueTransformer(outType);

for (int i = 0; i < positions.length; i++) {
int position = positions[i];
if (position >= 0) {
Expand All @@ -749,14 +743,9 @@ protected void updateResults(
position += aggregatorSizes[j];
}

Object retVal = dimSelector.lookupName(i);
if (needsResultConversion) {
retVal = valueTransformer.apply(retVal);
}


// Output type must be STRING in order for PooledTopNAlgorithm to make sense; so no need to convert value.
resultBuilder.addEntry(
(Comparable) retVal,
dimSelector.lookupName(i),
i,
vals
);
Expand Down Expand Up @@ -854,18 +843,6 @@ public static class Builder
private int numValuesPerPass;
private TopNMetricSpecBuilder<int[]> arrayProvider;

public Builder()
{
selectorPlus = null;
cursor = null;
resultsBufHolder = null;
resultsBuf = null;
aggregatorSizes = null;
numBytesPerRecord = 0;
numValuesPerPass = 0;
arrayProvider = null;
}

public Builder withSelectorPlus(ColumnSelectorPlus selectorPlus)
{
this.selectorPlus = selectorPlus;
Expand Down
Loading