diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java index 395336ff2ec6..dafc6ca3e70c 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java @@ -205,10 +205,18 @@ public String getFormatString() "SELECT LATEST(long1) FROM foo", // 39: LATEST aggregator double "SELECT LATEST(double4) FROM foo", - // 40: LATEST aggregator double + // 40: LATEST aggregator float "SELECT LATEST(float3) FROM foo", - // 41: LATEST aggregator double - "SELECT LATEST(float3), LATEST(long1), LATEST(double4) FROM foo" + // 41: LATEST aggregator all + "SELECT LATEST(float3), LATEST(long1), LATEST(double4) FROM foo", + // 42: EARLIEST aggregator + "SELECT EARLIEST(long1) FROM foo", + // 43: EARLIEST aggregator double + "SELECT EARLIEST(double4) FROM foo", + // 44: EARLIEST aggregator float + "SELECT EARLIEST(float3) FROM foo", + // 45: EARLIEST aggregator all + "SELECT EARLIEST(float3), EARLIEST(long1), EARLIEST(double4) FROM foo" ); @Param({"5000000"}) @@ -264,7 +272,11 @@ public String getFormatString() "38", "39", "40", - "41" + "41", + "42", + "43", + "44", + "45" }) private String query; diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java index 444ade46139d..4272420a99fc 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java @@ -29,14 +29,21 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorValueSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -125,6 +132,29 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) } } + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; + } + + @Override + public VectorAggregator factorizeVector( + VectorColumnSelectorFactory columnSelectorFactory + ) + { + ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); + VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName); + //time is always long + BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector( + timeColumn); + if (capabilities == null || capabilities.isNumeric()) { + return new DoubleFirstVectorAggregator(timeSelector, valueSelector); + } else { + return NumericNilVectorAggregator.doubleNilVectorAggregator(); + } + } + @Override public Comparator getComparator() { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregator.java new file mode 100644 index 000000000000..c05a5c162f5f --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregator.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.collections.SerializablePair; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +/** + * Vectorized version of on heap 'earliest' aggregator for column selectors with type LONG.. + */ +public class DoubleFirstVectorAggregator extends NumericFirstVectorAggregator +{ + double firstValue; + + public DoubleFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector) + { + super(timeSelector, valueSelector); + firstValue = 0; + } + + @Override + public void initValue(ByteBuffer buf, int position) + { + buf.putDouble(position, 0); + } + + + @Override + void putValue(ByteBuffer buf, int position, int index) + { + firstValue = valueSelector.getDoubleVector()[index]; + buf.putDouble(position, firstValue); + } + + + /** + * @return The primitive object stored at the position in the buffer. + */ + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + final boolean rhsNull = isValueNull(buf, position); + return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getDouble(position + VALUE_OFFSET)); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java index f8a592d58685..01719d3324ad 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java @@ -29,14 +29,21 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseFloatColumnValueSelector; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorValueSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -123,6 +130,29 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) } } + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; + } + + @Override + public VectorAggregator factorizeVector( + VectorColumnSelectorFactory columnSelectorFactory + ) + { + ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); + VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName); + //time is always long + BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector( + timeColumn); + if (capabilities == null || capabilities.isNumeric()) { + return new FloatFirstVectorAggregator(timeSelector, valueSelector); + } else { + return NumericNilVectorAggregator.floatNilVectorAggregator(); + } + } + @Override public Comparator getComparator() { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregator.java new file mode 100644 index 000000000000..da5edded9488 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregator.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.collections.SerializablePair; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +/** + * Vectorized version of on heap 'earliest' aggregator for column selectors with type LONG.. + */ +public class FloatFirstVectorAggregator extends NumericFirstVectorAggregator +{ + float firstValue; + + public FloatFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector) + { + super(timeSelector, valueSelector); + firstValue = 0; + } + + @Override + public void initValue(ByteBuffer buf, int position) + { + buf.putFloat(position, 0); + } + + + @Override + void putValue(ByteBuffer buf, int position, int index) + { + firstValue = valueSelector.getFloatVector()[index]; + buf.putFloat(position, firstValue); + } + + + /** + * @return The primitive object stored at the position in the buffer. + */ + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + final boolean rhsNull = isValueNull(buf, position); + return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getFloat(position + VALUE_OFFSET)); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java index bfc8ae48eaac..18380a139728 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java @@ -29,14 +29,21 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorValueSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -122,6 +129,28 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) } } + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; + } + + @Override + public VectorAggregator factorizeVector( + VectorColumnSelectorFactory columnSelectorFactory + ) + { + ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); + VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName); + BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector( + timeColumn); + if (capabilities == null || capabilities.isNumeric()) { + return new LongFirstVectorAggregator(timeSelector, valueSelector); + } else { + return NumericNilVectorAggregator.longNilVectorAggregator(); + } + } + @Override public Comparator getComparator() { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregator.java new file mode 100644 index 000000000000..521228f2be61 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregator.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.collections.SerializablePair; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +/** + * Vectorized version of on heap 'earliest' aggregator for column selectors with type LONG.. + */ +public class LongFirstVectorAggregator extends NumericFirstVectorAggregator +{ + long firstValue; + + public LongFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector) + { + super(timeSelector, valueSelector); + firstValue = 0; + } + + @Override + public void initValue(ByteBuffer buf, int position) + { + buf.putLong(position, 0); + } + + + @Override + void putValue(ByteBuffer buf, int position, int index) + { + firstValue = valueSelector.getLongVector()[index]; + buf.putLong(position, firstValue); + } + + + /** + * @return The primitive object stored at the position in the buffer. + */ + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + final boolean rhsNull = isValueNull(buf, position); + return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getLong(position + VALUE_OFFSET)); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java new file mode 100644 index 000000000000..6f75b20d0461 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +/** + * Class for vectorized version of first/earliest aggregator over numeric types + */ +public abstract class NumericFirstVectorAggregator implements VectorAggregator +{ + static final int NULL_OFFSET = Long.BYTES; + static final int VALUE_OFFSET = NULL_OFFSET + Byte.BYTES; + final VectorValueSelector valueSelector; + private final boolean useDefault = NullHandling.replaceWithDefault(); + private final VectorValueSelector timeSelector; + private long firstTime; + + public NumericFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector) + { + this.timeSelector = timeSelector; + this.valueSelector = valueSelector; + firstTime = Long.MAX_VALUE; + } + + @Override + public void init(ByteBuffer buf, int position) + { + buf.putLong(position, Long.MAX_VALUE); + buf.put(position + NULL_OFFSET, useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE); + initValue(buf, position + VALUE_OFFSET); + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + final long[] timeVector = timeSelector.getLongVector(); + final boolean[] nullValueVector = valueSelector.getNullVector(); + boolean nullAbsent = false; + firstTime = buf.getLong(position); + // check if nullVector is found or not + // the nullVector is null if no null values are found + // set the nullAbsent flag accordingly + if (nullValueVector == null) { + nullAbsent = true; + } + + // the time vector is already sorted so the first element would be the earliest + // traverse accordingly + int index = startRow; + if (!useDefault && !nullAbsent) { + for (int i = startRow; i < endRow; i++) { + if (!nullValueVector[i]) { + index = i; + break; + } + } + } + + // find the first non-null value + final long earliestTime = timeVector[index]; + if (earliestTime < firstTime) { + firstTime = earliestTime; + if (useDefault || nullValueVector == null || !nullValueVector[index]) { + updateTimeWithValue(buf, position, firstTime, index); + } else { + updateTimeWithNull(buf, position, firstTime); + } + } + } + + /** + * + * Checks if the aggregated value at a position in the buffer is null or not + * + * @param buf byte buffer storing the byte array representation of the aggregate + * @param position offset within the byte buffer at which the current aggregate value is stored + * @return + */ + boolean isValueNull(ByteBuffer buf, int position) + { + return buf.get(position + NULL_OFFSET) == NullHandling.IS_NULL_BYTE; + } + + @Override + public void aggregate( + ByteBuffer buf, + int numRows, + int[] positions, + @Nullable int[] rows, + int positionOffset + ) + { + boolean[] nulls = useDefault ? null : valueSelector.getNullVector(); + long[] timeVector = timeSelector.getLongVector(); + + for (int i = 0; i < numRows; i++) { + int position = positions[i] + positionOffset; + int row = rows == null ? i : rows[i]; + long firstTime = buf.getLong(position); + if (timeVector[row] < firstTime) { + if (useDefault || nulls == null || !nulls[row]) { + updateTimeWithValue(buf, position, timeVector[row], row); + } else { + updateTimeWithNull(buf, position, timeVector[row]); + } + } + } + } + + /** + * Updates the time and the non null values to the appropriate position in buffer + * + * @param buf byte buffer storing the byte array representation of the aggregate + * @param position offset within the byte buffer at which the current aggregate value is stored + * @param time the time to be updated in the buffer as the last time + * @param index the index of the vectorized vector which is the last value + */ + void updateTimeWithValue(ByteBuffer buf, int position, long time, int index) + { + buf.putLong(position, time); + buf.put(position + NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE); + putValue(buf, position + VALUE_OFFSET, index); + } + + /** + *Updates the time only to the appropriate position in buffer as the value is null + * + * @param buf byte buffer storing the byte array representation of the aggregate + * @param position offset within the byte buffer at which the current aggregate value is stored + * @param time the time to be updated in the buffer as the last time + */ + void updateTimeWithNull(ByteBuffer buf, int position, long time) + { + buf.putLong(position, time); + buf.put(position + NULL_OFFSET, NullHandling.IS_NULL_BYTE); + } + + /** + *Abstract function which needs to be overridden by subclasses to set the initial value + */ + abstract void initValue(ByteBuffer buf, int position); + + /** + *Abstract function which needs to be overridden by subclasses to set the + * latest value in the buffer depending on the datatype + */ + abstract void putValue(ByteBuffer buf, int position, int index); + + @Override + public void close() + { + // no resources to cleanup + } + +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregationTest.java new file mode 100644 index 000000000000..1575b33dcef0 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregationTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorValueSelector; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.nio.ByteBuffer; +import java.util.concurrent.ThreadLocalRandom; + +@RunWith(MockitoJUnitRunner.class) +public class DoubleFirstVectorAggregationTest extends InitializedNullHandlingTest +{ + private static final double EPSILON = 1e-5; + private static final double[] VALUES = new double[]{7.8d, 11, 23.67, 60}; + private static final boolean[] NULLS = new boolean[]{false, false, true, false}; + private long[] times = {2436, 6879, 7888, 8224}; + + private static final String NAME = "NAME"; + private static final String FIELD_NAME = "FIELD_NAME"; + private static final String TIME_COL = "__time"; + + @Mock + private VectorValueSelector selector; + @Mock + private BaseLongVectorValueSelector timeSelector; + private ByteBuffer buf; + + private DoubleFirstVectorAggregator target; + + private DoubleFirstAggregatorFactory doubleFirstAggregatorFactory; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private VectorColumnSelectorFactory selectorFactory; + + @Before + public void setup() + { + byte[] randomBytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(randomBytes); + buf = ByteBuffer.wrap(randomBytes); + Mockito.doReturn(VALUES).when(selector).getDoubleVector(); + Mockito.doReturn(times).when(timeSelector).getLongVector(); + target = new DoubleFirstVectorAggregator(timeSelector, selector); + clearBufferForPositions(0, 0); + + Mockito.doReturn(null).when(selectorFactory).getColumnCapabilities(FIELD_NAME); + Mockito.doReturn(selector).when(selectorFactory).makeValueSelector(FIELD_NAME); + Mockito.doReturn(timeSelector).when(selectorFactory).makeValueSelector(TIME_COL); + doubleFirstAggregatorFactory = new DoubleFirstAggregatorFactory(NAME, FIELD_NAME, TIME_COL); + } + + @Test + public void testFactory() + { + Assert.assertTrue(doubleFirstAggregatorFactory.canVectorize(selectorFactory)); + VectorAggregator vectorAggregator = doubleFirstAggregatorFactory.factorizeVector(selectorFactory); + Assert.assertNotNull(vectorAggregator); + Assert.assertEquals(DoubleFirstVectorAggregator.class, vectorAggregator.getClass()); + } + + @Test + public void initValueShouldInitZero() + { + target.initValue(buf, 0); + double initVal = buf.getDouble(0); + Assert.assertEquals(0, initVal, EPSILON); + } + + @Test + public void aggregate() + { + target.aggregate(buf, 0, 0, VALUES.length); + Pair result = (Pair) target.get(buf, 0); + Assert.assertEquals(times[0], result.lhs.longValue()); + Assert.assertEquals(VALUES[0], result.rhs, EPSILON); + } + + @Test + public void aggregateWithNulls() + { + mockNullsVector(); + target.aggregate(buf, 0, 0, VALUES.length); + Pair result = (Pair) target.get(buf, 0); + Assert.assertEquals(times[0], result.lhs.longValue()); + Assert.assertEquals(VALUES[0], result.rhs, EPSILON); + } + + @Test + public void aggregateBatchWithoutRows() + { + int[] positions = new int[]{0, 43, 70}; + int positionOffset = 2; + clearBufferForPositions(positionOffset, positions); + target.aggregate(buf, 3, positions, null, positionOffset); + for (int i = 0; i < positions.length; i++) { + Pair result = (Pair) target.get(buf, positions[i] + positionOffset); + Assert.assertEquals(times[i], result.lhs.longValue()); + Assert.assertEquals(VALUES[i], result.rhs, EPSILON); + } + } + + @Test + public void aggregateBatchWithRows() + { + int[] positions = new int[]{0, 43, 70}; + int[] rows = new int[]{3, 2, 0}; + int positionOffset = 2; + clearBufferForPositions(positionOffset, positions); + target.aggregate(buf, 3, positions, rows, positionOffset); + for (int i = 0; i < positions.length; i++) { + Pair result = (Pair) target.get(buf, positions[i] + positionOffset); + Assert.assertEquals(times[rows[i]], result.lhs.longValue()); + Assert.assertEquals(VALUES[rows[i]], result.rhs, EPSILON); + } + } + + private void clearBufferForPositions(int offset, int... positions) + { + for (int position : positions) { + target.init(buf, offset + position); + } + } + + private void mockNullsVector() + { + if (!NullHandling.replaceWithDefault()) { + Mockito.doReturn(NULLS).when(selector).getNullVector(); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java new file mode 100644 index 000000000000..0eb7afe46e65 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorValueSelector; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.nio.ByteBuffer; +import java.util.concurrent.ThreadLocalRandom; + +@RunWith(MockitoJUnitRunner.class) +public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest +{ + private static final double EPSILON = 1e-5; + private static final float[] VALUES = new float[]{7.2f, 15.6f, 2.1f, 150.0f}; + private static final boolean[] NULLS = new boolean[]{true, false, true, false}; + private long[] times = {2436, 6879, 7888, 8224}; + + private static final String NAME = "NAME"; + private static final String FIELD_NAME = "FIELD_NAME"; + private static final String TIME_COL = "__time"; + + @Mock + private VectorValueSelector selector; + @Mock + private BaseLongVectorValueSelector timeSelector; + private ByteBuffer buf; + + private FloatFirstVectorAggregator target; + + private FloatFirstAggregatorFactory floatFirstAggregatorFactory; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private VectorColumnSelectorFactory selectorFactory; + + @Before + public void setup() + { + byte[] randomBytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(randomBytes); + buf = ByteBuffer.wrap(randomBytes); + Mockito.doReturn(VALUES).when(selector).getFloatVector(); + Mockito.doReturn(times).when(timeSelector).getLongVector(); + target = new FloatFirstVectorAggregator(timeSelector, selector); + clearBufferForPositions(0, 0); + + Mockito.doReturn(null).when(selectorFactory).getColumnCapabilities(FIELD_NAME); + Mockito.doReturn(selector).when(selectorFactory).makeValueSelector(FIELD_NAME); + Mockito.doReturn(timeSelector).when(selectorFactory).makeValueSelector(TIME_COL); + floatFirstAggregatorFactory = new FloatFirstAggregatorFactory(NAME, FIELD_NAME, TIME_COL); + + } + + @Test + public void testFactory() + { + Assert.assertTrue(floatFirstAggregatorFactory.canVectorize(selectorFactory)); + VectorAggregator vectorAggregator = floatFirstAggregatorFactory.factorizeVector(selectorFactory); + Assert.assertNotNull(vectorAggregator); + Assert.assertEquals(FloatFirstVectorAggregator.class, vectorAggregator.getClass()); + } + + @Test + public void initValueShouldBeZero() + { + target.initValue(buf, 0); + float initVal = buf.getFloat(0); + Assert.assertEquals(0.0f, initVal, EPSILON); + } + + @Test + public void aggregate() + { + target.init(buf, 0); + target.aggregate(buf, 0, 0, VALUES.length); + Pair result = (Pair) target.get(buf, 0); + Assert.assertEquals(times[0], result.lhs.longValue()); + Assert.assertEquals(VALUES[0], result.rhs, EPSILON); + } + + @Test + public void aggregateWithNulls() + { + mockNullsVector(); + target.aggregate(buf, 0, 0, VALUES.length); + Pair result = (Pair) target.get(buf, 0); + if (!NullHandling.replaceWithDefault()) { + Assert.assertEquals(times[1], result.lhs.longValue()); + Assert.assertEquals(VALUES[1], result.rhs, EPSILON); + } else { + Assert.assertEquals(times[0], result.lhs.longValue()); + Assert.assertEquals(VALUES[0], result.rhs, EPSILON); + } + } + + @Test + public void aggregateBatchWithoutRows() + { + int[] positions = new int[]{0, 43, 70}; + int positionOffset = 2; + clearBufferForPositions(positionOffset, positions); + target.aggregate(buf, 3, positions, null, positionOffset); + for (int i = 0; i < positions.length; i++) { + Pair result = (Pair) target.get(buf, positions[i] + positionOffset); + Assert.assertEquals(times[i], result.lhs.longValue()); + Assert.assertEquals(VALUES[i], result.rhs, EPSILON); + } + } + + @Test + public void aggregateBatchWithRows() + { + int[] positions = new int[]{0, 43, 70}; + int[] rows = new int[]{3, 2, 0}; + int positionOffset = 2; + clearBufferForPositions(positionOffset, positions); + target.aggregate(buf, 3, positions, rows, positionOffset); + for (int i = 0; i < positions.length; i++) { + Pair result = (Pair) target.get(buf, positions[i] + positionOffset); + Assert.assertEquals(times[rows[i]], result.lhs.longValue()); + Assert.assertEquals(VALUES[rows[i]], result.rhs, EPSILON); + } + } + + private void clearBufferForPositions(int offset, int... positions) + { + for (int position : positions) { + target.init(buf, offset + position); + } + } + + private void mockNullsVector() + { + if (!NullHandling.replaceWithDefault()) { + Mockito.doReturn(NULLS).when(selector).getNullVector(); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregationTest.java new file mode 100644 index 000000000000..5f2072ef5b25 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregationTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorValueSelector; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.nio.ByteBuffer; +import java.util.concurrent.ThreadLocalRandom; + + +@RunWith(MockitoJUnitRunner.class) +public class LongFirstVectorAggregationTest extends InitializedNullHandlingTest +{ + private static final double EPSILON = 1e-5; + private static final long[] VALUES = new long[]{7, 15, 2, 150}; + private static final boolean[] NULLS = new boolean[]{false, false, true, false}; + private static final String NAME = "NAME"; + private static final String FIELD_NAME = "FIELD_NAME"; + private static final String TIME_COL = "__time"; + private long[] times = {2436, 6879, 7888, 8224}; + @Mock + private VectorValueSelector selector; + @Mock + private BaseLongVectorValueSelector timeSelector; + private ByteBuffer buf; + private LongFirstVectorAggregator target; + + private LongFirstAggregatorFactory longFirstAggregatorFactory; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private VectorColumnSelectorFactory selectorFactory; + + @Before + public void setup() + { + byte[] randomBytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(randomBytes); + buf = ByteBuffer.wrap(randomBytes); + Mockito.doReturn(VALUES).when(selector).getLongVector(); + Mockito.doReturn(times).when(timeSelector).getLongVector(); + target = new LongFirstVectorAggregator(timeSelector, selector); + clearBufferForPositions(0, 0); + + + Mockito.doReturn(null).when(selectorFactory).getColumnCapabilities(FIELD_NAME); + Mockito.doReturn(selector).when(selectorFactory).makeValueSelector(FIELD_NAME); + Mockito.doReturn(timeSelector).when(selectorFactory).makeValueSelector(TIME_COL); + longFirstAggregatorFactory = new LongFirstAggregatorFactory(NAME, FIELD_NAME, TIME_COL); + + } + + @Test + public void testFactory() + { + Assert.assertTrue(longFirstAggregatorFactory.canVectorize(selectorFactory)); + VectorAggregator vectorAggregator = longFirstAggregatorFactory.factorizeVector(selectorFactory); + Assert.assertNotNull(vectorAggregator); + Assert.assertEquals(LongFirstVectorAggregator.class, vectorAggregator.getClass()); + } + + @Test + public void initValueShouldInitZero() + { + target.initValue(buf, 0); + long initVal = buf.getLong(0); + Assert.assertEquals(0, initVal); + } + + @Test + public void aggregate() + { + target.aggregate(buf, 0, 0, VALUES.length); + Pair result = (Pair) target.get(buf, 0); + Assert.assertEquals(times[0], result.lhs.longValue()); + Assert.assertEquals(VALUES[0], result.rhs, EPSILON); + } + + @Test + public void aggregateWithNulls() + { + mockNullsVector(); + target.aggregate(buf, 0, 0, VALUES.length); + Pair result = (Pair) target.get(buf, 0); + Assert.assertEquals(times[0], result.lhs.longValue()); + Assert.assertEquals(VALUES[0], result.rhs, EPSILON); + } + + @Test + public void aggregateBatchWithoutRows() + { + int[] positions = new int[]{0, 43, 70}; + int positionOffset = 2; + clearBufferForPositions(positionOffset, positions); + target.aggregate(buf, 3, positions, null, positionOffset); + for (int i = 0; i < positions.length; i++) { + Pair result = (Pair) target.get(buf, positions[i] + positionOffset); + Assert.assertEquals(times[i], result.lhs.longValue()); + Assert.assertEquals(VALUES[i], result.rhs, EPSILON); + } + } + + @Test + public void aggregateBatchWithRows() + { + int[] positions = new int[]{0, 43, 70}; + int[] rows = new int[]{3, 2, 0}; + int positionOffset = 2; + clearBufferForPositions(positionOffset, positions); + target.aggregate(buf, 3, positions, rows, positionOffset); + for (int i = 0; i < positions.length; i++) { + Pair result = (Pair) target.get(buf, positions[i] + positionOffset); + Assert.assertEquals(times[rows[i]], result.lhs.longValue()); + Assert.assertEquals(VALUES[rows[i]], result.rhs, EPSILON); + } + } + + private void clearBufferForPositions(int offset, int... positions) + { + for (int position : positions) { + target.init(buf, offset + position); + } + } + + private void mockNullsVector() + { + if (!NullHandling.replaceWithDefault()) { + Mockito.doReturn(NULLS).when(selector).getNullVector(); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index bd8e4905aeb8..90b75d40706c 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -3437,8 +3437,6 @@ public void testGroupByWithCardinality() @Test public void testGroupByWithFirstLast() { - // Cannot vectorize due to "first", "last" aggregators. - cannotVectorize(); GroupByQuery query = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) @@ -3527,8 +3525,6 @@ public void testGroupByWithFirstLast() @Test public void testGroupByWithNoResult() { - // Cannot vectorize due to first, last aggregators. - cannotVectorize(); GroupByQuery query = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) @@ -7226,9 +7222,6 @@ public void testSubqueryWithHyperUniquesPostAggregator() @Test public void testSubqueryWithFirstLast() { - // Cannot vectorize due to "first", "last" aggregators. - cannotVectorize(); - GroupByQuery subquery = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setQuerySegmentSpec(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java index 05848a5e4dd9..ad0b35a3dddc 100644 --- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java @@ -167,9 +167,6 @@ public TimeseriesQueryRunnerTest( @Test public void testEmptyTimeseries() { - // Cannot vectorize due to "doubleFirst" aggregator. - cannotVectorize(); - TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() .dataSource(QueryRunnerTestHelper.DATA_SOURCE) .granularity(QueryRunnerTestHelper.ALL_GRAN) @@ -1948,9 +1945,6 @@ public void testTimeseriesWithMultiValueFilteringJavascriptAggregatorAndAlsoRegu @Test public void testTimeseriesWithFirstLastAggregator() { - // Cannot vectorize due to "doubleFirst", "doubleLast" aggregators. - cannotVectorize(); - TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() .dataSource(QueryRunnerTestHelper.DATA_SOURCE) .granularity(QueryRunnerTestHelper.MONTH_GRAN) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 9feb8226797f..5aac2c8acde9 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -602,7 +602,7 @@ public void testGroupBySingleColumnDescendingNoTopN() throws Exception @Test public void testEarliestAggregators() throws Exception { - // Cannot vectorize EARLIEST aggregator. + // Cannot vectorize EARLIEST aggregator for Strings. skipVectorize(); testQuery( @@ -911,7 +911,7 @@ public void testPrimitiveLatestInSubquery() throws Exception } @Test - public void testPrimitiveLatestInSubqueryGroupBy() throws Exception + public void testLatestOffHeapGroupBy() throws Exception { testQuery( "SELECT dim2, LATEST(m1) AS val1 FROM foo GROUP BY dim2", @@ -948,8 +948,6 @@ public void testPrimitiveLatestInSubqueryGroupBy() throws Exception @Test public void testPrimitiveEarliestInSubquery() throws Exception { - // Cannot vectorize EARLIEST aggregator. - skipVectorize(); testQuery( "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, EARLIEST(m1) AS val1, EARLIEST(cnt) AS val2, EARLIEST(m2) AS val3 FROM foo GROUP BY dim2)", @@ -995,6 +993,40 @@ public void testPrimitiveEarliestInSubquery() throws Exception ); } + @Test + public void testOffHeapEarliestGroupBy() throws Exception + { + testQuery( + "SELECT dim2, EARLIEST(m1) AS val1 FROM foo GROUP BY dim2", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs(aggregators( + new FloatFirstAggregatorFactory("a0", "m1", null) + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + NullHandling.sqlCompatible() + ? ImmutableList.of( + new Object[]{null, 2.0f}, + new Object[]{"", 3.0f}, + new Object[]{"a", 1.0f}, + new Object[]{"abc", 5.0f} + ) + : ImmutableList.of( + new Object[]{"", 2.0f}, + new Object[]{"a", 1.0f}, + new Object[]{"abc", 5.0f} + + ) + ); + } + // This test the off-heap (buffer) version of the LatestAggregator (String) @Test public void testStringLatestInSubquery() throws Exception @@ -1050,7 +1082,7 @@ public void testStringLatestInSubquery() throws Exception @Test public void testStringEarliestInSubquery() throws Exception { - // Cannot vectorize EARLIEST aggregator. + // Cannot vectorize EARLIEST aggregator for Strings skipVectorize(); testQuery( @@ -1210,8 +1242,6 @@ public void testStringAnyInSubquery() throws Exception @Test public void testEarliestAggregatorsNumericNulls() throws Exception { - // Cannot vectorize EARLIEST aggregator. - skipVectorize(); testQuery( "SELECT EARLIEST(l1), EARLIEST(d1), EARLIEST(f1) FROM druid.numfoo", @@ -1384,8 +1414,7 @@ public void testAnyAggregatorsSkipNullsWithFilter() throws Exception @Test public void testOrderByEarliestFloat() throws Exception { - // Cannot vectorize EARLIEST aggregator. - skipVectorize(); + List expected; if (NullHandling.replaceWithDefault()) { expected = ImmutableList.of( @@ -1431,8 +1460,6 @@ public void testOrderByEarliestFloat() throws Exception @Test public void testOrderByEarliestDouble() throws Exception { - // Cannot vectorize EARLIEST aggregator. - skipVectorize(); List expected; if (NullHandling.replaceWithDefault()) { expected = ImmutableList.of( @@ -1478,8 +1505,7 @@ public void testOrderByEarliestDouble() throws Exception @Test public void testOrderByEarliestLong() throws Exception { - // Cannot vectorize EARLIEST aggregator. - skipVectorize(); + List expected; if (NullHandling.replaceWithDefault()) { expected = ImmutableList.of( @@ -1659,6 +1685,39 @@ public void testOrderByLatestLong() throws Exception ); } + @Test + public void testEarliestVectorAggregators() throws Exception + { + testQuery( + "SELECT " + + "EARLIEST(cnt), EARLIEST(cnt + 1), EARLIEST(m1), EARLIEST(m1+1) " + + "FROM druid.numfoo", + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE3) + .intervals(querySegmentSpec(Filtration.eternity())) + .granularity(Granularities.ALL) + .virtualColumns( + expressionVirtualColumn("v0", "(\"cnt\" + 1)", ColumnType.LONG), + expressionVirtualColumn("v1", "(\"m1\" + 1)", ColumnType.FLOAT) + ) + .aggregators( + aggregators( + new LongFirstAggregatorFactory("a0", "cnt", null), + new LongFirstAggregatorFactory("a1", "v0", null), + new FloatFirstAggregatorFactory("a2", "m1", null), + new FloatFirstAggregatorFactory("a3", "v1", null) + ) + ) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{1L, 2L, 1.0f, 2.0f} + ) + ); + } + @Test public void testOrderByAnyFloat() throws Exception {