Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ private GroupByQuery(
verifyOutputNames(this.dimensions, this.aggregatorSpecs, this.postAggregatorSpecs);

this.universalTimestamp = computeUniversalTimestamp();
this.resultRowSignature = computeResultRowSignature();
this.resultRowSignature = computeResultRowSignature(RowSignature.Finalization.UNKNOWN);
this.havingSpec = havingSpec;
this.limitSpec = LimitSpec.nullToNoopLimitSpec(limitSpec);
this.subtotalsSpec = verifySubtotalsSpec(subtotalsSpec, this.dimensions);
Expand Down Expand Up @@ -320,8 +320,7 @@ public List<List<String>> getSubtotalsSpec()
}

/**
* Returns a list of field names, of the same size as {@link #getResultRowSizeWithPostAggregators()}, in the
* order that they will appear in ResultRows for this query.
* Equivalent to {@code getResultRowSignature(Finalization.UNKNOWN)}.
*
* @see ResultRow for documentation about the order that fields will be in
*/
Expand All @@ -330,6 +329,25 @@ public RowSignature getResultRowSignature()
return resultRowSignature;
}

/**
* Returns a result row signature, of the same size as {@link #getResultRowSizeWithPostAggregators()}, in the
* order that they will appear in ResultRows for this query.
*
* Aggregator types in the signature depend on the value of {@code finalization}.
*
* If finalization is {@link RowSignature.Finalization#UNKNOWN}, this method returns a cached object.
*
* @see ResultRow for documentation about the order that fields will be in
*/
public RowSignature getResultRowSignature(final RowSignature.Finalization finalization)
{
if (finalization == RowSignature.Finalization.UNKNOWN) {
return resultRowSignature;
} else {
return computeResultRowSignature(finalization);
}
}

/**
* Returns the size of ResultRows for this query when they do not include post-aggregators.
*/
Expand Down Expand Up @@ -481,7 +499,7 @@ private boolean validateAndGetForceLimitPushDown()
return forcePushDown;
}

private RowSignature computeResultRowSignature()
private RowSignature computeResultRowSignature(final RowSignature.Finalization finalization)
{
final RowSignature.Builder builder = RowSignature.builder();

Expand All @@ -490,7 +508,7 @@ private RowSignature computeResultRowSignature()
}

return builder.addDimensions(dimensions)
.addAggregators(aggregatorSpecs)
.addAggregators(aggregatorSpecs, finalization)
.addPostAggregators(postAggregatorSpecs)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.druid.segment.RowBasedColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.filter.BooleanValueMatcher;
Expand Down Expand Up @@ -203,7 +204,8 @@ public static Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, ResultRow>

ColumnSelectorFactory columnSelectorFactory = createResultRowBasedColumnSelectorFactory(
combining ? query : subquery,
columnSelectorRow::get
columnSelectorRow::get,
RowSignature.Finalization.UNKNOWN
);

