From 14842a06afb5060b7542caa9f94e49860b2ac998 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 9 Aug 2017 22:02:25 +0900 Subject: [PATCH 1/4] Add IntGrouper --- .../epinephelinae/BufferArrayGrouper.java | 7 +- .../epinephelinae/GroupByQueryEngineV2.java | 16 ++++- .../groupby/epinephelinae/IntGrouper.java | 70 +++++++++++++++++++ 3 files changed, 86 insertions(+), 7 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/groupby/epinephelinae/IntGrouper.java diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java index a17761090c14..87355603ee56 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java @@ -32,7 +32,6 @@ import java.util.Arrays; import java.util.Iterator; import java.util.NoSuchElementException; -import java.util.function.ToIntFunction; /** * A buffer grouper for array-based aggregation. This grouper stores aggregated values in the buffer using the grouping @@ -48,7 +47,7 @@ * different segments cannot be currently retrieved, this grouper can be used only when performing per-segment query * execution. */ -public class BufferArrayGrouper implements Grouper +public class BufferArrayGrouper implements IntGrouper { private static final Logger LOG = new Logger(BufferArrayGrouper.class); @@ -137,7 +136,7 @@ public boolean isInitialized() } @Override - public AggregateResult aggregate(Integer key, int dimIndex) + public AggregateResult aggregate(int key, int dimIndex) { Preconditions.checkArgument( dimIndex >= 0 && dimIndex < cardinalityWithMissingValue, @@ -209,7 +208,7 @@ public void reset() } @Override - public ToIntFunction hashFunction() + public IntGrouperHashFunction hashFunction() { return key -> key + 1; } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 601886a90bc1..30f598684a61 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -582,7 +582,7 @@ public ArrayAggregateIterator( } @Override - protected Grouper newGrouper() + protected IntGrouper newGrouper() { return new BufferArrayGrouper( Suppliers.ofInstance(buffer), @@ -595,6 +595,17 @@ protected Grouper newGrouper() @Override protected void aggregateSingleValueDims(Grouper grouper) + { + aggregateSingleValueDims((IntGrouper) grouper); + } + + @Override + protected void aggregateMultiValueDims(Grouper grouper) + { + aggregateMultiValueDims((IntGrouper) grouper); + } + + private void aggregateSingleValueDims(IntGrouper grouper) { while (!cursor.isDone()) { final int key; @@ -612,8 +623,7 @@ protected void aggregateSingleValueDims(Grouper grouper) } } - @Override - protected void aggregateMultiValueDims(Grouper grouper) + private void aggregateMultiValueDims(IntGrouper grouper) { if (dim == null) { throw new ISE("dim must exist"); diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/IntGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/IntGrouper.java new file mode 100644 index 000000000000..81b23e3f5134 --- /dev/null +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/IntGrouper.java @@ -0,0 +1,70 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.groupby.epinephelinae; + +import java.util.function.ToIntFunction; + +/** + * {@link Grouper} specialized for the primitive int type + */ +public interface IntGrouper extends Grouper +{ + default AggregateResult aggregate(int key) + { + return aggregate(key, hashFunction().apply(key)); + } + + /** + * {@inheritDoc} + */ + @Override + default AggregateResult aggregate(Integer key) + { + return aggregate(key.intValue()); + } + + AggregateResult aggregate(int key, int keyHash); + + /** + * {@inheritDoc} + */ + @Override + default AggregateResult aggregate(Integer key, int keyHash) + { + return aggregate(key.intValue(), keyHash); + } + + @Override + default IntGrouperHashFunction hashFunction() + { + return Integer::hashCode; + } + + interface IntGrouperHashFunction extends ToIntFunction + { + @Override + default int applyAsInt(Integer value) + { + return apply(value); + } + + int apply(int value); + } +} From ff8760b5ad0d1e0d5aba112e280c2fecf894ebf0 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 9 Aug 2017 23:18:18 +0900 Subject: [PATCH 2/4] Fix build --- .../druid/query/groupby/epinephelinae/BufferArrayGrouper.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java index 87355603ee56..eb5e4d194651 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouper.java @@ -144,8 +144,6 @@ public AggregateResult aggregate(int key, int dimIndex) dimIndex ); - Preconditions.checkNotNull(key); - final int recordOffset = dimIndex * recordSize; if (recordOffset + recordSize > valBuffer.capacity()) { From 8f04b744a7ec278e5556d64402af371e1e1caff8 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 10 Aug 2017 10:57:20 +0900 Subject: [PATCH 3/4] Address comments --- .../groupby/epinephelinae/IntGrouper.java | 21 ++++++++++++------- .../epinephelinae/BufferArrayGrouperTest.java | 2 +- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/IntGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/IntGrouper.java index 81b23e3f5134..6ee8d3cd03d1 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/IntGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/IntGrouper.java @@ -19,6 +19,8 @@ package io.druid.query.groupby.epinephelinae; +import com.google.common.base.Preconditions; + import java.util.function.ToIntFunction; /** @@ -31,38 +33,43 @@ default AggregateResult aggregate(int key) return aggregate(key, hashFunction().apply(key)); } + AggregateResult aggregate(int key, int keyHash); + /** * {@inheritDoc} + * + * @deprecated Please use {@link #aggregate(int)} instead. */ + @Deprecated @Override default AggregateResult aggregate(Integer key) { + Preconditions.checkNotNull(key); return aggregate(key.intValue()); } - AggregateResult aggregate(int key, int keyHash); - /** * {@inheritDoc} + * + * @deprecated Please use {@link #aggregate(int, int)} instead. */ + @Deprecated @Override default AggregateResult aggregate(Integer key, int keyHash) { + Preconditions.checkNotNull(key); return aggregate(key.intValue(), keyHash); } @Override - default IntGrouperHashFunction hashFunction() - { - return Integer::hashCode; - } + IntGrouperHashFunction hashFunction(); interface IntGrouperHashFunction extends ToIntFunction { @Override default int applyAsInt(Integer value) { - return apply(value); + return apply(value.intValue()); } int apply(int value); diff --git a/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouperTest.java b/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouperTest.java index 83a16a7a45f5..6ca584f2b8d3 100644 --- a/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouperTest.java +++ b/processing/src/test/java/io/druid/query/groupby/epinephelinae/BufferArrayGrouperTest.java @@ -43,7 +43,7 @@ public class BufferArrayGrouperTest public void testAggregate() { final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory(); - final Grouper grouper = newGrouper(columnSelectorFactory, 1024); + final IntGrouper grouper = newGrouper(columnSelectorFactory, 1024); columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 10L))); grouper.aggregate(12); From ce5fc355d908b7af9759d1193cd472b61a3df06f Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 10 Aug 2017 20:27:16 +0900 Subject: [PATCH 4/4] Add a benchmark query --- .../benchmark/query/GroupByBenchmark.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java index 7720cb1358b8..01bb6c865b41 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java @@ -266,6 +266,29 @@ private void setupQueries() basicQueries.put("filter", queryA); } + + { // basic.singleZipf + final QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec( + Collections.singletonList(basicSchema.getDataInterval()) + ); + // Use multiple aggregators to see how the number of aggregators impact to the query performance + List queryAggs = ImmutableList.of( + new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"), + new LongSumAggregatorFactory("rows", "rows"), + new DoubleSumAggregatorFactory("sumFloatNormal", "sumFloatNormal"), + new DoubleMinAggregatorFactory("minFloatZipf", "minFloatZipf") + ); + GroupByQuery queryA = GroupByQuery + .builder() + .setDataSource("blah") + .setQuerySegmentSpec(intervalSpec) + .setDimensions(ImmutableList.of(new DefaultDimensionSpec("dimZipf", null))) + .setAggregatorSpecs(queryAggs) + .setGranularity(Granularity.fromString(queryGranularity)) + .build(); + + basicQueries.put("singleZipf", queryA); + } SCHEMA_QUERY_MAP.put("basic", basicQueries); // simple one column schema, for testing performance difference between querying on numeric values as Strings and