Sql Single Value Aggregator for scalar queries#15700
Sql Single Value Aggregator for scalar queries#15700abhishekagarwal87 merged 18 commits intoapache:masterfrom
Conversation
| import org.apache.druid.sql.calcite.aggregation.builtin.StringSqlAggregator; | ||
| import org.apache.druid.sql.calcite.aggregation.builtin.SumSqlAggregator; | ||
| import org.apache.druid.sql.calcite.aggregation.builtin.SumZeroSqlAggregator; | ||
| import org.apache.druid.sql.calcite.aggregation.builtin.*; |
There was a problem hiding this comment.
I'm a bit against including it as * and include only those libraries that we need, in future if we come up with another Agg that we do not use this wont be correct
|
Thanks for the first PR to Druid! 🚀 A few high level comments with regards to this
SELECT
count(*)
FROM "wiki"
where
col1 >= STRLEN(SELECT dim1 FROM single_row_relation)and with complex types, it can be invoked with functions performing finalization on the complex types, or something like |
somu-imply
left a comment
There was a problem hiding this comment.
Thanks for the contribution Sree. Might be simple enough to add the vector aggs as this is on a single value. There's also a failure in static checks that needs to be addressed
| { | ||
| final BaseLongColumnValueSelector valueSelector; | ||
|
|
||
| Long value; |
There was a problem hiding this comment.
We can make these final, similar for the other classes
| skipVectorize(); | ||
| cannotVectorize(); | ||
| testQuery( | ||
| "SELECT count(*) FROM foo where m1 >= (select max(m1) - 4 from foo)", |
There was a problem hiding this comment.
Let's add more examples when these are string functions and also other aggs like count etc.
| ) | ||
| { | ||
| if (aggregateCall.getArgList().size() > 1) { | ||
| throw DruidException.defensive( |
There was a problem hiding this comment.
If this is visible to user, defensive might not be the best idea
| public SingleValueDoubleAggregator(BaseDoubleColumnValueSelector valueSelector) | ||
| { | ||
| this.valueSelector = valueSelector; | ||
| this.value = valueSelector.getDouble(); |
There was a problem hiding this comment.
i think numeric selectors probably need to check isNull() and set the value to null if true
| @Override | ||
| public Aggregator factorize(ColumnSelectorFactory metricFactory) | ||
| { | ||
| final BaseDoubleColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(getFieldName()); |
There was a problem hiding this comment.
instead of a separate aggregator factory for each type, why not call metricFactory.getColumnCapabilities(getFieldName()) and pick the right type of aggregator.
even further, do we even need aggs/buffer aggs for each type? Given that the typed aggs all appear to be storing values as Objects instead of java numeric primitives, why not just have a single Aggregator and BufferAggregator implementation. For the BufferAggregator, you could pass the ColumnType converted from getColumnCapabilities and use .getNullableStrategy() to write and read the value from the buffer.
This seems like a lot of classes for something that is basically a value holder...
There was a problem hiding this comment.
Thanks for pointing out the NullableTypeStrategy. I always forget that such a contract exists (even in #15559 😞 ), and misguided the author into thinking that we don't have such a contract, and it needs to be inferred on per type basis.
| import org.apache.druid.sql.calcite.util.CalciteTests; | ||
| import org.junit.Test; | ||
|
|
||
| public class CalciteSingleValueAggregatorTest extends CalciteQueryTest |
There was a problem hiding this comment.
this should not extend CalciteQueryTest since that will run all of those tests too... it doesn't seem like it needs to be its own test file in the first place, but if it does for some reason, it should extend BaseCalciteQueryTest
I don't think this could possibly be vectorized, can it? since this can only call aggregate once, while aggregate method for vector aggs take multiple values, unless i'm misunderstanding something about how this is used |
@clintropolis I don't have much insight into how the vector engines perform, however, I reasoned that it being a non-vector version only would prevent the query from being vectorized. While it won't gain any benefit from the vectorization, having a vectorized implementation would allow the rest of the query from being vectorized. Wdyt? |
Ah yeah, i guess it doesn't hurt to implement, I was just making a drive by comment and not entirely sure of the context of this agg. But it seems like any query with this agg can only process a single row, so I wasn't sure it would be much benefit either since the main benefit of vectorization is processing batches of rows. |
| @Override | ||
| public void aggregate() | ||
| { | ||
| if (isAggregateInvoked) { |
There was a problem hiding this comment.
if selector.isNull() we can just return.
There was a problem hiding this comment.
a null row is still a row in the context of this agg i think so it needs to set isAggregateInvoked
| switch (aggregationType.getType()) { | ||
| case LONG: | ||
| case FLOAT: | ||
| case DOUBLE: | ||
| case STRING: | ||
| return new SingleValueAggregatoractory(name, fieldName, aggregationType); | ||
| default: | ||
| // This error refers to the Druid type. But, we're in SQL validation. | ||
| // It should refer to the SQL type. | ||
| throw SimpleSqlAggregator.badTypeException(fieldName, "SINGLE_VALUE", aggregationType); | ||
| } |
There was a problem hiding this comment.
do we actually need this check at all? Thinking of things like ARRAY_AGG which can aggregate an array of values...
| import java.util.Arrays; | ||
| import java.util.HashSet; | ||
|
|
||
| public class CalciteSingleValueAggregatorTest extends BaseCalciteQueryTest |
There was a problem hiding this comment.
any reason not to just put these test cases in CalciteSubqueryTest?
There was a problem hiding this comment.
Not particularly, will move them
| if (columnType.is(ValueType.STRING)) { | ||
| isNotNull = (selector.getObject() != null); | ||
| } else { | ||
| isNotNull = !selector.isNull(); | ||
| } | ||
| if (isNotNull) { | ||
| if (buf.get(position) == NullHandling.IS_NULL_BYTE) { | ||
| buf.put(position, NullHandling.IS_NOT_NULL_BYTE); | ||
| } | ||
| updatevalue(buf, position + Byte.BYTES); | ||
| } |
There was a problem hiding this comment.
Part of the potentially attractive part about pushing the ColumnType in here is that we can also use the NullableTypeStrategy to implement aggregate and get methods.
I pulled your branch and tried this and it runs into some problems with floats ending up as doubles, though I think this is actually a problem with either the underlying expression selector (expressions don't support floats so I think this is likely the culprit) because testSingleValueFloatAgg fails, so we need a helper function to get the object from the underlying selector to ensure that the thing we get from the selector matches the columnType.
@Override
public void aggregate(ByteBuffer buf, int position)
{
if (isAggregateInvoked) {
throw InvalidInput.exception("Single Value Aggregator would not be applied to more than one row");
}
int written = typeStrategy.write(
buf,
position,
getSelectorObject(),
SingleValueAggregatoractory.DEFAULT_MAX_STRING_SIZE
);
if (written < 0) {
throw InvalidInput.exception("Single Value Aggregator value too big for buffer");
}
isAggregateInvoked = true;
}
(invalid input might not be the actual right exception here for exceeding size limit, since its only a single row we might want to figure out how to pass this in, or make a much larger limit since the user isn't calling this function directly..)
@Nullable
@Override
public Object get(ByteBuffer buf, int position)
{
return typeStrategy.read(buf, position);
}
@Nullable
private Object getSelectorObject()
{
if (selector.isNull()) {
return null;
}
switch (columnType.getType()) {
case LONG:
return selector.getLong();
case FLOAT:
return selector.getFloat();
case DOUBLE:
return selector.getDouble();
default:
return selector.getObject();
}
}
The getLong/getFloat/getDouble methods could also be updated, there are some static methods in TypeStrategies which can help with this isNullableNull, readNotNullNullableLong, etc.
There was a problem hiding this comment.
would look more cleaner the way you pointed. will update it.
| import java.util.Objects; | ||
|
|
||
| @JsonTypeName("singleValue") | ||
| public class SingleValueAggregatoractory extends AggregatorFactory |
There was a problem hiding this comment.
typo, should be SingleValueAggregatorFactory instead of SingleValueAggregatoractory
There was a problem hiding this comment.
oh! thanks for pointing.
| @Override | ||
| public void aggregate() | ||
| { | ||
| if (isAggregateInvoked) { |
There was a problem hiding this comment.
a null row is still a row in the context of this agg i think so it needs to set isAggregateInvoked
| /** | ||
| * | ||
| */ |
There was a problem hiding this comment.
nit: can just delete if not filling out javadocs
| @JsonTypeName("singleValue") | ||
| public class SingleValueAggregatorFactory extends AggregatorFactory |
There was a problem hiding this comment.
this might be nice to add javadocs for since its primarily to support SQL planner and probably doesn't have many direct use cases
| if (isNullResult) { | ||
| return null; | ||
| } | ||
| return value; |
There was a problem hiding this comment.
this can just return value directly?
| if (isAggregateInvoked) { | ||
| throw InvalidInput.exception("Single Value Aggregator would not be applied to more than one row"); | ||
| } | ||
| boolean isNotNull = !selector.isNull(); |
There was a problem hiding this comment.
like in the buffer agg, this is probably only accurate for numeric primitive selectors, since typically things that call getObject do not check isNull. isNull is mainly used when you plan to call getLong or the like (since java primitives cannot be null, so they use this method instead).
You could potentially just always call getObject, or could have a method similar to the method for the buffer agg though not sure its as useful here since getObject usually always works (i think)
| switch (aggregationType.getType()) { | ||
| case LONG: | ||
| case FLOAT: | ||
| case DOUBLE: | ||
| case STRING: | ||
| return new SingleValueAggregatorFactory(name, fieldName, aggregationType); | ||
| default: | ||
| // This error refers to the Druid type. But, we're in SQL validation. | ||
| // It should refer to the SQL type. | ||
| throw SimpleSqlAggregator.badTypeException(fieldName, "SINGLE_VALUE", aggregationType); | ||
| } |
There was a problem hiding this comment.
i think i already asked this, but is this needed or can it just always make a SingleValueAggregatorFactory with any aggregationType?
| new QueryDataSource(GroupByQuery.builder() | ||
| .setDataSource(new QueryDataSource( | ||
| Druids.newTimeseriesQueryBuilder() | ||
| .dataSource(CalciteTests.DATASOURCE1) | ||
| .intervals(querySegmentSpec(Filtration.eternity())) | ||
| .granularity(Granularities.ALL) | ||
| .aggregators(new FloatMaxAggregatorFactory("a0", "m1")) | ||
| .build() | ||
| )) | ||
| .setInterval(querySegmentSpec(Filtration.eternity())) | ||
| .setGranularity(Granularities.ALL) | ||
| .setVirtualColumns(expressionVirtualColumn( | ||
| "v0", | ||
| "(\"a0\" - 3.5)", | ||
| ColumnType.DOUBLE | ||
| ) | ||
| ) | ||
| .setAggregatorSpecs( | ||
| aggregators( | ||
| new SingleValueAggregatorFactory( | ||
| "_a0", | ||
| "v0", | ||
| ColumnType.DOUBLE | ||
| ) | ||
| ) | ||
| ) | ||
| .setLimitSpec(NoopLimitSpec.instance()) | ||
| .setContext(QUERY_CONTEXT_DEFAULT) | ||
| .build() |
There was a problem hiding this comment.
hmm, this doesn't need to be part of this PR, but it seems like there is room for improvement in the planner here... Like in this case i think the planner should just collapse this subquery into a single timeseries query with an expression post-aggregator, instead of a group by on a timeseries
| public void aggregate() | ||
| { | ||
| if (isAggregateInvoked) { | ||
| throw InvalidInput.exception("Single Value Aggregator would not be applied to more than one row"); |
There was a problem hiding this comment.
Since this is unlikely to be used directly, I wonder if this error message should mention sub-queries? I was playing around with postgres and it returns something like
error: more than one row returned by a subquery used as an expression so i wonder if we should do something similar (probably don't copy it exactly :p). It might be nice to include the name of the input column. Be sure to update the buffer agg error if we change this too
| public float getFloat(ByteBuffer buf, int position) | ||
| { | ||
| if (TypeStrategies.isNullableNull(buf, position)) { | ||
| throw new IllegalStateException("Cannot return float for Null Value"); |
There was a problem hiding this comment.
nit: can use DruidException.defensive since this shouldn't happen in practice because callers should know to call isNull if values can be null (if it happens it is a coding error probably)
| Object value; | ||
|
|
||
| private boolean isNullResult = true; | ||
|
|
There was a problem hiding this comment.
nit: Spacing not needed
| if (isNullResult) { | ||
| throw DruidException.defensive("Cannot return double for Null Value"); | ||
| } |
There was a problem hiding this comment.
Probably for SQL-incompatible behavior, let's just return the default value. It's the onus of the caller to call .isNull and then the .getDouble. I am wondering if any SQL-compatible path that just calls .getDouble without calling .isNull() can cause it to throw. Anyways, it will also prevent an additional check in this path. Same goes for other selectors.
| boolean isNotNull = (selector.getObject() != null); | ||
| if (isNotNull) { | ||
| isNullResult = false; | ||
| value = selector.getObject(); |
There was a problem hiding this comment.
Should probably reuse the .getObject() called above, because for primitives that would cause autoboxing.
I was going to suggest using .isNull() check at the top instead, however, I don't think they might behave correctly for object selectors, therefore refraining from the suggestion.
| @Test | ||
| public void testSingleValueEmptyInnerAgg() | ||
| { | ||
| msqIncompatible(); |
There was a problem hiding this comment.
Why msqIncompatible()? I think this test isn't subclassed by MSQ anyways, so we can get rid of these calls, however, I do expect it to be MSQ-compatible.
|
|
||
| import javax.annotation.Nullable; | ||
|
|
||
| public class SingleValueSqlAggregator extends SimpleSqlAggregator |
There was a problem hiding this comment.
nit: Can add javadoc as to how this will be called by the SQL query, since its not something that the user will supply.
| public SingleValueAggregationTest() throws Exception | ||
| { | ||
| String longAggSpecJson = "{\"type\": \"singleValue\", \"name\": \"lng\", \"fieldName\": \"lngFld\", \"columnType\": \"LONG\"}"; | ||
| longAggFatory = TestHelper.makeJsonMapper().readValue(longAggSpecJson, SingleValueAggregatorFactory.class); |
There was a problem hiding this comment.
| longAggFatory = TestHelper.makeJsonMapper().readValue(longAggSpecJson, SingleValueAggregatorFactory.class); | |
| longAggFactory = TestHelper.makeJsonMapper().readValue(longAggSpecJson, SingleValueAggregatorFactory.class); |
| doubleAggFatory = TestHelper.makeJsonMapper().readValue(doubleAggSpecJson, SingleValueAggregatorFactory.class); | ||
|
|
||
| String strAggSpecJson = "{\"type\": \"singleValue\", \"name\": \"str\", \"fieldName\": \"strFld\", \"columnType\": \"STRING\"}"; | ||
| stringAggFatory = TestHelper.makeJsonMapper().readValue(strAggSpecJson, SingleValueAggregatorFactory.class); | ||
| } |
| @Before | ||
| public void setup() | ||
| { | ||
| NullHandling.initializeForTests(); |
There was a problem hiding this comment.
nit: Instead of calling it in the setup(), we can subclass the test class with extends InitializedNullHandlingTest
| /** | ||
| */ |
| EasyMock.expect(colSelectorFactoryLong.makeColumnValueSelector("lngFld")).andReturn(selectorLong); | ||
| EasyMock.expect(colSelectorFactoryLong.getColumnCapabilities("lngFld")).andReturn(columnCapabilitiesLong); | ||
|
|
||
| EasyMock.replay(columnCapabilitiesLong); | ||
| EasyMock.replay(colSelectorFactoryLong); | ||
|
|
||
| selectorDouble = new TestDoubleColumnSelectorImpl(doubleValues); | ||
| columnCapabilitiesDouble = EasyMock.createMock(ColumnCapabilities.class); | ||
| EasyMock.expect(columnCapabilitiesDouble.getType()).andReturn(ValueType.DOUBLE); | ||
|
|
||
| colSelectorFactoryDouble = EasyMock.createMock(ColumnSelectorFactory.class); | ||
| EasyMock.expect(colSelectorFactoryDouble.makeColumnValueSelector("dblFld")).andReturn(selectorDouble); | ||
| EasyMock.expect(colSelectorFactoryDouble.getColumnCapabilities("dblFld")).andReturn(columnCapabilitiesDouble); | ||
|
|
||
| EasyMock.replay(columnCapabilitiesDouble); | ||
| EasyMock.replay(colSelectorFactoryDouble); |
There was a problem hiding this comment.
There should be a cleaner way than mocking the selector factory and the column capabilities. For column capabilities, you can use the ColumnCapabilitiesImpl#createSimpleNumericColumnCapabilities et al
. I was going through the code to find any reusable class for the column selector factory and I found TestColumnSelectorFactory. Perhaps others can achieve this more cleanly.
| if (isNullResult) { | ||
| throw DruidException.defensive("Cannot return float for Null Value"); | ||
| } | ||
| return (float) value; |
There was a problem hiding this comment.
| return (float) value; | |
| return ((Number) value).floatValue(); |
| final ColumnValueSelector selector; | ||
| @Nullable | ||
| Object value; |
There was a problem hiding this comment.
nit: private for consistency.
| boolean isNotNull = (selectorObject != null); | ||
| if (isNotNull) { | ||
| isNullResult = false; | ||
| value = selectorObject; | ||
| } |
There was a problem hiding this comment.
nit: Coding style:
Now that it's written this way, it seems redundant to have isNullResult and isNotNull.
Following seems a much cleaner way
| boolean isNotNull = (selectorObject != null); | |
| if (isNotNull) { | |
| isNullResult = false; | |
| value = selectorObject; | |
| } | |
| value = selector.getObject(); | |
| } |
We don't use any null check here.
The method isNull will do something like:
public boolean isNull()
{
return value == null;
}Then the methods relying on the isNullResult can use isNull() instead.
public float getFloat()
{
return isNull() ? NullHandling.ZERO_FLOAT : ((Number) value).floatValue();
}This will prevent the code from maintaining two variables denoting same information.
| return "SingleValueAggregator{" + | ||
| "selector=" + selector + | ||
| '}'; |
There was a problem hiding this comment.
Shouldn't this also print the value and aggregateInvoked?
| @JsonProperty | ||
| @JsonInclude(JsonInclude.Include.NON_NULL) | ||
| private final ColumnType columnType; | ||
| public static final int DEFAULT_MAX_BUFFER_SIZE = 1025; |
There was a problem hiding this comment.
Why 1025, and not 1024? (Latter seems more "correct")
| if (columnType.isNumeric()) { | ||
| return Byte.BYTES + Double.BYTES; | ||
| } | ||
| return DEFAULT_MAX_BUFFER_SIZE; |
There was a problem hiding this comment.
nit: Is this correct, for long values?
| final ColumnValueSelector selector; | ||
| final ColumnType columnType; | ||
| final NullableTypeStrategy typeStrategy; | ||
| private boolean isAggregateInvoked = false; |
There was a problem hiding this comment.
nit: mark private for consistency
| final NullableTypeStrategy typeStrategy; | ||
| private boolean isAggregateInvoked = false; | ||
|
|
||
| SingleValueBufferAggregator(ColumnValueSelector selector, ColumnType columnType) |
There was a problem hiding this comment.
public, since the SingleValueAggregator is also public scope
| { | ||
| this.selector = selector; | ||
| this.columnType = columnType; | ||
| this.typeStrategy = columnType.getNullableStrategy(); |
There was a problem hiding this comment.
This would NPE if columnType is null, hence we should add the null check in the aggregator factory.
| @Override | ||
| public float getFloat(ByteBuffer buf, int position) | ||
| { | ||
| return TypeStrategies.isNullableNull(buf, position) |
There was a problem hiding this comment.
I wonder if two calls are required, since we already have the nullable type strategy, so perhaps we can read it once, and check if that is null. However, not important, since its called for a single row only.
| @Override | ||
| public long getLong() | ||
| { | ||
| return isNullResult ? NullHandling.ZERO_LONG : ((Number) value).longValue(); |
There was a problem hiding this comment.
Let's add an assertion in the primitive selectors that if the mode is sql compatible, then the selector cannot be null.
| return isNullResult ? NullHandling.ZERO_LONG : ((Number) value).longValue(); | |
| assert NullHandling.replaceWithDefault() || !isNull(); | |
| return isNull() ? NullHandling.ZERO_LONG : ((Number) value).longValue(); |
| public class SingleValueAggregatorFactory extends AggregatorFactory | ||
| { | ||
| @JsonProperty | ||
| @JsonInclude |
There was a problem hiding this comment.
We don't need JsonInclude annotation
| @JsonInclude |
LakshSingla
left a comment
There was a problem hiding this comment.
Thanks for being accommodating with the reviews. Final thoughts on the PR.
| { | ||
| ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); | ||
| ColumnCapabilities columnCapabilities = metricFactory.getColumnCapabilities(fieldName); | ||
| Preconditions.checkNotNull(columnCapabilities, "Unable to get the capabilities of [%s]", fieldName); |
There was a problem hiding this comment.
null checks are better done using an if clause. Preconditions is a shorthand, we mostly use in the constructor. It doesn't play nice with the DruidExcption system, because now we don't know the target persona for the error message. Therefore, let's throw a DruidException here, because, now it will force us to think of the persona of the error message, and word it accordingly.
I think it should be aimed at devs only, because users/admins/operators won't know what to do if such an error occurs, and won't make sense for them.
Also, check out https://github.com/apache/druid/blob/master/dev/style-conventions.md#message-formatting-for-logs-and-exceptions. The message should make sense if all the extrapolation ([%s]) is removed. Therefore, the message should be
| Preconditions.checkNotNull(columnCapabilities, "Unable to get the capabilities of [%s]", fieldName); | |
| Preconditions.checkNotNull(columnCapabilities, "Unable to get the capabilities of field [%s]", fieldName); |
| /** | ||
| * Combine method would never be invoked as the broker sends the subquery to multiple segments | ||
| * and gather the results to a single value on which the single value aggregator is applied. | ||
| * Though getCombiningFactory would be invoked for understanding the fieldname. | ||
| */ |
| public long getLong() | ||
| { | ||
| assert validObjectValue(); | ||
| return (value == null) ? NullHandling.ZERO_LONG : ((Number) value).longValue(); |
There was a problem hiding this comment.
@LakshSingla had to modify the check to value rather than relying on isNull(). Else it was causing NPE.
we are checking the validity in the assertion above.
| return NullHandling.sqlCompatible() && value == null; | ||
| } | ||
|
|
||
| private boolean validObjectValue() |
There was a problem hiding this comment.
nit:
| private boolean validObjectValue() | |
| private boolean validPrimitiveValue() |
| @Nullable | ||
| private Object getSelectorObject() | ||
| { | ||
| if (columnType.isNumeric() && selector.isNull()) { | ||
| return null; | ||
| } | ||
| switch (columnType.getType()) { | ||
| case LONG: | ||
| return selector.getLong(); | ||
| case FLOAT: | ||
| return selector.getFloat(); | ||
| case DOUBLE: | ||
| return selector.getDouble(); | ||
| default: | ||
| return selector.getObject(); | ||
| } | ||
| } |
There was a problem hiding this comment.
Since we are boxing it to Object, I guess we can remove this method and call selector.getObject() wherever this method is used.
The benefit of using .getLong() is that we don't need to auto box-unbox, whenever we are working with primitive selectors, however, that is getting lost in the translation here. Therefore, we can remove this method, since it is don't what selector.getObject() will be doing for primitive types anyways.
There was a problem hiding this comment.
i was a bit less confident that all of the numeric column value selectors (or things that present themselves as numbers) are implementing getObject equivalently to isNull/get primitive methods, which is why I advised doing this as a defensive measure, that said, it is probably safe to just call getObject...
There was a problem hiding this comment.
If that's the case, I am cool with the current code.
|
Thank you, @sreemanamala for your first contribution |
Description
Added the Single Value Aggregator functionality for scalar queries in group by queries
Fixed the bug ...
Executing single value correlated queries will throw an exception today since
single_valuefunction is not available in druid.With these added classes, this provides druid, the capability to plan and run such queries.
Renamed the class ...
Added a forbidden-apis entry ...
Release note
Supporting Single Value aggregated group by queries for scalars
Key changed/added classes in this PR
SingleValueSqlAggregatorSingleValueAggregatorFactory,SingleValueBufferAggregator,SingleValueAggregatorAggregatorsModule,AggregatorUtilDruidOperatorTableCalciteSingleValueAggregatorTestThis PR has: