From e8a5d0e0c3305dd50b3e5d31a0db4c487b28efbc Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Mon, 25 Apr 2022 12:58:42 -0700 Subject: [PATCH 1/7] Initial commit for vectorizing numeric first aggregator --- .../first/DoubleFirstVectorAggregator.java | 66 +++++++ .../first/FloatFirstVectorAggregator.java | 66 +++++++ .../first/LongFirstVectorAggregator.java | 66 +++++++ .../first/NumericFirstVectorAggregator.java | 177 ++++++++++++++++++ .../DoubleFirstVectorAggregationTest.java | 134 +++++++++++++ .../FloatFirstVectorAggregationTest.java | 135 +++++++++++++ .../first/LongFirstVectorAggregationTest.java | 134 +++++++++++++ 7 files changed, 778 insertions(+) create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregator.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregator.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregator.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java create mode 100644 processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregationTest.java create mode 100644 processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java create mode 100644 processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregationTest.java diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregator.java new file mode 100644 index 000000000000..2521b787cdee --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregator.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.collections.SerializablePair;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * Vectorized version of the 'first' aggregator for column selectors with type DOUBLE.
+ */
+public class DoubleFirstVectorAggregator extends NumericFirstVectorAggregator
+{
+  double firstValue;
+
+  public DoubleFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector)
+  {
+    super(timeSelector, valueSelector);
+    firstValue = 0;
+  }
+
+  @Override
+  public void initValue(ByteBuffer buf, int position)
+  {
+    buf.putDouble(position, 0);
+  }
+
+
+  @Override
+  void putValue(ByteBuffer buf, int position, int index)
+  {
+    firstValue = valueSelector.getDoubleVector()[index];
+    buf.putDouble(position, firstValue);
+  }
+
+
+  /**
+   * @return pair of (stored timestamp, stored double value), with a null value when the null marker is set
+   */
+  @Nullable
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    final boolean rhsNull = isValueNull(buf, position);
+    return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getDouble(position + VALUE_OFFSET));
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregator.java
new file mode 100644
index 000000000000..8b9c7af48e3b
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.collections.SerializablePair;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * Vectorized version of the 'first' aggregator for column selectors with type FLOAT.
+ */
+public class FloatFirstVectorAggregator extends NumericFirstVectorAggregator
+{
+  float firstValue;
+
+  public FloatFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector)
+  {
+    super(timeSelector, valueSelector);
+    firstValue = 0;
+  }
+
+  @Override
+  public void initValue(ByteBuffer buf, int position)
+  {
+    buf.putFloat(position, 0);
+  }
+
+
+  @Override
+  void putValue(ByteBuffer buf, int position, int index)
+  {
+    firstValue = valueSelector.getFloatVector()[index];
+    buf.putFloat(position, firstValue);
+  }
+
+
+  /**
+   * @return pair of (stored timestamp, stored float value), with a null value when the null marker is set
+   */
+  @Nullable
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    final boolean rhsNull = isValueNull(buf, position);
+    return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getFloat(position + VALUE_OFFSET));
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregator.java
new file mode 100644
index 000000000000..0c999281c516
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.collections.SerializablePair;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * Vectorized version of the 'first' aggregator for column selectors with type LONG.
+ */
+public class LongFirstVectorAggregator extends NumericFirstVectorAggregator
+{
+  long firstValue;
+
+  public LongFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector)
+  {
+    super(timeSelector, valueSelector);
+    firstValue = 0;
+  }
+
+  @Override
+  public void initValue(ByteBuffer buf, int position)
+  {
+    buf.putLong(position, 0);
+  }
+
+
+  @Override
+  void putValue(ByteBuffer buf, int position, int index)
+  {
+    firstValue = valueSelector.getLongVector()[index];
+    buf.putLong(position, firstValue);
+  }
+
+
+  /**
+   * @return pair of (stored timestamp, stored long value), with a null value when the null marker is set
+   */
+  @Nullable
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    final boolean rhsNull = isValueNull(buf, position);
+    return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getLong(position + VALUE_OFFSET));
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java
new file mode 100644
index 000000000000..9fdecaad0e74
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public abstract class NumericFirstVectorAggregator implements VectorAggregator
+{
+  static final int NULL_OFFSET = Long.BYTES;
+  static final int VALUE_OFFSET = NULL_OFFSET + Byte.BYTES;
+  final VectorValueSelector valueSelector;
+  private final boolean useDefault = NullHandling.replaceWithDefault();
+  private final VectorValueSelector timeSelector;
+  private long firstTime;
+  protected boolean rhsNull;
+
+  public NumericFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector)
+  {
+    this.timeSelector = timeSelector;
+    this.valueSelector = valueSelector;
+    firstTime = Long.MAX_VALUE;
+    rhsNull = !useDefault;
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.putLong(position, Long.MAX_VALUE);
+    buf.put(position + NULL_OFFSET, useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
+    initValue(buf, position + VALUE_OFFSET);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    final long[] timeVector = timeSelector.getLongVector();
+    final boolean[] nullValueVector = valueSelector.getNullVector();
+    boolean nullAbsent = false;
+    firstTime = buf.getLong(position);
+    // check whether a null vector is present:
+    // the nullVector is null if no null values are found
+    // set the nullAbsent flag accordingly
+    if (nullValueVector == null) {
+      nullAbsent = true;
+    }
+
+    // assumes the time vector is sorted ascending, so the first usable row is the earliest - TODO confirm
+    // when nulls may be present, scan forward to the first non-null value (falls back to startRow)
+    int index = startRow;
+    if (!useDefault && !nullAbsent) {
+      for (int i = startRow; i < endRow; i++) {
+        if (!nullValueVector[i]) {
+          index = i;
+          break;
+        }
+      }
+    }
+
+    // only a strictly earlier time replaces the stored entry, so on ties the value seen first is kept
+    final long earliestTime = timeVector[index];
+    if (earliestTime < firstTime) {
+      firstTime = earliestTime;
+      if (useDefault || nullValueVector == null || !nullValueVector[index]) {
+        updateTimeWithValue(buf, position, firstTime, index);
+        rhsNull = false;
+      } else {
+        updateTimeWithNull(buf, position, firstTime);
+        rhsNull = true;
+      }
+    }
+  }
+
+  /**
+   *
+   * Checks if the aggregated value at a position in the buffer is null or not
+   *
+   * @param buf      byte buffer storing the byte array representation of the aggregate
+   * @param position offset within the byte buffer at which the current aggregate value is stored
+   * @return true if the byte at {@code position + NULL_OFFSET} equals {@code NullHandling.IS_NULL_BYTE}
+   */
+  boolean isValueNull(ByteBuffer buf, int position)
+  {
+    return buf.get(position + NULL_OFFSET) == NullHandling.IS_NULL_BYTE;
+  }
+
+  @Override
+  public void aggregate(
+      ByteBuffer buf,
+      int numRows,
+      int[] positions,
+      @Nullable int[] rows,
+      int positionOffset
+  )
+  {
+    boolean[] nulls = useDefault ? null : valueSelector.getNullVector();
+    long[] timeVector = timeSelector.getLongVector();
+
+    for (int i = 0; i < numRows; i++) {
+      int position = positions[i] + positionOffset;
+      int row = rows == null ? i : rows[i];
+      long firstTime = buf.getLong(position);
+      if (timeVector[row] < firstTime) {
+        if (useDefault || nulls == null || !nulls[row]) {
+          updateTimeWithValue(buf, position, timeVector[row], row);
+        } else {
+          updateTimeWithNull(buf, position, timeVector[row]);
+        }
+      }
+    }
+  }
+
+  /**
+   * Writes the time, a not-null marker and the value found at {@code index} into the buffer.
+   * @param buf      byte buffer storing the byte array representation of the aggregate
+   * @param position offset within the byte buffer at which the current aggregate value is stored
+   * @param time     the time to be updated in the buffer as the first time
+   * @param index    the index within the value vector of the first value
+   */
+  void updateTimeWithValue(ByteBuffer buf, int position, long time, int index)
+  {
+    buf.putLong(position, time);
+    buf.put(position + NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
+    putValue(buf, position + VALUE_OFFSET, index);
+  }
+
+  /**
+   * Writes the time and a null marker into the buffer; the value slot is left untouched.
+   * @param buf      byte buffer storing the byte array representation of the aggregate
+   * @param position offset within the byte buffer at which the current aggregate value is stored
+   * @param time     the time to be updated in the buffer as the first time
+   */
+  void updateTimeWithNull(ByteBuffer buf, int position, long time)
+  {
+    buf.putLong(position, time);
+    buf.put(position + NULL_OFFSET, NullHandling.IS_NULL_BYTE);
+  }
+
+  /**
+   * Abstract function which needs to be overridden by subclasses to set the initial value
+   */
+  abstract void initValue(ByteBuffer buf, int position);
+
+  /**
+   * Abstract function which needs to be overridden by subclasses to set the
+   * first value in the buffer depending on the datatype
+   */
+  abstract void putValue(ByteBuffer buf, int position, int index);
+
+  @Override
+  public void close()
+  {
+    // no resources to cleanup
+  }
+
+}
diff --git 
a/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregationTest.java
new file mode 100644
index 000000000000..7bd3c98511a3
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregationTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.segment.vector.VectorValueSelector;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DoubleFirstVectorAggregationTest extends InitializedNullHandlingTest
+{
+  private static final double EPSILON = 1e-5;
+  private static final double[] VALUES = new double[]{7.8d, 11, 23.67, 60};
+  private static final boolean[] NULLS = new boolean[]{false, false, true, false};
+  private long[] times = {2436, 6879, 7888, 8224};
+
+  @Mock
+  private VectorValueSelector selector;
+  @Mock
+  private VectorValueSelector timeSelector;
+  private ByteBuffer buf;
+
+  private DoubleFirstVectorAggregator target;
+
+  @Before
+  public void setup()
+  {
+    byte[] randomBytes = new byte[1024];
+    ThreadLocalRandom.current().nextBytes(randomBytes);
+    buf = ByteBuffer.wrap(randomBytes);
+    Mockito.doReturn(VALUES).when(selector).getDoubleVector();
+    Mockito.doReturn(times).when(timeSelector).getLongVector();
+    target = new DoubleFirstVectorAggregator(timeSelector, selector);
+    clearBufferForPositions(0, 0);
+  }
+
+  @Test
+  public void initValueShouldInitZero()
+  {
+    target.initValue(buf, 0);
+    double initVal = buf.getDouble(0);
+    Assert.assertEquals(0, initVal, EPSILON);
+  }
+
+  @Test
+  public void aggregate()
+  {
+    target.aggregate(buf, 0, 0, VALUES.length);
+    Pair<Long, Double> result = (Pair<Long, Double>) target.get(buf, 0);
+    Assert.assertEquals(times[0], result.lhs.longValue());
+    Assert.assertEquals(VALUES[0], result.rhs, EPSILON);
+  }
+
+  @Test
+  public void aggregateWithNulls()
+  {
+    mockNullsVector();
+    target.aggregate(buf, 0, 0, VALUES.length);
+    Pair<Long, Double> result = (Pair<Long, Double>) target.get(buf, 0);
+    Assert.assertEquals(times[0], result.lhs.longValue());
+    Assert.assertEquals(VALUES[0], result.rhs, EPSILON);
+  }
+
+  @Test
+  public void aggregateBatchWithoutRows()
+  {
+    int[] positions = new int[]{0, 43, 70};
+    int positionOffset = 2;
+    clearBufferForPositions(positionOffset, positions);
+    target.aggregate(buf, 3, positions, null, positionOffset);
+    for (int i = 0; i < positions.length; i++) {
+      Pair<Long, Double> result = (Pair<Long, Double>) target.get(buf, positions[i] + positionOffset);
+      Assert.assertEquals(times[i], result.lhs.longValue());
+      Assert.assertEquals(VALUES[i], result.rhs, EPSILON);
+    }
+  }
+
+  @Test
+  public void aggregateBatchWithRows()
+  {
+    int[] positions = new int[]{0, 43, 70};
+    int[] rows = new int[]{3, 2, 0};
+    int positionOffset = 2;
+    clearBufferForPositions(positionOffset, positions);
+    target.aggregate(buf, 3, positions, rows, positionOffset);
+    for (int i = 0; i < positions.length; i++) {
+      Pair<Long, Double> result = (Pair<Long, Double>) target.get(buf, positions[i] + positionOffset);
+      Assert.assertEquals(times[rows[i]], result.lhs.longValue());
+      Assert.assertEquals(VALUES[rows[i]], result.rhs, EPSILON);
+    }
+  }
+
+  private void clearBufferForPositions(int offset, int... positions)
+  {
+    for (int position : positions) {
+      target.init(buf, offset + position);
+    }
+  }
+
+  private void mockNullsVector()
+  {
+    if (!NullHandling.replaceWithDefault()) {
+      Mockito.doReturn(NULLS).when(selector).getNullVector();
+    }
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java
new file mode 100644
index 000000000000..dfc8e543f74d
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.segment.vector.VectorValueSelector;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+
+@RunWith(MockitoJUnitRunner.class)
+public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest
+{
+  private static final double EPSILON = 1e-5;
+  private static final float[] VALUES = new float[]{7.2f, 15.6f, 2.1f, 150.0f};
+  private static final boolean[] NULLS = new boolean[]{false, false, true, false};
+  private long[] times = {2436, 6879, 7888, 8224};
+
+  @Mock
+  private VectorValueSelector selector;
+  @Mock
+  private VectorValueSelector timeSelector;
+  private ByteBuffer buf;
+
+  private FloatFirstVectorAggregator target;
+
+  @Before
+  public void setup()
+  {
+    byte[] randomBytes = new byte[1024];
+    ThreadLocalRandom.current().nextBytes(randomBytes);
+    buf = ByteBuffer.wrap(randomBytes);
+    Mockito.doReturn(VALUES).when(selector).getFloatVector();
+    Mockito.doReturn(times).when(timeSelector).getLongVector();
+    target = new FloatFirstVectorAggregator(timeSelector, selector);
+    clearBufferForPositions(0, 0);
+  }
+
+  @Test
+  public void initValueShouldBeZero()
+  {
+    target.initValue(buf, 0);
+    float initVal = buf.getFloat(0);
+    Assert.assertEquals(0.0f, initVal, EPSILON);
+  }
+
+  @Test
+  public void aggregate()
+  {
+    target.init(buf, 0);
+    target.aggregate(buf, 0, 0, VALUES.length);
+    Pair<Long, Float> result = (Pair<Long, Float>) target.get(buf, 0);
+    Assert.assertEquals(times[0], result.lhs.longValue());
+    Assert.assertEquals(VALUES[0], result.rhs, EPSILON);
+  }
+
+  @Test
+  public void aggregateWithNulls()
+  {
+    mockNullsVector();
+    target.aggregate(buf, 0, 0, VALUES.length);
+    Pair<Long, Float> result = (Pair<Long, Float>) target.get(buf, 0);
+    Assert.assertEquals(times[0], result.lhs.longValue());
+    Assert.assertEquals(VALUES[0], result.rhs, EPSILON);
+  }
+
+  @Test
+  public void aggregateBatchWithoutRows()
+  {
+    int[] positions = new int[]{0, 43, 70};
+    int positionOffset = 2;
+    clearBufferForPositions(positionOffset, positions);
+    target.aggregate(buf, 3, positions, null, positionOffset);
+    for (int i = 0; i < positions.length; i++) {
+      Pair<Long, Float> result = (Pair<Long, Float>) target.get(buf, positions[i] + positionOffset);
+      Assert.assertEquals(times[i], result.lhs.longValue());
+      Assert.assertEquals(VALUES[i], result.rhs, EPSILON);
+    }
+  }
+
+  @Test
+  public void aggregateBatchWithRows()
+  {
+    int[] positions = new int[]{0, 43, 70};
+    int[] rows = new int[]{3, 2, 0};
+    int positionOffset = 2;
+    clearBufferForPositions(positionOffset, positions);
+    target.aggregate(buf, 3, positions, rows, positionOffset);
+    for (int i = 0; i < positions.length; i++) {
+      Pair<Long, Float> result = (Pair<Long, Float>) target.get(buf, positions[i] + positionOffset);
+      Assert.assertEquals(times[rows[i]], result.lhs.longValue());
+      Assert.assertEquals(VALUES[rows[i]], result.rhs, EPSILON);
+    }
+  }
+
+  private void clearBufferForPositions(int offset, int... positions)
+  {
+    for (int position : positions) {
+      target.init(buf, offset + position);
+    }
+  }
+
+  private void mockNullsVector()
+  {
+    if (!NullHandling.replaceWithDefault()) {
+      Mockito.doReturn(NULLS).when(selector).getNullVector();
+    }
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregationTest.java
new file mode 100644
index 000000000000..387eaa597abf
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregationTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.first;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.segment.vector.VectorValueSelector;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+
+@RunWith(MockitoJUnitRunner.class)
+public class LongFirstVectorAggregationTest extends InitializedNullHandlingTest
+{
+  private static final double EPSILON = 1e-5;
+  private static final long[] VALUES = new long[]{7, 15, 2, 150};
+  private static final boolean[] NULLS = new boolean[]{false, false, true, false};
+  private long[] times = {2436, 6879, 7888, 8224};
+
+  @Mock
+  private VectorValueSelector selector;
+  @Mock
+  private VectorValueSelector timeSelector;
+  private ByteBuffer buf;
+
+  private LongFirstVectorAggregator target;
+
+  @Before
+  public void setup()
+  {
+    byte[] randomBytes = new byte[1024];
+    ThreadLocalRandom.current().nextBytes(randomBytes);
+    buf = ByteBuffer.wrap(randomBytes);
+    Mockito.doReturn(VALUES).when(selector).getLongVector();
+    Mockito.doReturn(times).when(timeSelector).getLongVector();
+    target = new LongFirstVectorAggregator(timeSelector, selector);
+    clearBufferForPositions(0, 0);
+  }
+
+  @Test
+  public void initValueShouldInitZero()
+  {
+    target.initValue(buf, 0);
+    long initVal = buf.getLong(0);
+    Assert.assertEquals(0, initVal);
+  }
+
+  @Test
+  public void aggregate()
+  {
+    target.aggregate(buf, 0, 0, VALUES.length);
+    Pair<Long, Long> result = (Pair<Long, Long>) target.get(buf, 0);
+    Assert.assertEquals(times[0], result.lhs.longValue());
+    Assert.assertEquals(VALUES[0], result.rhs, EPSILON);
+  }
+
+  @Test
+  public void aggregateWithNulls()
+  {
+    mockNullsVector();
+    target.aggregate(buf, 0, 0, VALUES.length);
+    Pair<Long, Long> result = (Pair<Long, Long>) target.get(buf, 0);
+    Assert.assertEquals(times[0], result.lhs.longValue());
+    Assert.assertEquals(VALUES[0], result.rhs, EPSILON);
+  }
+
+  @Test
+  public void aggregateBatchWithoutRows()
+  {
+    int[] positions = new int[]{0, 43, 70};
+    int positionOffset = 2;
+    clearBufferForPositions(positionOffset, positions);
+    target.aggregate(buf, 3, positions, null, positionOffset);
+    for (int i = 0; i < positions.length; i++) {
+      Pair<Long, Long> result = (Pair<Long, Long>) target.get(buf, positions[i] + positionOffset);
+      Assert.assertEquals(times[i], result.lhs.longValue());
+      Assert.assertEquals(VALUES[i], result.rhs, EPSILON);
+    }
+  }
+
+  @Test
+  public void aggregateBatchWithRows()
+  {
+    int[] positions = new int[]{0, 43, 70};
+    int[] rows = new int[]{3, 2, 0};
+    int positionOffset = 2;
+    clearBufferForPositions(positionOffset, positions);
+    target.aggregate(buf, 3, positions, rows, positionOffset);
+    for (int i = 0; i < positions.length; i++) {
+      Pair<Long, Long> result = (Pair<Long, Long>) target.get(buf, positions[i] + positionOffset);
+      Assert.assertEquals(times[rows[i]], result.lhs.longValue());
+      Assert.assertEquals(VALUES[rows[i]], result.rhs, EPSILON);
+    }
+  }
+
+  private void clearBufferForPositions(int offset, int... positions)
+  {
+    for (int position : positions) {
+      target.init(buf, offset + position);
+    }
+  }
+
+  private void mockNullsVector()
+  {
+    if (!NullHandling.replaceWithDefault()) {
+      Mockito.doReturn(NULLS).when(selector).getNullVector();
+    }
+  }
+}

From cf46d4f051346e7b7215b24daa8cd85dd599007d Mon Sep 17 00:00:00 2001
From: Soumyava Das
Date: Mon, 25 Apr 2022 16:53:42 -0700
Subject: [PATCH 2/7] Adding test cases and updating agg factory

---
 .../first/DoubleFirstAggregatorFactory.java   | 30 +++++++
 .../first/FloatFirstAggregatorFactory.java    | 30 +++++++
 .../first/LongFirstAggregatorFactory.java     | 29 +++++++
 .../druid/sql/calcite/CalciteQueryTest.java   | 83 ++++++++++++++++---
 4 files changed, 160 insertions(+), 12 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java
index 444ade46139d..4272420a99fc 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java
@@ -29,14 +29,21 @@
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.AggregatorUtil;
 import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator;
 import org.apache.druid.query.cache.CacheKeyBuilder;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+import org.apache.druid.segment.ColumnInspector;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.NilColumnValueSelector;
+import 
org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
+import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
+import org.apache.druid.segment.vector.VectorValueSelector;
 
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
@@ -125,6 +132,29 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
     }
   }
 
+  @Override
+  public boolean canVectorize(ColumnInspector columnInspector)
+  {
+    return true;
+  }
+
+  @Override
+  public VectorAggregator factorizeVector(
+      VectorColumnSelectorFactory columnSelectorFactory
+  )
+  {
+    ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
+    // time is always long
+    BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector(
+        timeColumn);
+    if (capabilities == null || capabilities.isNumeric()) {
+      VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);
+      return new DoubleFirstVectorAggregator(timeSelector, valueSelector);
+    } else {
+      return NumericNilVectorAggregator.doubleNilVectorAggregator();
+    }
+  }
+
   @Override
   public Comparator getComparator()
   {
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java
index f8a592d58685..01719d3324ad 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java
@@ -29,14 +29,21 @@
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.AggregatorUtil;
 import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseFloatColumnValueSelector; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorValueSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -123,6 +130,29 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) } } + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; + } + + @Override + public VectorAggregator factorizeVector( + VectorColumnSelectorFactory columnSelectorFactory + ) + { + ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); + VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName); + //time is always long + BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector( + timeColumn); + if (capabilities == null || capabilities.isNumeric()) { + return new FloatFirstVectorAggregator(timeSelector, valueSelector); + } else { + return NumericNilVectorAggregator.floatNilVectorAggregator(); + } + } + @Override public Comparator getComparator() { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java index bfc8ae48eaac..18380a139728 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java @@ -29,14 +29,21 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorValueSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -122,6 +129,28 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) } } + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return true; + } + + @Override + public VectorAggregator factorizeVector( + VectorColumnSelectorFactory columnSelectorFactory + ) + { + ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); + VectorValueSelector valueSelector = 
