From f5d563439b49587196c896348b00e51abf95bace Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sun, 4 Dec 2016 16:58:10 -0800 Subject: [PATCH 1/2] GroupByBenchmark: Add serde, spilling, all-gran benchmarks. Also use more iterations. --- .../benchmark/query/GroupByBenchmark.java | 104 ++++++++++++++---- .../query/SerializingQueryRunner.java | 71 ++++++++++++ 2 files changed, 152 insertions(+), 23 deletions(-) create mode 100644 benchmarks/src/main/java/io/druid/benchmark/query/SerializingQueryRunner.java diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java index a520ff338d17..c50b01ea3653 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java @@ -24,11 +24,11 @@ import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.hash.Hashing; import com.google.common.io.Files; - import io.druid.benchmark.datagen.BenchmarkDataGenerator; import io.druid.benchmark.datagen.BenchmarkSchemaInfo; import io.druid.benchmark.datagen.BenchmarkSchemas; @@ -39,6 +39,7 @@ import io.druid.data.input.Row; import io.druid.data.input.impl.DimensionsSpec; import io.druid.granularity.QueryGranularities; +import io.druid.granularity.QueryGranularity; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; @@ -105,14 +106,14 @@ @State(Scope.Benchmark) @Fork(jvmArgsPrepend = "-server", value = 1) -@Warmup(iterations = 10) -@Measurement(iterations = 25) +@Warmup(iterations = 15) +@Measurement(iterations = 30) public class GroupByBenchmark { @Param({"4"}) private int numSegments; - @Param({"4"}) + @Param({"2", "4"}) private int numProcessingThreads; @Param({"-1"}) @@ -127,6 +128,9 @@ public class GroupByBenchmark @Param({"v1", "v2"}) private String defaultStrategy; + @Param({"all", "day"}) + private String queryGranularity; + private static final Logger log = new Logger(GroupByBenchmark.class); private static final int RNG_SEED = 9999; private static final IndexMergerV9 INDEX_MERGER_V9; @@ -137,7 +141,7 @@ public class GroupByBenchmark private IncrementalIndex anIncrementalIndex; private List queryableIndexes; - private QueryRunnerFactory factory; + private QueryRunnerFactory factory; private BenchmarkSchemaInfo schemaInfo; private GroupByQuery query; @@ -190,7 +194,7 @@ private void setupQueries() .setAggregatorSpecs( queryAggs ) - .setGranularity(QueryGranularities.DAY) + .setGranularity(QueryGranularity.fromString(queryGranularity)) .build(); basicQueries.put("A", queryA); @@ -335,7 +339,7 @@ public int getBufferGrouperInitialBuckets() @Override public long getMaxOnDiskStorage() { - return 0L; + return 1_000_000_000L; } }; config.setSingleThreaded(false); @@ -475,29 +479,68 @@ public void querySingleQueryableIndex(Blackhole blackhole) throws Exception } } - @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MICROSECONDS) public void queryMultiQueryableIndex(Blackhole blackhole) throws Exception { - List> singleSegmentRunners = Lists.newArrayList(); - QueryToolChest toolChest = factory.getToolchest(); - for (int i = 0; i < numSegments; i++) { - String segmentName = "qIndex" + i; - QueryRunner runner = QueryBenchmarkUtil.makeQueryRunner( - factory, - segmentName, - new QueryableIndexSegment(segmentName, queryableIndexes.get(i)) - ); - singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner)); + QueryToolChest toolChest = factory.getToolchest(); + QueryRunner theRunner = new FinalizeResultsQueryRunner<>( + toolChest.mergeResults( + factory.mergeRunners(executorService, makeMultiRunners()) + ), + (QueryToolChest) toolChest + ); + + Sequence queryResult = theRunner.run(query, Maps.newHashMap()); + List results = Sequences.toList(queryResult, Lists.newArrayList()); + + for (Row result : results) { + blackhole.consume(result); } + } - QueryRunner theRunner = toolChest.postMergeQueryDecoration( - new FinalizeResultsQueryRunner<>( - toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)), - toolChest - ) + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void queryMultiQueryableIndexWithSpilling(Blackhole blackhole) throws Exception + { + QueryToolChest toolChest = factory.getToolchest(); + QueryRunner theRunner = new FinalizeResultsQueryRunner<>( + toolChest.mergeResults( + factory.mergeRunners(executorService, makeMultiRunners()) + ), + (QueryToolChest) toolChest + ); + + final GroupByQuery spillingQuery = query.withOverriddenContext( + ImmutableMap.of("bufferGrouperMaxSize", 4000) + ); + Sequence queryResult = theRunner.run(spillingQuery, Maps.newHashMap()); + List results = Sequences.toList(queryResult, Lists.newArrayList()); + + for (Row result : results) { + blackhole.consume(result); + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void queryMultiQueryableIndexWithSerde(Blackhole blackhole) throws Exception + { + QueryToolChest toolChest = factory.getToolchest(); + QueryRunner theRunner = new FinalizeResultsQueryRunner<>( + toolChest.mergeResults( + new SerializingQueryRunner<>( + new DefaultObjectMapper(new SmileFactory()), + Row.class, + toolChest.mergeResults( + factory.mergeRunners(executorService, makeMultiRunners()) + ) + ) + ), + (QueryToolChest) toolChest ); Sequence queryResult = theRunner.run(query, Maps.newHashMap()); @@ -507,4 +550,19 @@ public void queryMultiQueryableIndex(Blackhole blackhole) throws Exception blackhole.consume(result); } } + + private List> makeMultiRunners() + { + List> runners = Lists.newArrayList(); + for (int i = 0; i < numSegments; i++) { + String segmentName = "qIndex" + i; + QueryRunner runner = QueryBenchmarkUtil.makeQueryRunner( + factory, + segmentName, + new QueryableIndexSegment(segmentName, queryableIndexes.get(i)) + ); + runners.add(factory.getToolchest().preMergeQueryDecoration(runner)); + } + return runners; + } } diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/SerializingQueryRunner.java b/benchmarks/src/main/java/io/druid/benchmark/query/SerializingQueryRunner.java new file mode 100644 index 000000000000..256559685524 --- /dev/null +++ b/benchmarks/src/main/java/io/druid/benchmark/query/SerializingQueryRunner.java @@ -0,0 +1,71 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.benchmark.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Sequences; +import io.druid.query.Query; +import io.druid.query.QueryRunner; + +import java.util.Map; + +public class SerializingQueryRunner implements QueryRunner +{ + private final ObjectMapper smileMapper; + private final QueryRunner baseRunner; + private final Class clazz; + + public SerializingQueryRunner( + ObjectMapper smileMapper, + Class clazz, + QueryRunner baseRunner + ) + { + this.smileMapper = smileMapper; + this.clazz = clazz; + this.baseRunner = baseRunner; + } + + @Override + public Sequence run( + final Query query, + final Map responseContext + ) + { + return Sequences.map( + baseRunner.run(query, responseContext), + new Function() + { + @Override + public T apply(T input) + { + try { + return smileMapper.readValue(smileMapper.writeValueAsBytes(input), clazz); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + } + ); + } +} From f5175c2e13eda513187ce6304762e6b3ba8f02d3 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 5 Dec 2016 09:07:15 -0800 Subject: [PATCH 2/2] groupBy v2: Ignore timestamp completely when granularity = all, except for the final merge. Specifically: - Remove timestamp from RowBasedKey when not needed - Set timestamp to null in MapBasedRows that are not part of the final merge. --- .../io/druid/query/groupby/GroupByQuery.java | 18 +- .../groupby/epinephelinae/BufferGrouper.java | 14 +- .../epinephelinae/RowBasedGrouperHelper.java | 237 ++++++++++-------- .../groupby/strategy/GroupByStrategyV2.java | 14 +- 4 files changed, 170 insertions(+), 113 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index 58634ed15ba4..7df97a4c56e5 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -32,6 +32,7 @@ import com.google.common.collect.Sets; import com.google.common.primitives.Longs; import io.druid.data.input.Row; +import io.druid.granularity.QueryGranularities; import io.druid.granularity.QueryGranularity; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; @@ -285,7 +286,18 @@ public Ordering getRowOrdering(final boolean granular) final Comparator timeComparator = getTimeComparator(granular); - if (sortByDimsFirst) { + if (timeComparator == null) { + return Ordering.from( + new Comparator() + { + @Override + public int compare(Row lhs, Row rhs) + { + return compareDims(dimensions, lhs, rhs); + } + } + ); + } else if (sortByDimsFirst) { return Ordering.from( new Comparator() { @@ -323,7 +335,9 @@ public int compare(Row lhs, Row rhs) private Comparator getTimeComparator(boolean granular) { - if (granular) { + if (QueryGranularities.ALL.equals(granularity)) { + return null; + } else if (granular) { return new Comparator() { @Override diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java index 4a8d04c2ce5b..a2dcd4ed815b 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferGrouper.java @@ -19,7 +19,6 @@ package io.druid.query.groupby.epinephelinae; -import com.google.common.base.Preconditions; import com.google.common.primitives.Ints; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; @@ -135,12 +134,13 @@ public boolean aggregate(KeyType key, int keyHash) return false; } - Preconditions.checkArgument( - keyBuffer.remaining() == keySize, - "keySerde.toByteBuffer(key).remaining[%s] != keySerde.keySize[%s], buffer was the wrong size?!", - keyBuffer.remaining(), - keySize - ); + if (keyBuffer.remaining() != keySize) { + throw new IAE( + "keySerde.toByteBuffer(key).remaining[%s] != keySerde.keySize[%s], buffer was the wrong size?!", + keyBuffer.remaining(), + keySize + ); + } int bucket = findBucket( tableBuffer, diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 7d15315cef82..33f014662fdb 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -20,7 +20,7 @@ package io.druid.query.groupby.epinephelinae; import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -86,9 +86,9 @@ public static Pair, Accumulator, Row>> Preconditions.checkArgument(concurrencyHint >= 1 || concurrencyHint == -1, "invalid concurrencyHint"); final GroupByQueryConfig querySpecificConfig = config.withOverrides(query); - final DateTime fudgeTimestamp = GroupByStrategyV2.getUniversalTimestamp(query); + final boolean includeTimestamp = GroupByStrategyV2.getUniversalTimestamp(query) == null; final Grouper.KeySerdeFactory keySerdeFactory = new RowBasedKeySerdeFactory( - fudgeTimestamp, + includeTimestamp, query.getContextSortByDimsFirst(), query.getDimensions().size(), querySpecificConfig.getMaxMergingDictionarySize() / (concurrencyHint == -1 ? 1 : concurrencyHint) @@ -150,29 +150,45 @@ public Grouper accumulate( return null; } - long timestamp = row.getTimestampFromEpoch(); - if (isInputRaw) { - if (query.getGranularity() instanceof AllGranularity) { - timestamp = query.getIntervals().get(0).getStartMillis(); + + columnSelectorFactory.setRow(row); + + final int dimStart; + final Comparable[] key; + + if (includeTimestamp) { + key = new Comparable[query.getDimensions().size() + 1]; + + final long timestamp; + if (isInputRaw) { + if (query.getGranularity() instanceof AllGranularity) { + timestamp = query.getIntervals().get(0).getStartMillis(); + } else { + timestamp = query.getGranularity().truncate(row.getTimestampFromEpoch()); + } } else { - timestamp = query.getGranularity().truncate(timestamp); + timestamp = row.getTimestampFromEpoch(); } + + key[0] = timestamp; + dimStart = 1; + } else { + key = new Comparable[query.getDimensions().size()]; + dimStart = 0; } - columnSelectorFactory.setRow(row); - final String[] dimensions = new String[query.getDimensions().size()]; - for (int i = 0; i < dimensions.length; i++) { + for (int i = dimStart; i < key.length; i++) { final String value; if (isInputRaw) { - IndexedInts index = dimensionSelectors[i].getRow(); - value = index.size() == 0 ? "" : dimensionSelectors[i].lookupName(index.get(0)); + IndexedInts index = dimensionSelectors[i - dimStart].getRow(); + value = index.size() == 0 ? "" : dimensionSelectors[i - dimStart].lookupName(index.get(0)); } else { - value = (String) row.getRaw(query.getDimensions().get(i).getOutputName()); + value = (String) row.getRaw(query.getDimensions().get(i - dimStart).getOutputName()); } - dimensions[i] = Strings.nullToEmpty(value); + key[i] = Strings.nullToEmpty(value); } - final boolean didAggregate = theGrouper.aggregate(new RowBasedKey(timestamp, dimensions)); + final boolean didAggregate = theGrouper.aggregate(new RowBasedKey(key)); if (!didAggregate) { // null return means grouping resources were exhausted. return null; @@ -192,6 +208,8 @@ public static CloseableGrouperIterator makeGrouperIterator( final Closeable closeable ) { + final boolean includeTimestamp = GroupByStrategyV2.getUniversalTimestamp(query) == null; + return new CloseableGrouperIterator<>( grouper, true, @@ -202,11 +220,23 @@ public Row apply(Grouper.Entry entry) { Map theMap = Maps.newLinkedHashMap(); + // Get timestamp, maybe. + final DateTime timestamp; + final int dimStart; + + if (includeTimestamp) { + timestamp = query.getGranularity().toDateTime(((long) (entry.getKey().getKey()[0]))); + dimStart = 1; + } else { + timestamp = null; + dimStart = 0; + } + // Add dimensions. - for (int i = 0; i < entry.getKey().getDimensions().length; i++) { + for (int i = dimStart; i < entry.getKey().getKey().length; i++) { theMap.put( - query.getDimensions().get(i).getOutputName(), - Strings.emptyToNull(entry.getKey().getDimensions()[i]) + query.getDimensions().get(i - dimStart).getOutputName(), + Strings.emptyToNull((String) entry.getKey().getKey()[i]) ); } @@ -215,10 +245,7 @@ public Row apply(Grouper.Entry entry) theMap.put(query.getAggregatorSpecs().get(i).getName(), entry.getValues()[i]); } - return new MapBasedRow( - query.getGranularity().toDateTime(entry.getKey().getTimestamp()), - theMap - ); + return new MapBasedRow(timestamp, theMap); } }, closeable @@ -227,30 +254,28 @@ public Row apply(Grouper.Entry entry) static class RowBasedKey { - private final long timestamp; - private final String[] dimensions; + private final Object[] key; - @JsonCreator - public RowBasedKey( - // Using short key names to reduce serialized size when spilling to disk. - @JsonProperty("t") long timestamp, - @JsonProperty("d") String[] dimensions - ) + RowBasedKey(final Object[] key) { - this.timestamp = timestamp; - this.dimensions = dimensions; + this.key = key; } - @JsonProperty("t") - public long getTimestamp() + @JsonCreator + public static RowBasedKey fromJsonArray(final Object[] key) { - return timestamp; + // Type info is lost during serde. We know we don't want ints as timestamps, so adjust. + if (key.length > 0 && key[0] instanceof Integer) { + key[0] = ((Integer) key[0]).longValue(); + } + + return new RowBasedKey(key); } - @JsonProperty("d") - public String[] getDimensions() + @JsonValue + public Object[] getKey() { - return dimensions; + return key; } @Override @@ -265,42 +290,32 @@ public boolean equals(Object o) RowBasedKey that = (RowBasedKey) o; - if (timestamp != that.timestamp) { - return false; - } - // Probably incorrect - comparing Object[] arrays with Arrays.equals - return Arrays.equals(dimensions, that.dimensions); - + return Arrays.equals(key, that.key); } @Override public int hashCode() { - int result = (int) (timestamp ^ (timestamp >>> 32)); - result = 31 * result + Arrays.hashCode(dimensions); - return result; + return Arrays.hashCode(key); } @Override public String toString() { - return "RowBasedKey{" + - "timestamp=" + timestamp + - ", dimensions=" + Arrays.toString(dimensions) + - '}'; + return Arrays.toString(key); } } private static class RowBasedKeySerdeFactory implements Grouper.KeySerdeFactory { - private final DateTime fudgeTimestamp; + private final boolean includeTimestamp; private final boolean sortByDimsFirst; private final int dimCount; private final long maxDictionarySize; - public RowBasedKeySerdeFactory(DateTime fudgeTimestamp, boolean sortByDimsFirst, int dimCount, long maxDictionarySize) + RowBasedKeySerdeFactory(boolean includeTimestamp, boolean sortByDimsFirst, int dimCount, long maxDictionarySize) { - this.fudgeTimestamp = fudgeTimestamp; + this.includeTimestamp = includeTimestamp; this.sortByDimsFirst = sortByDimsFirst; this.dimCount = dimCount; this.maxDictionarySize = maxDictionarySize; @@ -309,52 +324,59 @@ public RowBasedKeySerdeFactory(DateTime fudgeTimestamp, boolean sortByDimsFirst, @Override public Grouper.KeySerde factorize() { - return new RowBasedKeySerde(fudgeTimestamp, sortByDimsFirst, dimCount, maxDictionarySize); + return new RowBasedKeySerde(includeTimestamp, sortByDimsFirst, dimCount, maxDictionarySize); } @Override public Comparator objectComparator() { - if (sortByDimsFirst) { - return new Comparator() - { - @Override - public int compare( - RowBasedKey row1, RowBasedKey row2 - ) + if (includeTimestamp) { + if (sortByDimsFirst) { + return new Comparator() { - final int cmp = compareDimsInRows(row1, row2); - if (cmp != 0) { - return cmp; + @Override + public int compare(RowBasedKey key1, RowBasedKey key2) + { + final int cmp = compareDimsInRows(key1, key2, 1); + if (cmp != 0) { + return cmp; + } + + return Longs.compare((long) key1.getKey()[0], (long) key2.getKey()[0]); } + }; + } else { + return new Comparator() + { + @Override + public int compare(RowBasedKey key1, RowBasedKey key2) + { + final int timeCompare = Longs.compare((long) key1.getKey()[0], (long) key2.getKey()[0]); - return Longs.compare(row1.getTimestamp(), row2.getTimestamp()); - } - }; + if (timeCompare != 0) { + return timeCompare; + } + + return compareDimsInRows(key1, key2, 1); + } + }; + } } else { return new Comparator() { @Override - public int compare( - RowBasedKey row1, RowBasedKey row2 - ) + public int compare(RowBasedKey key1, RowBasedKey key2) { - final int timeCompare = Longs.compare(row1.getTimestamp(), row2.getTimestamp()); - - if (timeCompare != 0) { - return timeCompare; - } - - return compareDimsInRows(row1, row2); + return compareDimsInRows(key1, key2, 0); } }; } } - private static int compareDimsInRows(RowBasedKey row1, RowBasedKey row2) + private static int compareDimsInRows(RowBasedKey key1, RowBasedKey key2, int dimStart) { - for (int i = 0; i < row1.getDimensions().length; i++) { - final int cmp = row1.getDimensions()[i].compareTo(row2.getDimensions()[i]); + for (int i = dimStart; i < key1.getKey().length; i++) { + final int cmp = ((String) key1.getKey()[i]).compareTo((String) key2.getKey()[i]); if (cmp != 0) { return cmp; } @@ -369,7 +391,7 @@ private static class RowBasedKeySerde implements Grouper.KeySerde // Entry in dictionary, node pointer in reverseDictionary, hash + k/v/next pointer in reverseDictionary nodes private static final int ROUGH_OVERHEAD_PER_DICTIONARY_ENTRY = Longs.BYTES * 5 + Ints.BYTES; - private final DateTime fudgeTimestamp; + private final boolean includeTimestamp; private final boolean sortByDimsFirst; private final int dimCount; private final int keySize; @@ -384,18 +406,18 @@ private static class RowBasedKeySerde implements Grouper.KeySerde // dictionary id -> its position if it were sorted by dictionary value private int[] sortableIds = null; - public RowBasedKeySerde( - final DateTime fudgeTimestamp, + RowBasedKeySerde( + final boolean includeTimestamp, final boolean sortByDimsFirst, final int dimCount, final long maxDictionarySize ) { - this.fudgeTimestamp = fudgeTimestamp; + this.includeTimestamp = includeTimestamp; this.sortByDimsFirst = sortByDimsFirst; this.dimCount = dimCount; this.maxDictionarySize = maxDictionarySize; - this.keySize = (fudgeTimestamp == null ? Longs.BYTES : 0) + dimCount * Ints.BYTES; + this.keySize = (includeTimestamp ? Longs.BYTES : 0) + dimCount * Ints.BYTES; this.keyBuffer = ByteBuffer.allocate(keySize); } @@ -416,12 +438,16 @@ public ByteBuffer toByteBuffer(RowBasedKey key) { keyBuffer.rewind(); - if (fudgeTimestamp == null) { - keyBuffer.putLong(key.getTimestamp()); + final int dimStart; + if (includeTimestamp) { + keyBuffer.putLong((long) key.getKey()[0]); + dimStart = 1; + } else { + dimStart = 0; } - for (int i = 0; i < key.getDimensions().length; i++) { - final int id = addToDictionary(key.getDimensions()[i]); + for (int i = dimStart; i < key.getKey().length; i++) { + final int id = addToDictionary((String) key.getKey()[i]); if (id < 0) { return null; } @@ -435,13 +461,26 @@ public ByteBuffer toByteBuffer(RowBasedKey key) @Override public RowBasedKey fromByteBuffer(ByteBuffer buffer, int position) { - final long timestamp = fudgeTimestamp == null ? buffer.getLong(position) : fudgeTimestamp.getMillis(); - final String[] dimensions = new String[dimCount]; - final int dimsPosition = fudgeTimestamp == null ? position + Longs.BYTES : position; - for (int i = 0; i < dimensions.length; i++) { - dimensions[i] = dictionary.get(buffer.getInt(dimsPosition + (Ints.BYTES * i))); + final int dimStart; + final Comparable[] key; + final int dimsPosition; + + if (includeTimestamp) { + key = new Comparable[dimCount + 1]; + key[0] = buffer.getLong(position); + dimsPosition = position + Longs.BYTES; + dimStart = 1; + } else { + key = new Comparable[dimCount]; + dimsPosition = position; + dimStart = 0; + } + + for (int i = dimStart; i < key.length; i++) { + key[i] = dictionary.get(buffer.getInt(dimsPosition + (Ints.BYTES * (i - dimStart)))); } - return new RowBasedKey(timestamp, dimensions); + + return new RowBasedKey(key); } @Override @@ -459,8 +498,7 @@ public Grouper.KeyComparator bufferComparator() } } - - if (fudgeTimestamp == null) { + if (includeTimestamp) { if (sortByDimsFirst) { return new Grouper.KeyComparator() { @@ -505,7 +543,6 @@ public int compare(ByteBuffer lhsBuffer, ByteBuffer rhsBuffer, int lhsPosition, } }; } - } else { return new Grouper.KeyComparator() { diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index d1832acbcf9e..7e1ab35f6b9e 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -61,6 +61,7 @@ public class GroupByStrategyV2 implements GroupByStrategy { public static final String CTX_KEY_FUDGE_TIMESTAMP = "fudgeTimestamp"; + public static final String CTX_KEY_OUTERMOST = "groupByOutermost"; private final DruidProcessingConfig processingConfig; private final Supplier configSupplier; @@ -158,7 +159,8 @@ protected BinaryFn createMergeFn(Query queryParam) ImmutableMap.of( "finalize", false, GroupByQueryConfig.CTX_KEY_STRATEGY, GroupByStrategySelector.STRATEGY_V2, - CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp == null ? "" : String.valueOf(fudgeTimestamp.getMillis()) + CTX_KEY_FUDGE_TIMESTAMP, fudgeTimestamp == null ? "" : String.valueOf(fudgeTimestamp.getMillis()), + CTX_KEY_OUTERMOST, false ) ), responseContext @@ -168,9 +170,13 @@ protected BinaryFn createMergeFn(Query queryParam) @Override public Row apply(final Row row) { - // Maybe apply postAggregators. + // Apply postAggregators and fudgeTimestamp if present and if this is the outermost mergeResults. - if (query.getPostAggregatorSpecs().isEmpty()) { + if (!query.getContextBoolean(CTX_KEY_OUTERMOST, true)) { + return row; + } + + if (query.getPostAggregatorSpecs().isEmpty() && fudgeTimestamp == null) { return row; } @@ -186,7 +192,7 @@ public Row apply(final Row row) } } - return new MapBasedRow(row.getTimestamp(), newMap); + return new MapBasedRow(fudgeTimestamp != null ? fudgeTimestamp : row.getTimestamp(), newMap); } } )