From d4583feaef1ad0be3f22ea171bfbfbca7d459b42 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 16 Jan 2020 12:09:16 -0800 Subject: [PATCH 1/5] fix buff limit bug --- .../druid/query/aggregation/first/StringFirstLastUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java index 133c4ba1ffcc..630f70c045f5 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java @@ -87,7 +87,7 @@ public static void writePair( if (pair.rhs != null) { mutationBuffer.position(position + Long.BYTES + Integer.BYTES); - mutationBuffer.limit(maxStringBytes); + mutationBuffer.limit(position + Long.BYTES + Integer.BYTES + maxStringBytes); final int len = StringUtils.toUtf8WithLimit(pair.rhs, mutationBuffer); mutationBuffer.putInt(position + Long.BYTES, len); } else { From c78be5043059121882dd02b1d649553f2ec8b04c Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 16 Jan 2020 13:31:01 -0800 Subject: [PATCH 2/5] add tests --- .../druid/sql/calcite/CalciteQueryTest.java | 147 +++++++++++++++++- 1 file changed, 142 insertions(+), 5 deletions(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 895b452e47c7..96bde72cd016 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -44,11 +44,13 @@ import org.apache.druid.query.aggregation.LongMinAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; +import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory; import org.apache.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory; import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory; import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory; import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory; @@ -1297,14 +1299,15 @@ public void testLatestAggregators() throws Exception ); } + // This test the off-heap (buffer) version of the LatestAggregator (Double/Float/Long) @Test - public void testLatestInSubquery() throws Exception + public void testPrimitiveLatestInSubquery() throws Exception { // Cannot vectorize LATEST aggregator. skipVectorize(); testQuery( - "SELECT SUM(val) FROM (SELECT dim2, LATEST(m1) AS val FROM foo GROUP BY dim2)", + "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, LATEST(m1) AS val1, LATEST(cnt) AS val2, LATEST(m2) AS val3 FROM foo GROUP BY dim2)", ImmutableList.of( GroupByQuery.builder() .setDataSource( @@ -1313,7 +1316,141 @@ public void testLatestInSubquery() throws Exception .setInterval(querySegmentSpec(Filtration.eternity())) .setGranularity(Granularities.ALL) .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) - .setAggregatorSpecs(aggregators(new FloatLastAggregatorFactory("a0:a", "m1"))) + .setAggregatorSpecs(aggregators( + new FloatLastAggregatorFactory("a0:a", "m1"), + new LongLastAggregatorFactory("a1:a", "cnt"), + new DoubleLastAggregatorFactory("a2:a", "m2")) + ) + .setPostAggregatorSpecs( + ImmutableList.of( + new FinalizingFieldAccessPostAggregator("a0", "a0:a"), + new FinalizingFieldAccessPostAggregator("a1", "a1:a"), + new FinalizingFieldAccessPostAggregator("a2", "a2:a") + + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(aggregators( + new DoubleSumAggregatorFactory("_a0", "a0"), + new LongSumAggregatorFactory("_a1", "a1"), + new DoubleSumAggregatorFactory("_a2", "a2") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + NullHandling.sqlCompatible() ? ImmutableList.of(new Object[]{18.0, 4L, 18.0}) : ImmutableList.of(new Object[]{15.0, 3L, 15.0}) + ); + } + + // This test the off-heap (buffer) version of the EarliestAggregator (Double/Float/Long) + @Test + public void testPrimitiveEarliestInSubquery() throws Exception + { + // Cannot vectorize EARLIEST aggregator. + skipVectorize(); + + testQuery( + "SELECT SUM(val1), SUM(val2), SUM(val3) FROM (SELECT dim2, EARLIEST(m1) AS val1, EARLIEST(cnt) AS val2, EARLIEST(m2) AS val3 FROM foo GROUP BY dim2)", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs(aggregators( + new FloatFirstAggregatorFactory("a0:a", "m1"), + new LongFirstAggregatorFactory("a1:a", "cnt"), + new DoubleFirstAggregatorFactory("a2:a", "m2")) + ) + .setPostAggregatorSpecs( + ImmutableList.of( + new FinalizingFieldAccessPostAggregator("a0", "a0:a"), + new FinalizingFieldAccessPostAggregator("a1", "a1:a"), + new FinalizingFieldAccessPostAggregator("a2", "a2:a") + + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(aggregators( + new DoubleSumAggregatorFactory("_a0", "a0"), + new LongSumAggregatorFactory("_a1", "a1"), + new DoubleSumAggregatorFactory("_a2", "a2") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + NullHandling.sqlCompatible() ? ImmutableList.of(new Object[]{11.0, 4L, 11.0}) : ImmutableList.of(new Object[]{8.0, 3L, 8.0}) + ); + } + + // This test the off-heap (buffer) version of the LatestAggregator (String) + @Test + public void testStringLatestInSubquery() throws Exception + { + // Cannot vectorize LATEST aggregator. + skipVectorize(); + + testQuery( + "SELECT SUM(val) FROM (SELECT dim2, LATEST(dim1, 10) AS val FROM foo GROUP BY dim2)", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs(aggregators(new StringLastAggregatorFactory("a0:a", "dim1", 10))) + .setPostAggregatorSpecs( + ImmutableList.of( + new FinalizingFieldAccessPostAggregator("a0", "a0:a") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", null, "CAST(\"a0\", 'DOUBLE')", ExprMacroTable.nil()))) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{NullHandling.sqlCompatible() ? 3 : 1.0} + ) + ); + } + + // This test the off-heap (buffer) version of the EarliestAggregator (String) + @Test + public void testStringEarliestInSubquery() throws Exception + { + // Cannot vectorize EARLIEST aggregator. + skipVectorize(); + + testQuery( + "SELECT SUM(val) FROM (SELECT dim2, EARLIEST(dim1, 10) AS val FROM foo GROUP BY dim2)", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs(aggregators(new StringFirstAggregatorFactory("a0:a", "dim1", 10))) .setPostAggregatorSpecs( ImmutableList.of( new FinalizingFieldAccessPostAggregator("a0", "a0:a") @@ -1324,12 +1461,12 @@ public void testLatestInSubquery() throws Exception ) .setInterval(querySegmentSpec(Filtration.eternity())) .setGranularity(Granularities.ALL) - .setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", "a0"))) + .setAggregatorSpecs(aggregators(new DoubleSumAggregatorFactory("_a0", null, "CAST(\"a0\", 'DOUBLE')", ExprMacroTable.nil()))) .setContext(QUERY_CONTEXT_DEFAULT) .build() ), ImmutableList.of( - new Object[]{NullHandling.sqlCompatible() ? 18.0 : 15.0} + new Object[]{NullHandling.sqlCompatible() ? 12.1 : 11.1} ) ); } From ba542f9bbc53f1dafe6c26f6079a54dc84f06f11 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 16 Jan 2020 14:34:02 -0800 Subject: [PATCH 3/5] add test --- .../first/StringFirstLastUtilsTest.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java new file mode 100644 index 000000000000..ccf4743da72e --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import org.junit.Test; + +public class StringFirstLastUtilsTest +{ + @Test + public void test_writePair + +} From e560d2b21a608e7624f1078b311088419ce3b4c6 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 16 Jan 2020 16:25:25 -0800 Subject: [PATCH 4/5] add tests --- .../first/StringFirstLastUtilsTest.java | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java index ccf4743da72e..2f4f5e515b6f 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java @@ -19,11 +19,40 @@ package org.apache.druid.query.aggregation.first; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.query.aggregation.SerializablePairLongString; import org.junit.Test; +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertEquals; + public class StringFirstLastUtilsTest { + private static final SerializablePairLongString PAIR_TO_WRITE = new SerializablePairLongString( + DateTimes.MAX.getMillis(), + "asdasddsaasd" + ); + + private static final int BUFFER_CAPACITY = 100; + // PAIR_TO_WRITE Size is 12 so MAX_BYTE_TO_WRITE is set to 15 which is more than enough + private static final int MAX_BYTE_TO_WRITE = 15; + @Test - public void test_writePair + public void testWritePairThenReadPairAtBeginningBuffer() { + int positionAtBeginning = 0; + ByteBuffer buf = ByteBuffer.allocate(BUFFER_CAPACITY); + StringFirstLastUtils.writePair(buf, positionAtBeginning, PAIR_TO_WRITE, MAX_BYTE_TO_WRITE); + SerializablePairLongString actual = StringFirstLastUtils.readPair(buf, positionAtBeginning); + assertEquals(PAIR_TO_WRITE, actual); + } + @Test + public void testWritePairThenReadPairAtMiddleBuffer() { + int positionAtMiddle = 60; + ByteBuffer buf = ByteBuffer.allocate(BUFFER_CAPACITY); + StringFirstLastUtils.writePair(buf, positionAtMiddle, PAIR_TO_WRITE, MAX_BYTE_TO_WRITE); + SerializablePairLongString actual = StringFirstLastUtils.readPair(buf, positionAtMiddle); + assertEquals(PAIR_TO_WRITE, actual); + } } From a5f6347346bab619c04a0eb2d8e7c8361e02fcaf Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 16 Jan 2020 18:15:34 -0800 Subject: [PATCH 5/5] fix checkstyle --- .../aggregation/first/StringFirstLastUtilsTest.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java index 2f4f5e515b6f..b4e4088535cb 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstLastUtilsTest.java @@ -21,12 +21,11 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.junit.Assert; import org.junit.Test; import java.nio.ByteBuffer; -import static org.junit.Assert.assertEquals; - public class StringFirstLastUtilsTest { private static final SerializablePairLongString PAIR_TO_WRITE = new SerializablePairLongString( @@ -39,20 +38,22 @@ public class StringFirstLastUtilsTest private static final int MAX_BYTE_TO_WRITE = 15; @Test - public void testWritePairThenReadPairAtBeginningBuffer() { + public void testWritePairThenReadPairAtBeginningBuffer() + { int positionAtBeginning = 0; ByteBuffer buf = ByteBuffer.allocate(BUFFER_CAPACITY); StringFirstLastUtils.writePair(buf, positionAtBeginning, PAIR_TO_WRITE, MAX_BYTE_TO_WRITE); SerializablePairLongString actual = StringFirstLastUtils.readPair(buf, positionAtBeginning); - assertEquals(PAIR_TO_WRITE, actual); + Assert.assertEquals(PAIR_TO_WRITE, actual); } @Test - public void testWritePairThenReadPairAtMiddleBuffer() { + public void testWritePairThenReadPairAtMiddleBuffer() + { int positionAtMiddle = 60; ByteBuffer buf = ByteBuffer.allocate(BUFFER_CAPACITY); StringFirstLastUtils.writePair(buf, positionAtMiddle, PAIR_TO_WRITE, MAX_BYTE_TO_WRITE); SerializablePairLongString actual = StringFirstLastUtils.readPair(buf, positionAtMiddle); - assertEquals(PAIR_TO_WRITE, actual); + Assert.assertEquals(PAIR_TO_WRITE, actual); } }