GroupByBenchmark.java
@@ -24,11 +24,11 @@
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.hash.Hashing;
 import com.google.common.io.Files;
 
 import io.druid.benchmark.datagen.BenchmarkDataGenerator;
 import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
 import io.druid.benchmark.datagen.BenchmarkSchemas;
@@ -39,6 +39,7 @@
 import io.druid.data.input.Row;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.granularity.QueryGranularities;
+import io.druid.granularity.QueryGranularity;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
@@ -105,14 +106,14 @@

 @State(Scope.Benchmark)
 @Fork(jvmArgsPrepend = "-server", value = 1)
-@Warmup(iterations = 10)
-@Measurement(iterations = 25)
+@Warmup(iterations = 15)
+@Measurement(iterations = 30)
 public class GroupByBenchmark
 {
   @Param({"4"})
   private int numSegments;
 
-  @Param({"4"})
+  @Param({"2", "4"})
   private int numProcessingThreads;
 
   @Param({"-1"})
@@ -127,6 +128,9 @@ public class GroupByBenchmark
@Param({"v1", "v2"})
private String defaultStrategy;

@Param({"all", "day"})
private String queryGranularity;

private static final Logger log = new Logger(GroupByBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMergerV9 INDEX_MERGER_V9;
@@ -137,7 +141,7 @@ public class GroupByBenchmark
   private IncrementalIndex anIncrementalIndex;
   private List<QueryableIndex> queryableIndexes;
 
-  private QueryRunnerFactory factory;
+  private QueryRunnerFactory<Row, GroupByQuery> factory;
 
   private BenchmarkSchemaInfo schemaInfo;
   private GroupByQuery query;
@@ -190,7 +194,7 @@ private void setupQueries()
         .setAggregatorSpecs(
             queryAggs
         )
-        .setGranularity(QueryGranularities.DAY)
+        .setGranularity(QueryGranularity.fromString(queryGranularity))
         .build();
 
     basicQueries.put("A", queryA);
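Note: queryGranularity is a swept JMH parameter, so each benchmark case now runs once per granularity value; QueryGranularity.fromString (used in the hunk above) resolves the simple names given in @Param. A minimal sketch of what the two values exercise, assuming the io.druid artifacts of this era on the classpath (the class name below is illustrative, not part of this change):

    // Sketch: "day" buckets each row's timestamp to its day, so merged results
    // keep a meaningful time component; "all" collapses everything into a single
    // bucket, which is the case GroupByQuery.getRowOrdering() special-cases below.
    import io.druid.granularity.QueryGranularities;
    import io.druid.granularity.QueryGranularity;

    public class GranularityParamSketch
    {
      public static void main(String[] args)
      {
        for (String value : new String[]{"all", "day"}) {
          QueryGranularity gran = QueryGranularity.fromString(value);
          System.out.println(value + " -> ALL? " + QueryGranularities.ALL.equals(gran));
        }
      }
    }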
@@ -335,7 +339,7 @@ public int getBufferGrouperInitialBuckets()
       @Override
       public long getMaxOnDiskStorage()
       {
-        return 0L;
+        return 1_000_000_000L;
       }
     };
     config.setSingleThreaded(false);
@@ -475,29 +479,68 @@ public void querySingleQueryableIndex(Blackhole blackhole) throws Exception
     }
   }
 
-
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
   public void queryMultiQueryableIndex(Blackhole blackhole) throws Exception
   {
-    List<QueryRunner<Row>> singleSegmentRunners = Lists.newArrayList();
-    QueryToolChest toolChest = factory.getToolchest();
-    for (int i = 0; i < numSegments; i++) {
-      String segmentName = "qIndex" + i;
-      QueryRunner<Row> runner = QueryBenchmarkUtil.makeQueryRunner(
-          factory,
-          segmentName,
-          new QueryableIndexSegment(segmentName, queryableIndexes.get(i))
-      );
-      singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
-    }
-
-    QueryRunner theRunner = toolChest.postMergeQueryDecoration(
-        new FinalizeResultsQueryRunner<>(
-            toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)),
-            toolChest
-        )
-    );
-
+    QueryToolChest<Row, GroupByQuery> toolChest = factory.getToolchest();
+    QueryRunner<Row> theRunner = new FinalizeResultsQueryRunner<>(
+        toolChest.mergeResults(
+            factory.mergeRunners(executorService, makeMultiRunners())
+        ),
+        (QueryToolChest) toolChest
+    );
+
+    Sequence<Row> queryResult = theRunner.run(query, Maps.<String, Object>newHashMap());
+    List<Row> results = Sequences.toList(queryResult, Lists.<Row>newArrayList());
+
+    for (Row result : results) {
+      blackhole.consume(result);
+    }
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void queryMultiQueryableIndexWithSpilling(Blackhole blackhole) throws Exception
+  {
+    QueryToolChest<Row, GroupByQuery> toolChest = factory.getToolchest();
+    QueryRunner<Row> theRunner = new FinalizeResultsQueryRunner<>(
+        toolChest.mergeResults(
+            factory.mergeRunners(executorService, makeMultiRunners())
+        ),
+        (QueryToolChest) toolChest
+    );
+
+    final GroupByQuery spillingQuery = query.withOverriddenContext(
+        ImmutableMap.<String, Object>of("bufferGrouperMaxSize", 4000)
+    );
+    Sequence<Row> queryResult = theRunner.run(spillingQuery, Maps.<String, Object>newHashMap());
+    List<Row> results = Sequences.toList(queryResult, Lists.<Row>newArrayList());
+
+    for (Row result : results) {
+      blackhole.consume(result);
+    }
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void queryMultiQueryableIndexWithSerde(Blackhole blackhole) throws Exception
+  {
+    QueryToolChest<Row, GroupByQuery> toolChest = factory.getToolchest();
+    QueryRunner<Row> theRunner = new FinalizeResultsQueryRunner<>(
+        toolChest.mergeResults(
+            new SerializingQueryRunner<>(
+                new DefaultObjectMapper(new SmileFactory()),
+                Row.class,
+                toolChest.mergeResults(
+                    factory.mergeRunners(executorService, makeMultiRunners())
+                )
+            )
+        ),
+        (QueryToolChest) toolChest
+    );
+
     Sequence<Row> queryResult = theRunner.run(query, Maps.<String, Object>newHashMap());
@@ -507,4 +550,19 @@ public void queryMultiQueryableIndex(Blackhole blackhole) throws Exception
       blackhole.consume(result);
     }
   }
+
+  private List<QueryRunner<Row>> makeMultiRunners()
+  {
+    List<QueryRunner<Row>> runners = Lists.newArrayList();
+    for (int i = 0; i < numSegments; i++) {
+      String segmentName = "qIndex" + i;
+      QueryRunner<Row> runner = QueryBenchmarkUtil.makeQueryRunner(
+          factory,
+          segmentName,
+          new QueryableIndexSegment(segmentName, queryableIndexes.get(i))
+      );
+      runners.add(factory.getToolchest().preMergeQueryDecoration(runner));
+    }
+    return runners;
+  }
 }
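Note: the three multi-index benchmarks share makeMultiRunners() (added above) and differ only in the merge pipeline: the base variant merges in memory; queryMultiQueryableIndexWithSpilling overrides bufferGrouperMaxSize down to 4000 entries so the v2 grouper overflows to disk, which is why getMaxOnDiskStorage() was raised from 0L to 1_000_000_000L; queryMultiQueryableIndexWithSerde round-trips merged rows through Smile serialization to approximate rows moving between nodes. A sketch of driving the new variants through the standard JMH runner API, sweeping the new parameter (the runner class and include regex are illustrative, not part of this change):

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    public class GroupByBenchmarkRunner
    {
      public static void main(String[] args) throws Exception
      {
        Options opts = new OptionsBuilder()
            .include("GroupByBenchmark.queryMultiQueryableIndexWith.*")
            .param("queryGranularity", "all", "day")  // sweep the new parameter
            .param("numProcessingThreads", "2")
            .forks(1)
            .build();
        new Runner(opts).run();
      }
    }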
SerializingQueryRunner.java (new file)
@@ -0,0 +1,71 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.benchmark.query;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Query;
import io.druid.query.QueryRunner;

import java.util.Map;

public class SerializingQueryRunner<T> implements QueryRunner<T>
{
private final ObjectMapper smileMapper;
private final QueryRunner<T> baseRunner;
private final Class<T> clazz;

public SerializingQueryRunner(
ObjectMapper smileMapper,
Class<T> clazz,
QueryRunner<T> baseRunner
)
{
this.smileMapper = smileMapper;
this.clazz = clazz;
this.baseRunner = baseRunner;
}

@Override
public Sequence<T> run(
final Query<T> query,
final Map<String, Object> responseContext
)
{
return Sequences.map(
baseRunner.run(query, responseContext),
new Function<T, T>()
{
@Override
public T apply(T input)
{
try {
return smileMapper.readValue(smileMapper.writeValueAsBytes(input), clazz);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}
);
}
}
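Note: SerializingQueryRunner is benchmark-only plumbing: run() forwards the query to the wrapped runner and forces each result through a serialize/deserialize cycle, so the WithSerde benchmark measures merging plus the cost a process would pay shipping rows over the wire. A self-contained sketch of the per-row round-trip it performs, using a plain JSON ObjectMapper so it runs without Druid (the benchmark itself passes the Smile-backed DefaultObjectMapper; the Pojo type is illustrative):

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class RoundTripSketch
    {
      // Illustrative stand-in for io.druid.data.input.Row.
      public static class Pojo
      {
        public String dim;
        public long val;
      }

      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();
        Pojo in = new Pojo();
        in.dim = "alpha";
        in.val = 42L;
        // Serialize and immediately deserialize, as run() does for every row.
        Pojo out = mapper.readValue(mapper.writeValueAsBytes(in), Pojo.class);
        System.out.println(out.dim + "=" + out.val);
      }
    }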
18 changes: 16 additions & 2 deletions processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
@@ -32,6 +32,7 @@
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Longs;
 import io.druid.data.input.Row;
+import io.druid.granularity.QueryGranularities;
 import io.druid.granularity.QueryGranularity;
 import io.druid.java.util.common.IAE;
 import io.druid.java.util.common.ISE;
@@ -285,7 +286,18 @@ public Ordering<Row> getRowOrdering(final boolean granular)

     final Comparator<Row> timeComparator = getTimeComparator(granular);
 
-    if (sortByDimsFirst) {
+    if (timeComparator == null) {
+      return Ordering.from(
+          new Comparator<Row>()
+          {
+            @Override
+            public int compare(Row lhs, Row rhs)
+            {
+              return compareDims(dimensions, lhs, rhs);
+            }
+          }
+      );
+    } else if (sortByDimsFirst) {
       return Ordering.from(
           new Comparator<Row>()
           {
@@ -323,7 +335,9 @@ public int compare(Row lhs, Row rhs)

   private Comparator<Row> getTimeComparator(boolean granular)
   {
-    if (granular) {
+    if (QueryGranularities.ALL.equals(granularity)) {
+      return null;
+    } else if (granular) {
       return new Comparator<Row>()
       {
         @Override
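Note: the two changes in this file work together. With granularity ALL, every row is bucketed to the same timestamp, so getTimeComparator() now returns null and getRowOrdering() goes straight to a dimensions-only comparator instead of first comparing timestamps that can never differ. A small sketch of the resulting behavior with stand-in types (FakeRow and the lexicographic dimension comparison are illustrative):

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;

    public class OrderingSketch
    {
      // Stand-in for a grouped row: a bucketed timestamp plus one dimension value.
      static final class FakeRow
      {
        final long bucketedTimestamp;
        final String dim;

        FakeRow(long t, String d) { bucketedTimestamp = t; dim = d; }
      }

      public static void main(String[] args)
      {
        // Under "all" granularity, both rows share the same bucketed timestamp,
        // so a time-first comparison would always fall through to dimensions anyway.
        List<FakeRow> rows = Arrays.asList(new FakeRow(0, "b"), new FakeRow(0, "a"));

        Comparator<FakeRow> dimsOnly = (lhs, rhs) -> lhs.dim.compareTo(rhs.dim);
        rows.sort(dimsOnly);
        System.out.println(rows.get(0).dim); // prints "a"
      }
    }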
BufferGrouper.java (in io.druid.query.groupby.epinephelinae)
@@ -19,7 +19,6 @@

 package io.druid.query.groupby.epinephelinae;
 
-import com.google.common.base.Preconditions;
 import com.google.common.primitives.Ints;
 import io.druid.java.util.common.IAE;
 import io.druid.java.util.common.ISE;
@@ -135,12 +134,13 @@ public boolean aggregate(KeyType key, int keyHash)
       return false;
     }
 
-    Preconditions.checkArgument(
-        keyBuffer.remaining() == keySize,
-        "keySerde.toByteBuffer(key).remaining[%s] != keySerde.keySize[%s], buffer was the wrong size?!",
-        keyBuffer.remaining(),
-        keySize
-    );
+    if (keyBuffer.remaining() != keySize) {
+      throw new IAE(
+          "keySerde.toByteBuffer(key).remaining[%s] != keySerde.keySize[%s], buffer was the wrong size?!",
+          keyBuffer.remaining(),
+          keySize
+      );
+    }
 
     int bucket = findBucket(
         tableBuffer,
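Note: the Preconditions swap in aggregate() is a hot-path micro-optimization. checkArgument(boolean, String, Object...) boxes its primitive message arguments into a varargs Object[] on every call, even when the check passes; the explicit if/throw allocates nothing until the check actually fails. A sketch of the pattern outside Druid (plain IllegalArgumentException stands in for Druid's IAE):

    public class HotPathCheckSketch
    {
      static void aggregateChecked(int remaining, int keySize)
      {
        // The Guava form boxes 'remaining' and 'keySize' and allocates an
        // Object[] on every invocation, pass or fail:
        //   Preconditions.checkArgument(remaining == keySize, "...[%s] != ...[%s]", remaining, keySize);

        // The guarded form is allocation-free when the check passes:
        if (remaining != keySize) {
          throw new IllegalArgumentException(
              String.format("remaining[%s] != keySize[%s], buffer was the wrong size?!", remaining, keySize)
          );
        }
      }

      public static void main(String[] args)
      {
        aggregateChecked(16, 16); // passes silently, creating no garbage
      }
    }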