// Apply virtual columns if we are in subquery (non-combining) mode.
Expand Down Expand Up @@ -341,14 +343,18 @@ public static Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, ResultRow>
/**
* Creates a {@link ColumnSelectorFactory} that can read rows which originate as results of the provided "query".
*
* @param query a groupBy query
* @param supplier supplier of result rows from the query
* @param query a groupBy query
* @param supplier supplier of result rows from the query
* @param finalization whether the column capabilities reported by this factory should reflect finalized types
*/
public static ColumnSelectorFactory createResultRowBasedColumnSelectorFactory(
final GroupByQuery query,
final Supplier<ResultRow> supplier
final Supplier<ResultRow> supplier,
final RowSignature.Finalization finalization
)
{
final RowSignature signature = query.getResultRowSignature(finalization);

final RowAdapter<ResultRow> adapter =
new RowAdapter<ResultRow>()
{
Expand All @@ -366,7 +372,7 @@ public ToLongFunction<ResultRow> timestampFunction()
@Override
public Function<ResultRow, Object> columnFunction(final String columnName)
{
final int columnIndex = query.getResultRowSignature().indexOf(columnName);
final int columnIndex = signature.indexOf(columnName);
if (columnIndex < 0) {
return row -> null;
} else {
Expand All @@ -378,7 +384,7 @@ public Function<ResultRow, Object> columnFunction(final String columnName)
return RowBasedColumnSelectorFactory.create(
adapter,
supplier::get,
() -> query.getResultRowSignature(),
() -> signature,
false
);
}
Expand All @@ -400,7 +406,14 @@ private static Predicate<ResultRow> getResultRowPredicate(final GroupByQuery que

final SettableSupplier<ResultRow> rowSupplier = new SettableSupplier<>();
final ColumnSelectorFactory columnSelectorFactory =
query.getVirtualColumns().wrap(RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory(subquery, rowSupplier));
query.getVirtualColumns()
.wrap(
RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory(
subquery,
rowSupplier,
RowSignature.Finalization.UNKNOWN
)
);

final ValueMatcher filterMatcher = filter == null
? BooleanValueMatcher.of(true)
Expand Down Expand Up @@ -965,7 +978,12 @@ private Comparator<Grouper.Entry<RowBasedKey>> objectComparatorWithAggs()
}
}

private static int compareDimsInRows(RowBasedKey key1, RowBasedKey key2, final List<ColumnType> fieldTypes, int dimStart)
private static int compareDimsInRows(
RowBasedKey key1,
RowBasedKey key2,
final List<ColumnType> fieldTypes,
int dimStart
)
{
for (int i = dimStart; i < key1.getKey().length; i++) {
final int cmp;
Expand Down Expand Up @@ -1337,7 +1355,12 @@ private RowBasedKeySerdeHelper makeSerdeHelper(
case LONG:
case FLOAT:
case DOUBLE:
return makeNullHandlingNumericserdeHelper(valueType.getType(), keyBufferPosition, pushLimitDown, stringComparator);
return makeNullHandlingNumericserdeHelper(
valueType.getType(),
keyBufferPosition,
pushLimitDown,
stringComparator
);
default:
throw new IAE("invalid type: %s", valueType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper;
import org.apache.druid.segment.column.RowSignature;

import java.util.Objects;
import java.util.function.Function;
Expand Down Expand Up @@ -83,7 +84,8 @@ public void setQuery(GroupByQuery query)
this.matcher = dimFilter.toFilter().makeMatcher(
RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory(
query,
rowSupplier
rowSupplier,
finalize ? RowSignature.Finalization.YES : RowSignature.Finalization.NO
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,16 @@ private Result<TimeseriesResultValue> getNullTimeseriesResultValue(TimeseriesQue
List<AggregatorFactory> aggregatorSpecs = query.getAggregatorSpecs();
Aggregator[] aggregators = new Aggregator[aggregatorSpecs.size()];
String[] aggregatorNames = new String[aggregatorSpecs.size()];
RowSignature aggregatorsSignature =
RowSignature.builder().addAggregators(aggregatorSpecs, RowSignature.Finalization.UNKNOWN).build();
for (int i = 0; i < aggregatorSpecs.size(); i++) {
aggregators[i] =
aggregatorSpecs.get(i)
.factorize(
RowBasedColumnSelectorFactory.create(
RowAdapters.standardRow(),
() -> new MapBasedRow(null, null),
() -> RowSignature.builder().addAggregators(aggregatorSpecs).build(),
() -> aggregatorsSignature,
false
)
);
Expand Down Expand Up @@ -417,7 +419,7 @@ public RowSignature resultArraySignature(TimeseriesQuery query)
if (StringUtils.isNotEmpty(query.getTimestampResultField())) {
rowSignatureBuilder.add(query.getTimestampResultField(), ColumnType.LONG);
}
rowSignatureBuilder.addAggregators(query.getAggregatorSpecs());
rowSignatureBuilder.addAggregators(query.getAggregatorSpecs(), RowSignature.Finalization.UNKNOWN);
rowSignatureBuilder.addPostAggregators(query.getPostAggregatorSpecs());
return rowSignatureBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ public RowSignature resultArraySignature(TopNQuery query)
return RowSignature.builder()
.addTimeColumn()
.addDimensions(Collections.singletonList(query.getDimensionSpec()))
.addAggregators(query.getAggregatorSpecs())
.addAggregators(query.getAggregatorSpecs(), RowSignature.Finalization.UNKNOWN)
.addPostAggregators(query.getPostAggregatorSpecs())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ public String toString()
}
final String columnName = columnNames.get(i);
s.append(columnName).append(":").append(columnTypes.get(columnName));

}
return s.append("}").toString();
}
Expand Down Expand Up @@ -249,24 +250,57 @@ public Builder addDimensions(final List<DimensionSpec> dimensions)
return this;
}

public Builder addAggregators(final List<AggregatorFactory> aggregators)
/**
* Adds aggregations to a signature.
*
* {@link Finalization#YES} will add finalized types and {@link Finalization#NO} will add intermediate types.
* {@link Finalization#UNKNOWN} will add the intermediate / finalized type when they are the same. Otherwise, it
* will add a null type.
*
* @param aggregators list of aggregation functions
* @param finalization whether the aggregator results will be finalized
*/
public Builder addAggregators(final List<AggregatorFactory> aggregators, final Finalization finalization)
{
for (final AggregatorFactory aggregator : aggregators) {
final ColumnType type = aggregator.getType();

if (type.equals(aggregator.getFinalizedType())) {
add(aggregator.getName(), type);
} else {
// Use null if the type depends on whether or not the aggregator is finalized, since
// we don't know if it will be finalized or not. So null (i.e. unknown) is the proper
// thing to do (currently).
add(aggregator.getName(), null);
final ColumnType type;

switch (finalization) {
case YES:
type = aggregator.getFinalizedType();
break;

case NO:
type = aggregator.getType();
break;

default:
assert finalization == Finalization.UNKNOWN;

if (aggregator.getType() == aggregator.getFinalizedType()) {
type = aggregator.getType();
} else {
// Use null if the type depends on whether the aggregator is finalized, since we don't know if
// it will be finalized or not.
type = null;
}
break;
}

add(aggregator.getName(), type);
}

return this;
}

/**
* Adds post-aggregators to a signature.
*
* Note: to ensure types are computed properly, post-aggregators must be added *after* any columns that they
* depend on, and they must be added in the order that the query engine will compute them. This method assumes
* that post-aggregators are computed in order, and that they can refer to earlier post-aggregators but not
* to later ones.
*/
public Builder addPostAggregators(final List<PostAggregator> postAggregators)
{
for (final PostAggregator postAggregator : postAggregators) {
Expand All @@ -289,4 +323,22 @@ public RowSignature build()
return new RowSignature(columnTypeList);
}
}

public enum Finalization
{
/**
* Aggregation results will be finalized.
*/
YES,

/**
* Aggregation results will not be finalized.
*/
NO,

/**
* Aggregation results may or may not be finalized.
*/
UNKNOWN
}
}