From d6dadb3c2ac60b015119c5405da9349e7b5c9a18 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn <52679095+maytasm3@users.noreply.github.com> Date: Thu, 16 Jan 2020 21:02:37 -0800 Subject: [PATCH] Fix LATEST / EARLIEST Buffer Aggregator does not work on String column (#9197) * fix buff limit bug * add tests * add test * add tests * fix checkstyle --- .../first/StringFirstLastUtils.java | 2 +- .../first/StringFirstLastUtilsTest.java | 59 +++++++ .../druid/sql/calcite/CalciteQueryTest.java | 146 +++++++++++++++++- 3 files changed, 201 insertions(+), 6 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java index 133c4ba1ffcc..630f70c045f5 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java @@ -87,7 +87,7 @@ public static void writePair( if (pair.rhs != null) { mutationBuffer.position(position + Long.BYTES + Integer.BYTES); - mutationBuffer.limit(maxStringBytes); + mutationBuffer.limit(position + Long.BYTES + Integer.BYTES + maxStringBytes); final int len = StringUtils.toUtf8WithLimit(pair.rhs, mutationBuffer); mutationBuffer.putInt(position + Long.BYTES, len); } else { diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java new file mode 100644 index 000000000000..b4e4088535cb --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; + +public class StringFirstLastUtilsTest +{ + private static final SerializablePairLongString PAIR_TO_WRITE = new SerializablePairLongString( + DateTimes.MAX.getMillis(), + "asdasddsaasd" + ); + + private static final int BUFFER_CAPACITY = 100; + // PAIR_TO_WRITE Size is 12 so MAX_BYTE_TO_WRITE is set to 15 which is more than enough + private static final int MAX_BYTE_TO_WRITE = 15; + + @Test + public void testWritePairThenReadPairAtBeginningBuffer() + { + int positionAtBeginning = 0; + ByteBuffer buf = ByteBuffer.allocate(BUFFER_CAPACITY); + StringFirstLastUtils.writePair(buf, positionAtBeginning, PAIR_TO_WRITE, MAX_BYTE_TO_WRITE); + SerializablePairLongString actual = StringFirstLastUtils.readPair(buf, positionAtBeginning); + Assert.assertEquals(PAIR_TO_WRITE, actual); + } + + @Test + public void testWritePairThenReadPairAtMiddleBuffer() + { + int positionAtMiddle = 60; + ByteBuffer buf = ByteBuffer.allocate(BUFFER_CAPACITY); + StringFirstLastUtils.writePair(buf, positionAtMiddle, 
PAIR_TO_WRITE, MAX_BYTE_TO_WRITE); + SerializablePairLongString actual = StringFirstLastUtils.readPair(buf, positionAtMiddle); + Assert.assertEquals(PAIR_TO_WRITE, actual); + } +} diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 895b452e47c7..8fbb1adeb87a 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -44,11 +44,13 @@ import org.apache.druid.query.aggregation.LongMinAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; +import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory; import org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory; import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory; import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory; import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory; @@ -1298,13 +1300,13 @@ public void testLatestAggregators() throws Exception } @Test - public void testLatestInSubquery() throws Exception + public void testPrimitiveLatestInSubquery() throws Exception { // Cannot vectorize LATEST aggregator. 
skipVectorize(); testQuery( - "SELECT SUM(val) FROM (SELECT dim2, LATEST(m1) AS val FROM foo GROUP BY dim2)", + "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, LATEST(m1) AS val1, LATEST(cnt) AS val2, LATEST(m2) AS val3 FROM foo GROUP BY dim2)", ImmutableList.of( GroupByQuery.builder() .setDataSource( @@ -1313,7 +1315,141 @@ public void testLatestInSubquery() throws Exception .setInterval(querySegmentSpec(Filtration.eternity())) .setGranularity(Granularities.ALL) .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) - .setAggregatorSpecs(aggregators(new FloatLastAggregatorFactory("a0:a", "m1"))) + .setAggregatorSpecs(aggregators( + new FloatLastAggregatorFactory("a0:a", "m1"), + new LongLastAggregatorFactory("a1:a", "cnt"), + new DoubleLastAggregatorFactory("a2:a", "m2")) + ) + .setPostAggregatorSpecs( + ImmutableList.of( + new FinalizingFieldAccessPostAggregator("a0", "a0:a"), + new FinalizingFieldAccessPostAggregator("a1", "a1:a"), + new FinalizingFieldAccessPostAggregator("a2", "a2:a") + + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(aggregators( + new DoubleSumAggregatorFactory("_a0", "a0"), + new LongSumAggregatorFactory("_a1", "a1"), + new DoubleSumAggregatorFactory("_a2", "a2") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + NullHandling.sqlCompatible() ? ImmutableList.of(new Object[]{18.0, 4L, 18.0}) : ImmutableList.of(new Object[]{15.0, 3L, 15.0}) + ); + } + + // This tests the off-heap (buffer) version of the EarliestAggregator (Double/Float/Long) + @Test + public void testPrimitiveEarliestInSubquery() throws Exception + { + // Cannot vectorize EARLIEST aggregator. 
+ skipVectorize(); + + testQuery( + "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, EARLIEST(m1) AS val1, EARLIEST(cnt) AS val2, EARLIEST(m2) AS val3 FROM foo GROUP BY dim2)", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs(aggregators( + new FloatFirstAggregatorFactory("a0:a", "m1"), + new LongFirstAggregatorFactory("a1:a", "cnt"), + new DoubleFirstAggregatorFactory("a2:a", "m2")) + ) + .setPostAggregatorSpecs( + ImmutableList.of( + new FinalizingFieldAccessPostAggregator("a0", "a0:a"), + new FinalizingFieldAccessPostAggregator("a1", "a1:a"), + new FinalizingFieldAccessPostAggregator("a2", "a2:a") + + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(aggregators( + new DoubleSumAggregatorFactory("_a0", "a0"), + new LongSumAggregatorFactory("_a1", "a1"), + new DoubleSumAggregatorFactory("_a2", "a2") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + NullHandling.sqlCompatible() ? ImmutableList.of(new Object[]{11.0, 4L, 11.0}) : ImmutableList.of(new Object[]{8.0, 3L, 8.0}) + ); + } + + // This tests the off-heap (buffer) version of the LatestAggregator (String) + @Test + public void testStringLatestInSubquery() throws Exception + { + // Cannot vectorize LATEST aggregator. 
+ skipVectorize(); + + testQuery( + "SELECT SUM(val) FROM (SELECT dim2, LATEST(dim1, 10) AS val FROM foo GROUP BY dim2)", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs(aggregators(new StringLastAggregatorFactory("a0:a", "dim1", 10))) + .setPostAggregatorSpecs( + ImmutableList.of( + new FinalizingFieldAccessPostAggregator("a0", "a0:a") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", null, "CAST(\"a0\", 'DOUBLE')", ExprMacroTable.nil()))) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{NullHandling.sqlCompatible() ? 3 : 1.0} + ) + ); + } + + // This tests the off-heap (buffer) version of the EarliestAggregator (String) + @Test + public void testStringEarliestInSubquery() throws Exception + { + // Cannot vectorize EARLIEST aggregator. 
+ skipVectorize(); + + testQuery( + "SELECT SUM(val) FROM (SELECT dim2, EARLIEST(dim1, 10) AS val FROM foo GROUP BY dim2)", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs(aggregators(new StringFirstAggregatorFactory("a0:a", "dim1", 10))) .setPostAggregatorSpecs( ImmutableList.of( new FinalizingFieldAccessPostAggregator("a0", "a0:a") @@ -1324,12 +1460,12 @@ public void testLatestInSubquery() throws Exception ) .setInterval(querySegmentSpec(Filtration.eternity())) .setGranularity(Granularities.ALL) - .setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", "a0"))) + .setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", null, "CAST(\"a0\", 'DOUBLE')", ExprMacroTable.nil()))) .setContext(QUERY_CONTEXT_DEFAULT) .build() ), ImmutableList.of( - new Object[]{NullHandling.sqlCompatible() ? 18.0 : 15.0} + new Object[]{NullHandling.sqlCompatible() ? 12.1 : 11.1} ) ); }