columnSelectorFactory.makeValueSelector(fieldName); + BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector( + timeColumn); + if (capabilities == null || capabilities.isNumeric()) { + return new LongFirstVectorAggregator(timeSelector, valueSelector); + } else { + return NumericNilVectorAggregator.longNilVectorAggregator(); + } + } + @Override public Comparator getComparator() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 7898606e39ea..2ff0720d3dc1 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -602,7 +602,7 @@ public void testGroupBySingleColumnDescendingNoTopN() throws Exception @Test public void testEarliestAggregators() throws Exception { - // Cannot vectorize EARLIEST aggregator. + // Cannot vectorize EARLIEST aggregator for Strings. skipVectorize(); testQuery( @@ -885,8 +885,6 @@ public void testPrimitiveLatestInSubquery() throws Exception @Test public void testPrimitiveEarliestInSubquery() throws Exception { - // Cannot vectorize EARLIEST aggregator. 
- skipVectorize(); testQuery( "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, EARLIEST(m1) AS val1, EARLIEST(cnt) AS val2, EARLIEST(m2) AS val3 FROM foo GROUP BY dim2)", @@ -932,6 +930,40 @@ public void testPrimitiveEarliestInSubquery() throws Exception ); } + @Test + public void testPrimitiveEarliestInSubqueryGroupBy() throws Exception + { + testQuery( + "SELECT dim2, EARLIEST(m1) AS val1 FROM foo GROUP BY dim2", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs(aggregators( + new FloatFirstAggregatorFactory("a0", "m1", null) + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + NullHandling.sqlCompatible() + ? ImmutableList.of( + new Object[]{null, 2.0f}, + new Object[]{"", 3.0f}, + new Object[]{"a", 1.0f}, + new Object[]{"abc", 5.0f} + ) + : ImmutableList.of( + new Object[]{"", 2.0f}, + new Object[]{"a", 1.0f}, + new Object[]{"abc", 5.0f} + + ) + ); + } + // This test the off-heap (buffer) version of the LatestAggregator (String) @Test public void testStringLatestInSubquery() throws Exception @@ -987,7 +1019,7 @@ public void testStringLatestInSubquery() throws Exception @Test public void testStringEarliestInSubquery() throws Exception { - // Cannot vectorize EARLIEST aggregator. + // Cannot vectorize EARLIEST aggregator for Strings skipVectorize(); testQuery( @@ -1147,8 +1179,6 @@ public void testStringAnyInSubquery() throws Exception @Test public void testEarliestAggregatorsNumericNulls() throws Exception { - // Cannot vectorize EARLIEST aggregator. 
- skipVectorize(); testQuery( "SELECT EARLIEST(l1), EARLIEST(d1), EARLIEST(f1) FROM druid.numfoo", @@ -1324,8 +1354,7 @@ public void testAnyAggregatorsSkipNullsWithFilter() throws Exception @Test public void testOrderByEarliestFloat() throws Exception { - // Cannot vectorize EARLIEST aggregator. - skipVectorize(); + List expected; if (NullHandling.replaceWithDefault()) { expected = ImmutableList.of( @@ -1371,8 +1400,6 @@ public void testOrderByEarliestFloat() throws Exception @Test public void testOrderByEarliestDouble() throws Exception { - // Cannot vectorize EARLIEST aggregator. - skipVectorize(); List expected; if (NullHandling.replaceWithDefault()) { expected = ImmutableList.of( @@ -1418,8 +1445,7 @@ public void testOrderByEarliestDouble() throws Exception @Test public void testOrderByEarliestLong() throws Exception { - // Cannot vectorize EARLIEST aggregator. - skipVectorize(); + List expected; if (NullHandling.replaceWithDefault()) { expected = ImmutableList.of( @@ -1604,6 +1630,39 @@ public void testOrderByLatestLong() throws Exception ); } + @Test + public void testEarliestVectorAggregators() throws Exception + { + testQuery( + "SELECT " + + "EARLIEST(cnt), EARLIEST(cnt + 1), EARLIEST(m1), EARLIEST(m1+1) " + + "FROM druid.numfoo", + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE3) + .intervals(querySegmentSpec(Filtration.eternity())) + .granularity(Granularities.ALL) + .virtualColumns( + expressionVirtualColumn("v0", "(\"cnt\" + 1)", ColumnType.LONG), + expressionVirtualColumn("v1", "(\"m1\" + 1)", ColumnType.FLOAT) + ) + .aggregators( + aggregators( + new LongFirstAggregatorFactory("a0", "cnt", null), + new LongFirstAggregatorFactory("a1", "v0", null), + new FloatFirstAggregatorFactory("a2", "m1", null), + new FloatFirstAggregatorFactory("a3", "v1", null) + ) + ) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{1L, 2L, 1.0f, 2.0f} + ) + ); + } + @Test public void 
testOrderByAnyFloat() throws Exception { From d1aac3441a56462e8870abd3986893b977d61c38 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Mon, 25 Apr 2022 18:29:05 -0700 Subject: [PATCH 3/7] Updating an unit testcase to be more meaningful --- .../first/FloatFirstVectorAggregationTest.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java index dfc8e543f74d..7f16c5959397 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java @@ -39,7 +39,7 @@ public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest { private static final double EPSILON = 1e-5; private static final float[] VALUES = new float[]{7.2f, 15.6f, 2.1f, 150.0f}; - private static final boolean[] NULLS = new boolean[]{false, false, true, false}; + private static final boolean[] NULLS = new boolean[]{true, false, true, false}; private long[] times = {2436, 6879, 7888, 8224}; @Mock @@ -86,8 +86,13 @@ public void aggregateWithNulls() mockNullsVector(); target.aggregate(buf, 0, 0, VALUES.length); Pair result = (Pair) target.get(buf, 0); - Assert.assertEquals(times[0], result.lhs.longValue()); - Assert.assertEquals(VALUES[0], result.rhs, EPSILON); + if (!NullHandling.replaceWithDefault()) { + Assert.assertEquals(times[1], result.lhs.longValue()); + Assert.assertEquals(VALUES[1], result.rhs, EPSILON); + } else { + Assert.assertEquals(times[0], result.lhs.longValue()); + Assert.assertEquals(VALUES[0], result.rhs, EPSILON); + } } @Test From 19a4beb99542c925bef4506ad2ab12c27227f9ac Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Tue, 26 Apr 2022 13:52:44 -0700 Subject: [PATCH 4/7] Adding 
benchmark tests --- .../query/SqlExpressionBenchmark.java | 20 +++++++++++++++---- .../first/DoubleFirstVectorAggregator.java | 2 +- .../first/NumericFirstVectorAggregator.java | 4 ---- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java index 395336ff2ec6..dafc6ca3e70c 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java @@ -205,10 +205,18 @@ public String getFormatString() "SELECT LATEST(long1) FROM foo", // 39: LATEST aggregator double "SELECT LATEST(double4) FROM foo", - // 40: LATEST aggregator double + // 40: LATEST aggregator float "SELECT LATEST(float3) FROM foo", - // 41: LATEST aggregator double - "SELECT LATEST(float3), LATEST(long1), LATEST(double4) FROM foo" + // 41: LATEST aggregator all + "SELECT LATEST(float3), LATEST(long1), LATEST(double4) FROM foo", + // 42: EARLIEST aggregator + "SELECT EARLIEST(long1) FROM foo", + // 43: EARLIEST aggregator double + "SELECT EARLIEST(double4) FROM foo", + // 44: EARLIEST aggregator float + "SELECT EARLIEST(float3) FROM foo", + // 45: EARLIEST aggregator all + "SELECT EARLIEST(float3), EARLIEST(long1), EARLIEST(double4) FROM foo" ); @Param({"5000000"}) @@ -264,7 +272,11 @@ public String getFormatString() "38", "39", "40", - "41" + "41", + "42", + "43", + "44", + "45" }) private String query; diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregator.java index 2521b787cdee..40b30c1c0ab3 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregator.java +++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregator.java @@ -41,7 +41,7 @@ public DoubleFirstVectorAggregator(VectorValueSelector timeSelector, VectorValue @Override public void initValue(ByteBuffer buf, int position) { - buf.putLong(position, 0); + buf.putDouble(position, 0); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java index 9fdecaad0e74..f5370468f218 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java @@ -34,14 +34,12 @@ public abstract class NumericFirstVectorAggregator implements VectorAggregator private final boolean useDefault = NullHandling.replaceWithDefault(); private final VectorValueSelector timeSelector; private long firstTime; - protected boolean rhsNull; public NumericFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector) { this.timeSelector = timeSelector; this.valueSelector = valueSelector; firstTime = Long.MAX_VALUE; - rhsNull = !useDefault; } @Override @@ -84,10 +82,8 @@ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) firstTime = earliestTime; if (useDefault || nullValueVector == null || !nullValueVector[index]) { updateTimeWithValue(buf, position, firstTime, index); - rhsNull = false; } else { updateTimeWithNull(buf, position, firstTime); - rhsNull = true; } } } From 602f3458869e90591fca6cf48be4dd1ecd12034c Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Sat, 30 Apr 2022 10:43:11 -0700 Subject: [PATCH 5/7] Updating tests and addressing comments --- .../first/DoubleFirstVectorAggregator.java | 2 +- .../first/FloatFirstVectorAggregator.java | 2 +- .../first/LongFirstVectorAggregator.java | 2 +- 
.../first/NumericFirstVectorAggregator.java | 13 +++++--- .../DoubleFirstVectorAggregationTest.java | 28 +++++++++++++++- .../FloatFirstVectorAggregationTest.java | 29 ++++++++++++++++- .../first/LongFirstVectorAggregationTest.java | 32 +++++++++++++++++-- .../druid/sql/calcite/CalciteQueryTest.java | 4 +-- 8 files changed, 98 insertions(+), 14 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregator.java index 40b30c1c0ab3..c05a5c162f5f 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregator.java @@ -26,7 +26,7 @@ import java.nio.ByteBuffer; /** - * Vectorized version of on heap 'last' aggregator for column selectors with type LONG.. + * Vectorized version of on heap 'earliest' aggregator for column selectors with type DOUBLE. */ public class DoubleFirstVectorAggregator extends NumericFirstVectorAggregator { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregator.java index 8b9c7af48e3b..da5edded9488 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregator.java @@ -26,7 +26,7 @@ import java.nio.ByteBuffer; /** - * Vectorized version of on heap 'last' aggregator for column selectors with type LONG.. + * Vectorized version of on heap 'earliest' aggregator for column selectors with type FLOAT.
*/ public class FloatFirstVectorAggregator extends NumericFirstVectorAggregator { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregator.java index 0c999281c516..521228f2be61 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregator.java @@ -26,7 +26,7 @@ import java.nio.ByteBuffer; /** - * Vectorized version of on heap 'last' aggregator for column selectors with type LONG.. + * Vectorized version of on heap 'earliest' aggregator for column selectors with type LONG. */ public class LongFirstVectorAggregator extends NumericFirstVectorAggregator { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java index f5370468f218..38fee0cc2abd 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java @@ -26,6 +26,9 @@ import javax.annotation.Nullable; import java.nio.ByteBuffer; +/** + * Base class for the vectorized version of the first/earliest aggregator over numeric types. + */ public abstract class NumericFirstVectorAggregator implements VectorAggregator { static final int NULL_OFFSET = Long.BYTES; @@ -57,15 +60,15 @@ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) final boolean[] nullValueVector = valueSelector.getNullVector(); boolean nullAbsent = false; firstTime = buf.getLong(position); - //check if nullVector is found or not + // check if nullVector is found or not // the nullVector is null if no null values are found // set the nullAbsent
flag accordingly if (nullValueVector == null) { nullAbsent = true; } - //the time vector is already sorted so the first element would be the earliest - //traverse accordingly + // the time vector is already sorted so the first element would be the earliest + // traverse accordingly int index = startRow; if (!useDefault && !nullAbsent) { for (int i = startRow; i < endRow; i++) { @@ -76,7 +79,7 @@ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) } } - //find the first non-null value + // find the first non-null value final long earliestTime = timeVector[index]; if (earliestTime <= firstTime) { firstTime = earliestTime; @@ -128,6 +131,7 @@ public void aggregate( } /** + * Updates the time and the non-null value at the appropriate position in the buffer. * * @param buf byte buffer storing the byte array representation of the aggregate * @param position offset within the byte buffer at which the current aggregate value is stored @@ -142,6 +146,7 @@ void updateTimeWithValue(ByteBuffer buf, int position, long time, int index) } /** + * Updates only the time at the appropriate position in the buffer, because the value is null. * * @param buf byte buffer storing the byte array representation of the aggregate * @param position offset within the byte buffer at which the current aggregate value is stored diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregationTest.java index 7bd3c98511a3..1575b33dcef0 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstVectorAggregationTest.java @@ -21,12 +21,16 @@ import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.VectorAggregator; +import
org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import org.apache.druid.segment.vector.VectorValueSelector; import org.apache.druid.testing.InitializedNullHandlingTest; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Answers; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; @@ -42,14 +46,22 @@ public class DoubleFirstVectorAggregationTest extends InitializedNullHandlingTes private static final boolean[] NULLS = new boolean[]{false, false, true, false}; private long[] times = {2436, 6879, 7888, 8224}; + private static final String NAME = "NAME"; + private static final String FIELD_NAME = "FIELD_NAME"; + private static final String TIME_COL = "__time"; + @Mock private VectorValueSelector selector; @Mock - private VectorValueSelector timeSelector; + private BaseLongVectorValueSelector timeSelector; private ByteBuffer buf; private DoubleFirstVectorAggregator target; + private DoubleFirstAggregatorFactory doubleFirstAggregatorFactory; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private VectorColumnSelectorFactory selectorFactory; + @Before public void setup() { @@ -60,6 +72,20 @@ public void setup() Mockito.doReturn(times).when(timeSelector).getLongVector(); target = new DoubleFirstVectorAggregator(timeSelector, selector); clearBufferForPositions(0, 0); + + Mockito.doReturn(null).when(selectorFactory).getColumnCapabilities(FIELD_NAME); + Mockito.doReturn(selector).when(selectorFactory).makeValueSelector(FIELD_NAME); + Mockito.doReturn(timeSelector).when(selectorFactory).makeValueSelector(TIME_COL); + doubleFirstAggregatorFactory = new DoubleFirstAggregatorFactory(NAME, FIELD_NAME, TIME_COL); + } + + @Test + public void testFactory() + { + Assert.assertTrue(doubleFirstAggregatorFactory.canVectorize(selectorFactory)); + VectorAggregator vectorAggregator = 
doubleFirstAggregatorFactory.factorizeVector(selectorFactory); + Assert.assertNotNull(vectorAggregator); + Assert.assertEquals(DoubleFirstVectorAggregator.class, vectorAggregator.getClass()); } @Test diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java index 7f16c5959397..0eb7afe46e65 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java @@ -21,12 +21,16 @@ import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import org.apache.druid.segment.vector.VectorValueSelector; import org.apache.druid.testing.InitializedNullHandlingTest; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Answers; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; @@ -42,14 +46,22 @@ public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest private static final boolean[] NULLS = new boolean[]{true, false, true, false}; private long[] times = {2436, 6879, 7888, 8224}; + private static final String NAME = "NAME"; + private static final String FIELD_NAME = "FIELD_NAME"; + private static final String TIME_COL = "__time"; + @Mock private VectorValueSelector selector; @Mock - private VectorValueSelector timeSelector; + private BaseLongVectorValueSelector timeSelector; private ByteBuffer buf; private FloatFirstVectorAggregator target; + private FloatFirstAggregatorFactory 
floatFirstAggregatorFactory; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private VectorColumnSelectorFactory selectorFactory; + @Before public void setup() { @@ -60,6 +72,21 @@ public void setup() Mockito.doReturn(times).when(timeSelector).getLongVector(); target = new FloatFirstVectorAggregator(timeSelector, selector); clearBufferForPositions(0, 0); + + Mockito.doReturn(null).when(selectorFactory).getColumnCapabilities(FIELD_NAME); + Mockito.doReturn(selector).when(selectorFactory).makeValueSelector(FIELD_NAME); + Mockito.doReturn(timeSelector).when(selectorFactory).makeValueSelector(TIME_COL); + floatFirstAggregatorFactory = new FloatFirstAggregatorFactory(NAME, FIELD_NAME, TIME_COL); + + } + + @Test + public void testFactory() + { + Assert.assertTrue(floatFirstAggregatorFactory.canVectorize(selectorFactory)); + VectorAggregator vectorAggregator = floatFirstAggregatorFactory.factorizeVector(selectorFactory); + Assert.assertNotNull(vectorAggregator); + Assert.assertEquals(FloatFirstVectorAggregator.class, vectorAggregator.getClass()); } @Test diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregationTest.java index 387eaa597abf..5f2072ef5b25 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstVectorAggregationTest.java @@ -21,12 +21,16 @@ import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import org.apache.druid.segment.vector.VectorValueSelector; import org.apache.druid.testing.InitializedNullHandlingTest; import 
org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Answers; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; @@ -34,22 +38,28 @@ import java.nio.ByteBuffer; import java.util.concurrent.ThreadLocalRandom; + @RunWith(MockitoJUnitRunner.class) public class LongFirstVectorAggregationTest extends InitializedNullHandlingTest { private static final double EPSILON = 1e-5; private static final long[] VALUES = new long[]{7, 15, 2, 150}; private static final boolean[] NULLS = new boolean[]{false, false, true, false}; + private static final String NAME = "NAME"; + private static final String FIELD_NAME = "FIELD_NAME"; + private static final String TIME_COL = "__time"; private long[] times = {2436, 6879, 7888, 8224}; - @Mock private VectorValueSelector selector; @Mock - private VectorValueSelector timeSelector; + private BaseLongVectorValueSelector timeSelector; private ByteBuffer buf; - private LongFirstVectorAggregator target; + private LongFirstAggregatorFactory longFirstAggregatorFactory; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private VectorColumnSelectorFactory selectorFactory; + @Before public void setup() { @@ -60,6 +70,22 @@ public void setup() Mockito.doReturn(times).when(timeSelector).getLongVector(); target = new LongFirstVectorAggregator(timeSelector, selector); clearBufferForPositions(0, 0); + + + Mockito.doReturn(null).when(selectorFactory).getColumnCapabilities(FIELD_NAME); + Mockito.doReturn(selector).when(selectorFactory).makeValueSelector(FIELD_NAME); + Mockito.doReturn(timeSelector).when(selectorFactory).makeValueSelector(TIME_COL); + longFirstAggregatorFactory = new LongFirstAggregatorFactory(NAME, FIELD_NAME, TIME_COL); + + } + + @Test + public void testFactory() + { + Assert.assertTrue(longFirstAggregatorFactory.canVectorize(selectorFactory)); + VectorAggregator vectorAggregator = 
longFirstAggregatorFactory.factorizeVector(selectorFactory); + Assert.assertNotNull(vectorAggregator); + Assert.assertEquals(LongFirstVectorAggregator.class, vectorAggregator.getClass()); } @Test diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 4af92bf91d8f..2254859e682f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -911,7 +911,7 @@ public void testPrimitiveLatestInSubquery() throws Exception } @Test - public void testPrimitiveLatestInSubqueryGroupBy() throws Exception + public void testLatestOffHeapGroupBy() throws Exception { testQuery( "SELECT dim2, LATEST(m1) AS val1 FROM foo GROUP BY dim2", @@ -994,7 +994,7 @@ public void testPrimitiveEarliestInSubquery() throws Exception } @Test - public void testPrimitiveEarliestInSubqueryGroupBy() throws Exception + public void testOffHeapEarliestGroupBy() throws Exception { testQuery( "SELECT dim2, EARLIEST(m1) AS val1 FROM foo GROUP BY dim2", From 04277a166d1bb06c4a58470982dd858ed3965d43 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Mon, 2 May 2022 08:49:28 -0700 Subject: [PATCH 6/7] Updating first agg --- .../query/aggregation/first/NumericFirstVectorAggregator.java | 4 ++-- .../druid/query/timeseries/TimeseriesQueryRunnerTest.java | 3 --- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java index 38fee0cc2abd..6f75b20d0461 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java @@ -81,7 +81,7 @@ public void aggregate(ByteBuffer 
buf, int position, int startRow, int endRow) // find the first non-null value final long earliestTime = timeVector[index]; - if (earliestTime <= firstTime) { + if (earliestTime < firstTime) { firstTime = earliestTime; if (useDefault || nullValueVector == null || !nullValueVector[index]) { updateTimeWithValue(buf, position, firstTime, index); @@ -120,7 +120,7 @@ public void aggregate( int position = positions[i] + positionOffset; int row = rows == null ? i : rows[i]; long firstTime = buf.getLong(position); - if (timeVector[row] <= firstTime) { + if (timeVector[row] < firstTime) { if (useDefault || nulls == null || !nulls[row]) { updateTimeWithValue(buf, position, timeVector[row], row); } else { diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java index 05848a5e4dd9..fba5f13fc908 100644 --- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java @@ -1948,9 +1948,6 @@ public void testTimeseriesWithMultiValueFilteringJavascriptAggregatorAndAlsoRegu @Test public void testTimeseriesWithFirstLastAggregator() { - // Cannot vectorize due to "doubleFirst", "doubleLast" aggregators. 
- cannotVectorize(); - TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() .dataSource(QueryRunnerTestHelper.DATA_SOURCE) .granularity(QueryRunnerTestHelper.MONTH_GRAN) From 366a92a353a6d299ab018e9c46d04feba8fea945 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Mon, 2 May 2022 09:48:22 -0700 Subject: [PATCH 7/7] Fixing testcases which were non vectorized before but now needs to change --- .../apache/druid/query/groupby/GroupByQueryRunnerTest.java | 7 ------- .../druid/query/timeseries/TimeseriesQueryRunnerTest.java | 3 --- 2 files changed, 10 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index bd8e4905aeb8..90b75d40706c 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -3437,8 +3437,6 @@ public void testGroupByWithCardinality() @Test public void testGroupByWithFirstLast() { - // Cannot vectorize due to "first", "last" aggregators. - cannotVectorize(); GroupByQuery query = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) @@ -3527,8 +3525,6 @@ public void testGroupByWithFirstLast() @Test public void testGroupByWithNoResult() { - // Cannot vectorize due to first, last aggregators. - cannotVectorize(); GroupByQuery query = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) @@ -7226,9 +7222,6 @@ public void testSubqueryWithHyperUniquesPostAggregator() @Test public void testSubqueryWithFirstLast() { - // Cannot vectorize due to "first", "last" aggregators. 
- cannotVectorize(); - GroupByQuery subquery = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setQuerySegmentSpec(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java index fba5f13fc908..ad0b35a3dddc 100644 --- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java @@ -167,9 +167,6 @@ public TimeseriesQueryRunnerTest( @Test public void testEmptyTimeseries() { - // Cannot vectorize due to "doubleFirst" aggregator. - cannotVectorize(); - TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() .dataSource(QueryRunnerTestHelper.DATA_SOURCE) .granularity(QueryRunnerTestHelper.ALL_GRAN)