diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java index 3f28a18596ae..36a549ff0dea 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java @@ -73,7 +73,7 @@ public static void writePair( if (pair.rhs != null) { mutationBuffer.position(position + Long.BYTES + Integer.BYTES); - mutationBuffer.limit(maxStringBytes); + mutationBuffer.limit(position + Long.BYTES + Integer.BYTES + maxStringBytes); final int len = StringUtils.toUtf8WithLimit(pair.rhs, mutationBuffer); mutationBuffer.putInt(position + Long.BYTES, len); } else { diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java new file mode 100644 index 000000000000..b4e4088535cb --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; + +public class StringFirstLastUtilsTest +{ + private static final SerializablePairLongString PAIR_TO_WRITE = new SerializablePairLongString( + DateTimes.MAX.getMillis(), + "asdasddsaasd" + ); + + private static final int BUFFER_CAPACITY = 100; + // PAIR_TO_WRITE Size is 12 so MAX_BYTE_TO_WRITE is set to 15 which is more than enough + private static final int MAX_BYTE_TO_WRITE = 15; + + @Test + public void testWritePairThenReadPairAtBeginningBuffer() + { + int positionAtBeginning = 0; + ByteBuffer buf = ByteBuffer.allocate(BUFFER_CAPACITY); + StringFirstLastUtils.writePair(buf, positionAtBeginning, PAIR_TO_WRITE, MAX_BYTE_TO_WRITE); + SerializablePairLongString actual = StringFirstLastUtils.readPair(buf, positionAtBeginning); + Assert.assertEquals(PAIR_TO_WRITE, actual); + } + + @Test + public void testWritePairThenReadPairAtMiddleBuffer() + { + int positionAtMiddle = 60; + ByteBuffer buf = ByteBuffer.allocate(BUFFER_CAPACITY); + StringFirstLastUtils.writePair(buf, positionAtMiddle, PAIR_TO_WRITE, MAX_BYTE_TO_WRITE); + SerializablePairLongString actual = StringFirstLastUtils.readPair(buf, positionAtMiddle); + Assert.assertEquals(PAIR_TO_WRITE, actual); + } +} diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 899ffa289751..208db301e077 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -48,11 +48,13 @@ import org.apache.druid.query.aggregation.any.LongAnyAggregatorFactory; import org.apache.druid.query.aggregation.any.StringAnyAggregatorFactory; import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; +import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory; import org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory; import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory; import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory; import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory; @@ -1413,14 +1415,15 @@ public void testAnyAggregatorsOffHeapNumericNulls() throws Exception ); } + // This test the off-heap (buffer) version of the LatestAggregator (Double/Float/Long) @Test - public void testLatestInSubquery() throws Exception + public void testPrimitiveLatestInSubquery() throws Exception { // Cannot vectorize LATEST aggregator. skipVectorize(); testQuery( - "SELECT SUM(val) FROM (SELECT dim2, LATEST(m1) AS val FROM foo GROUP BY dim2)", + "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, LATEST(m1) AS val1, LATEST(cnt) AS val2, LATEST(m2) AS val3 FROM foo GROUP BY dim2)", ImmutableList.of( GroupByQuery.builder() .setDataSource( @@ -1429,7 +1432,103 @@ public void testLatestInSubquery() throws Exception .setInterval(querySegmentSpec(Filtration.eternity())) .setGranularity(Granularities.ALL) .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) - .setAggregatorSpecs(aggregators(new FloatLastAggregatorFactory("a0:a", "m1"))) + .setAggregatorSpecs(aggregators( + new FloatLastAggregatorFactory("a0:a", "m1"), + new LongLastAggregatorFactory("a1:a", "cnt"), + new DoubleLastAggregatorFactory("a2:a", "m2")) + ) + .setPostAggregatorSpecs( + ImmutableList.of( + new FinalizingFieldAccessPostAggregator("a0", "a0:a"), + new FinalizingFieldAccessPostAggregator("a1", "a1:a"), + new FinalizingFieldAccessPostAggregator("a2", "a2:a") + + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(aggregators( + new DoubleSumAggregatorFactory("_a0", "a0"), + new LongSumAggregatorFactory("_a1", "a1"), + new DoubleSumAggregatorFactory("_a2", "a2") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + NullHandling.sqlCompatible() ? ImmutableList.of(new Object[]{18.0, 4L, 18.0}) : ImmutableList.of(new Object[]{15.0, 3L, 15.0}) + ); + } + + // This test the off-heap (buffer) version of the EarliestAggregator (Double/Float/Long) + @Test + public void testPrimitiveEarliestInSubquery() throws Exception + { + // Cannot vectorize EARLIEST aggregator. + skipVectorize(); + + testQuery( + "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, EARLIEST(m1) AS val1, EARLIEST(cnt) AS val2, EARLIEST(m2) AS val3 FROM foo GROUP BY dim2)", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs(aggregators( + new FloatFirstAggregatorFactory("a0:a", "m1"), + new LongFirstAggregatorFactory("a1:a", "cnt"), + new DoubleFirstAggregatorFactory("a2:a", "m2")) + ) + .setPostAggregatorSpecs( + ImmutableList.of( + new FinalizingFieldAccessPostAggregator("a0", "a0:a"), + new FinalizingFieldAccessPostAggregator("a1", "a1:a"), + new FinalizingFieldAccessPostAggregator("a2", "a2:a") + + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(aggregators( + new DoubleSumAggregatorFactory("_a0", "a0"), + new LongSumAggregatorFactory("_a1", "a1"), + new DoubleSumAggregatorFactory("_a2", "a2") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + NullHandling.sqlCompatible() ? ImmutableList.of(new Object[]{11.0, 4L, 11.0}) : ImmutableList.of(new Object[]{8.0, 3L, 8.0}) + ); + } + + // This test the off-heap (buffer) version of the LatestAggregator (String) + @Test + public void testStringLatestInSubquery() throws Exception + { + // Cannot vectorize LATEST aggregator. + skipVectorize(); + + testQuery( + "SELECT SUM(val) FROM (SELECT dim2, LATEST(dim1, 10) AS val FROM foo GROUP BY dim2)", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs(aggregators(new StringLastAggregatorFactory("a0:a", "dim1", 10))) .setPostAggregatorSpecs( ImmutableList.of( new FinalizingFieldAccessPostAggregator("a0", "a0:a") @@ -1440,12 +1539,50 @@ public void testLatestInSubquery() throws Exception ) .setInterval(querySegmentSpec(Filtration.eternity())) .setGranularity(Granularities.ALL) - .setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", "a0"))) + .setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", null, "CAST(\"a0\", 'DOUBLE')", ExprMacroTable.nil()))) .setContext(QUERY_CONTEXT_DEFAULT) .build() ), ImmutableList.of( - new Object[]{NullHandling.sqlCompatible() ? 18.0 : 15.0} + new Object[]{NullHandling.sqlCompatible() ? 3 : 1.0} + ) + ); + } + + // This test the off-heap (buffer) version of the EarliestAggregator (String) + @Test + public void testStringEarliestInSubquery() throws Exception + { + // Cannot vectorize EARLIEST aggregator. + skipVectorize(); + + testQuery( + "SELECT SUM(val) FROM (SELECT dim2, EARLIEST(dim1, 10) AS val FROM foo GROUP BY dim2)", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs(aggregators(new StringFirstAggregatorFactory("a0:a", "dim1", 10))) + .setPostAggregatorSpecs( + ImmutableList.of( + new FinalizingFieldAccessPostAggregator("a0", "a0:a") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", null, "CAST(\"a0\", 'DOUBLE')", ExprMacroTable.nil()))) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{NullHandling.sqlCompatible() ? 12.1 : 11.1} ) ); }