diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml index 25f4c600371c..42c966daa2a1 100644 --- a/benchmarks/pom.xml +++ b/benchmarks/pom.xml @@ -78,6 +78,11 @@ json-flattener 0.1.0 + + org.easymock + easymock + compile + junit junit @@ -87,8 +92,8 @@ UTF-8 - 1.17.2 - 1.7 + 1.19 + 1.8 benchmarks diff --git a/benchmarks/src/main/java/io/druid/benchmark/ExpressionBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/ExpressionBenchmark.java new file mode 100644 index 000000000000..3a2f6444ec94 --- /dev/null +++ b/benchmarks/src/main/java/io/druid/benchmark/ExpressionBenchmark.java @@ -0,0 +1,244 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.benchmark; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import io.druid.benchmark.datagen.BenchmarkColumnSchema; +import io.druid.benchmark.datagen.BenchmarkSchemaInfo; +import io.druid.benchmark.datagen.SegmentGenerator; +import io.druid.java.util.common.granularity.Granularities; +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Sequences; +import io.druid.js.JavaScriptConfig; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.query.aggregation.JavaScriptAggregatorFactory; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.Cursor; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.QueryableIndex; +import io.druid.segment.QueryableIndexStorageAdapter; +import io.druid.segment.VirtualColumns; +import io.druid.segment.column.ValueType; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.LinearShardSpec; +import org.joda.time.Interval; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 15) +@Measurement(iterations = 
30) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ExpressionBenchmark +{ + @Param({"1000000"}) + private int rowsPerSegment; + + private SegmentGenerator segmentGenerator; + private QueryableIndex index; + private JavaScriptAggregatorFactory javaScriptAggregatorFactory; + private DoubleSumAggregatorFactory expressionAggregatorFactory; + private final ByteBuffer aggregationBuffer = ByteBuffer.allocate(Double.BYTES); + + @Setup(Level.Trial) + public void setup() throws Exception + { + final BenchmarkSchemaInfo schemaInfo = new BenchmarkSchemaInfo( + ImmutableList.of( + BenchmarkColumnSchema.makeNormal("x", ValueType.FLOAT, false, 1, 0d, 0d, 10000d, false), + BenchmarkColumnSchema.makeNormal("y", ValueType.FLOAT, false, 1, 0d, 0d, 10000d, false) + ), + ImmutableList.of(), + new Interval("2000/P1D"), + false + ); + + final DataSegment dataSegment = DataSegment.builder() + .dataSource("foo") + .interval(schemaInfo.getDataInterval()) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .build(); + + this.segmentGenerator = new SegmentGenerator(); + this.index = segmentGenerator.generate(dataSegment, schemaInfo, rowsPerSegment); + this.javaScriptAggregatorFactory = new JavaScriptAggregatorFactory( + "name", + ImmutableList.of("x", "y"), + "function(current,x,y) { if (x > 0) { return current + x + 1 } else { return current + y + 1 } }", + "function() { return 0 }", + "function(a,b) { return a + b }", + JavaScriptConfig.getEnabledInstance() + ); + this.expressionAggregatorFactory = new DoubleSumAggregatorFactory( + "name", + null, + "if(x>0,1.0+x,y+1)" + ); + } + + @TearDown(Level.Trial) + public void tearDown() throws Exception + { + if (index != null) { + index.close(); + index = null; + } + + if (segmentGenerator != null) { + segmentGenerator.close(); + segmentGenerator = null; + } + } + + @Benchmark + public void queryUsingJavaScript(Blackhole blackhole) throws Exception + { + final Double result = 
compute(javaScriptAggregatorFactory::factorizeBuffered); + blackhole.consume(result); + } + + @Benchmark + public void queryUsingExpression(Blackhole blackhole) throws Exception + { + final Double result = compute(expressionAggregatorFactory::factorizeBuffered); + blackhole.consume(result); + } + + @Benchmark + public void queryUsingNative(Blackhole blackhole) throws Exception + { + final Double result = compute( + columnSelectorFactory -> + new NativeBufferAggregator( + columnSelectorFactory.makeFloatColumnSelector("x"), + columnSelectorFactory.makeFloatColumnSelector("y") + ) + ); + blackhole.consume(result); + } + + private double compute(final Function<ColumnSelectorFactory, BufferAggregator> aggregatorFactory) + { + final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(index); + + final Sequence<Cursor> cursors = adapter.makeCursors( + null, + index.getDataInterval(), + VirtualColumns.EMPTY, + Granularities.ALL, + false + ); + + final List<Double> results = Sequences.toList( + Sequences.map( + cursors, + cursor -> { + final BufferAggregator bufferAggregator = aggregatorFactory.apply(cursor); + bufferAggregator.init(aggregationBuffer, 0); + + while (!cursor.isDone()) { + bufferAggregator.aggregate(aggregationBuffer, 0); + cursor.advance(); + } + + final Double dbl = (Double) bufferAggregator.get(aggregationBuffer, 0); + bufferAggregator.close(); + return dbl; + } + ), + new ArrayList<>() + ); + + return Iterables.getOnlyElement(results); + } + + private static class NativeBufferAggregator implements BufferAggregator + { + private final FloatColumnSelector xSelector; + private final FloatColumnSelector ySelector; + + public NativeBufferAggregator(final FloatColumnSelector xSelector, final FloatColumnSelector ySelector) + { + this.xSelector = xSelector; + this.ySelector = ySelector; + } + + @Override + public void init(final ByteBuffer buf, final int position) + { + buf.putDouble(position, 0d); /* zero the slot at the caller-supplied offset, not a fixed 0 */ + } + + @Override + public void aggregate(final ByteBuffer buf, final int position) + { + final float x = 
xSelector.get(); + final double n = x > 0 ? x + 1 : ySelector.get() + 1; + buf.putDouble(position, buf.getDouble(position) + n); /* read and write the same slot */ + } + + @Override + public Object get(final ByteBuffer buf, final int position) + { + return buf.getDouble(position); + } + + @Override + public float getFloat(final ByteBuffer buf, final int position) + { + throw new UnsupportedOperationException(); + } + + @Override + public long getLong(final ByteBuffer buf, final int position) + { + throw new UnsupportedOperationException(); + } + + @Override + public void close() + { + + } + } +} diff --git a/benchmarks/src/main/java/io/druid/benchmark/FilterPartitionBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/FilterPartitionBenchmark.java index 5a15f55e8148..12a1299c7c59 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/FilterPartitionBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/FilterPartitionBenchmark.java @@ -100,7 +100,7 @@ import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) -@Fork(jvmArgsPrepend = "-server", value = 1) +@Fork(value = 1) @Warmup(iterations = 10) @Measurement(iterations = 25) public class FilterPartitionBenchmark diff --git a/benchmarks/src/main/java/io/druid/benchmark/FilteredAggregatorBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/FilteredAggregatorBenchmark.java index f065e725b754..40a5180d5483 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/FilteredAggregatorBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/FilteredAggregatorBenchmark.java @@ -100,7 +100,7 @@ import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) -@Fork(jvmArgsPrepend = "-server", value = 1) +@Fork(value = 1) @Warmup(iterations = 10) @Measurement(iterations = 25) public class FilteredAggregatorBenchmark diff --git a/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java index 78211112449e..e4bf398dcb17 100644 --- 
a/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java @@ -106,7 +106,7 @@ // Benchmark for determining the interface overhead of GroupBy with multiple type implementations @State(Scope.Benchmark) -@Fork(jvmArgsPrepend = "-server", value = 1) +@Fork(value = 1) @Warmup(iterations = 15) @Measurement(iterations = 30) public class GroupByTypeInterfaceBenchmark diff --git a/benchmarks/src/main/java/io/druid/benchmark/TopNTypeInterfaceBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/TopNTypeInterfaceBenchmark.java index 9ef2eb93a3ec..48d312aed74a 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/TopNTypeInterfaceBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/TopNTypeInterfaceBenchmark.java @@ -99,7 +99,7 @@ // Benchmark for determining the interface overhead of TopN with multiple type implementations @State(Scope.Benchmark) -@Fork(jvmArgsPrepend = "-server", value = 1) +@Fork(value = 1) @Warmup(iterations = 10) @Measurement(iterations = 25) public class TopNTypeInterfaceBenchmark diff --git a/benchmarks/src/main/java/io/druid/benchmark/datagen/SegmentGenerator.java b/benchmarks/src/main/java/io/druid/benchmark/datagen/SegmentGenerator.java new file mode 100644 index 000000000000..78ba8ff70818 --- /dev/null +++ b/benchmarks/src/main/java/io/druid/benchmark/datagen/SegmentGenerator.java @@ -0,0 +1,190 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.benchmark.datagen; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.io.Files; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.DimensionSchema; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.FloatDimensionSchema; +import io.druid.data.input.impl.LongDimensionSchema; +import io.druid.data.input.impl.StringDimensionSchema; +import io.druid.hll.HyperLogLogHash; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.logger.Logger; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; +import io.druid.segment.IndexBuilder; +import io.druid.segment.IndexSpec; +import io.druid.segment.QueryableIndex; +import io.druid.segment.QueryableIndexIndexableAdapter; +import io.druid.segment.TestHelper; +import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.serde.ComplexMetrics; +import io.druid.timeline.DataSegment; +import org.apache.commons.io.FileUtils; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +public class SegmentGenerator implements Closeable +{ + private static final Logger log = new Logger(SegmentGenerator.class); + + private static final int MAX_ROWS_IN_MEMORY = 200000; + 
private static final int STARTING_SEED = 9999; // Consistent seed for reproducibility + + private final File tempDir; + private final AtomicInteger seed; + + public SegmentGenerator() + { + this.tempDir = Files.createTempDir(); + this.seed = new AtomicInteger(STARTING_SEED); + } + + public QueryableIndex generate( + final DataSegment dataSegment, + final BenchmarkSchemaInfo schemaInfo, + final int numRows + ) + { + // In case we need to generate hyperUniques. + if (ComplexMetrics.getSerdeForType("hyperUnique") == null) { + ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault())); + } + + final BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator( + schemaInfo.getColumnSchemas(), + seed.getAndIncrement(), /* every generate() call on this instance consumes the next seed */ + schemaInfo.getDataInterval(), + numRows + ); + + final List<DimensionSchema> dimensions = new ArrayList<>(); + for (BenchmarkColumnSchema columnSchema : schemaInfo.getColumnSchemas()) { + if (schemaInfo.getAggs().stream().noneMatch(agg -> agg.getName().equals(columnSchema.getName()))) { + switch (columnSchema.getType()) { + case STRING: + dimensions.add(new StringDimensionSchema(columnSchema.getName())); + break; + case LONG: + dimensions.add(new LongDimensionSchema(columnSchema.getName())); + break; + case FLOAT: + dimensions.add(new FloatDimensionSchema(columnSchema.getName())); + break; + default: + throw new ISE("Unhandleable type[%s]", columnSchema.getType()); + } + } + } + + final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder() + .withDimensionsSpec(new DimensionsSpec(dimensions, ImmutableList.of(), ImmutableList.of())) + .withMetrics(schemaInfo.getAggsArray()) + .withRollup(schemaInfo.isWithRollup()) + .build(); + + final List<InputRow> rows = new ArrayList<>(); + final List<QueryableIndex> indexes = new ArrayList<>(); + + for (int i = 0; i < numRows; i++) { + final InputRow row = dataGenerator.nextRow(); + rows.add(row); + + if ((i + 1) % 20000 == 0) { + log.info("%,d/%,d rows generated.", i + 1, numRows); + } + + if 
(rows.size() % MAX_ROWS_IN_MEMORY == 0) { /* batch is full: build an intermediate index and start a new batch */ + indexes.add(makeIndex(dataSegment.getIdentifier(), indexes.size(), rows, indexSchema)); + rows.clear(); + } + } + + log.info("%,d/%,d rows generated.", numRows, numRows); + + if (rows.size() > 0) { + indexes.add(makeIndex(dataSegment.getIdentifier(), indexes.size(), rows, indexSchema)); + rows.clear(); + } + + if (indexes.isEmpty()) { + throw new ISE("No rows to index?"); + } else if (indexes.size() == 1) { + return Iterables.getOnlyElement(indexes); + } else { + try { + final QueryableIndex merged = TestHelper.getTestIndexIO().loadIndex( + TestHelper.getTestIndexMergerV9().merge( + indexes.stream().map(QueryableIndexIndexableAdapter::new).collect(Collectors.toList()), + false, + schemaInfo.getAggs() + .stream() + .map(AggregatorFactory::getCombiningFactory) + .toArray(AggregatorFactory[]::new), + new File(tempDir, "merged"), + new IndexSpec() + ) + ); + + for (QueryableIndex index : indexes) { + index.close(); + } + + return merged; + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + } + + @Override + public void close() throws IOException + { + FileUtils.deleteDirectory(tempDir); + } + + private QueryableIndex makeIndex( + final String identifier, + final int indexNumber, + final List<InputRow> rows, + final IncrementalIndexSchema indexSchema + ) + { + return IndexBuilder + .create() + .schema(indexSchema) + .tmpDir(new File(new File(tempDir, identifier), String.valueOf(indexNumber))) + .indexMerger(TestHelper.getTestIndexMergerV9()) + .rows(rows) + .buildMMappedIndex(); + } +} diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java index a0b67105e01d..6d2df0c64128 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java @@ -72,7 +72,7 @@ 
import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) -@Fork(jvmArgsPrepend = "-server", value = 1) +@Fork(value = 1) @Warmup(iterations = 10) @Measurement(iterations = 25) public class IncrementalIndexReadBenchmark diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexIngestionBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexIngestionBenchmark.java index 015fe82b80eb..69b3b017f564 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexIngestionBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexIngestionBenchmark.java @@ -51,7 +51,7 @@ import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) -@Fork(jvmArgsPrepend = "-server", value = 1) +@Fork(value = 1) @Warmup(iterations = 10) @Measurement(iterations = 25) public class IndexIngestionBenchmark diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexMergeBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexMergeBenchmark.java index 3702f4643050..6d8fdaac4d3b 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexMergeBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexMergeBenchmark.java @@ -63,7 +63,7 @@ import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) -@Fork(jvmArgsPrepend = "-server", value = 1) +@Fork(value = 1) @Warmup(iterations = 10) @Measurement(iterations = 25) public class IndexMergeBenchmark diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java index 6e3762616043..7bd825262815 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java @@ -62,7 +62,7 @@ import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) -@Fork(jvmArgsPrepend = "-server", value = 1) +@Fork(value = 1) 
@Warmup(iterations = 10) @Measurement(iterations = 25) public class IndexPersistBenchmark diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java index ded7ebba0a2b..417cdfcc406f 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java @@ -106,7 +106,7 @@ import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) -@Fork(jvmArgsPrepend = "-server", value = 1) +@Fork(value = 1) @Warmup(iterations = 15) @Measurement(iterations = 30) public class GroupByBenchmark diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/SearchBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/SearchBenchmark.java index a97d34fab585..15119da67ca9 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/SearchBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/SearchBenchmark.java @@ -105,7 +105,7 @@ import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) -@Fork(jvmArgsPrepend = "-server", value = 1) +@Fork(value = 1) @Warmup(iterations = 10) @Measurement(iterations = 25) public class SearchBenchmark diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/SelectBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/SelectBenchmark.java index ef843f68c25a..c21af6bde443 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/SelectBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/SelectBenchmark.java @@ -97,7 +97,7 @@ import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) -@Fork(jvmArgsPrepend = "-server", value = 1) +@Fork(value = 1) @Warmup(iterations = 10) @Measurement(iterations = 25) public class SelectBenchmark diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java index 
6a38eec88e0e..290aae43aa2a 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java @@ -23,13 +23,11 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Files; -import io.druid.benchmark.datagen.BenchmarkDataGenerator; import io.druid.benchmark.datagen.BenchmarkSchemaInfo; import io.druid.benchmark.datagen.BenchmarkSchemas; +import io.druid.benchmark.datagen.SegmentGenerator; import io.druid.common.utils.JodaUtils; -import io.druid.data.input.InputRow; import io.druid.data.input.Row; -import io.druid.hll.HyperLogLogHash; import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; @@ -38,15 +36,11 @@ import io.druid.query.TableDataSource; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; -import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.query.groupby.GroupByQuery; -import io.druid.segment.IndexBuilder; import io.druid.segment.QueryableIndex; -import io.druid.segment.TestHelper; import io.druid.segment.column.ValueType; -import io.druid.segment.serde.ComplexMetrics; import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.DruidPlanner; @@ -82,7 +76,6 @@ import java.io.File; import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -90,18 +83,19 @@ * Benchmark that compares the same groupBy query through the native query layer and through the SQL layer. 
*/ @State(Scope.Benchmark) -@Fork(jvmArgsPrepend = "-server", value = 1) +@Fork(value = 1) @Warmup(iterations = 15) @Measurement(iterations = 30) public class SqlBenchmark { - @Param({"10000", "100000", "200000"}) + @Param({"200000", "1000000"}) private int rowsPerSegment; private static final Logger log = new Logger(SqlBenchmark.class); private static final int RNG_SEED = 9999; private File tmpDir; + private SegmentGenerator segmentGenerator; private SpecificSegmentsQuerySegmentWalker walker; private PlannerFactory plannerFactory; private GroupByQuery groupByQuery; @@ -113,46 +107,22 @@ public void setup() throws Exception tmpDir = Files.createTempDir(); log.info("Starting benchmark setup using tmpDir[%s], rows[%,d].", tmpDir, rowsPerSegment); - if (ComplexMetrics.getSerdeForType("hyperUnique") == null) { - ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault())); - } - final BenchmarkSchemaInfo schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get("basic"); - final BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator( - schemaInfo.getColumnSchemas(), - RNG_SEED + 1, - schemaInfo.getDataInterval(), - rowsPerSegment - ); - final List rows = Lists.newArrayList(); - for (int i = 0; i < rowsPerSegment; i++) { - final InputRow row = dataGenerator.nextRow(); - if (i % 20000 == 0) { - log.info("%,d/%,d rows generated.", i, rowsPerSegment); - } - rows.add(row); - } + final DataSegment dataSegment = DataSegment.builder() + .dataSource("foo") + .interval(schemaInfo.getDataInterval()) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .build(); - log.info("%,d/%,d rows generated.", rows.size(), rowsPerSegment); + this.segmentGenerator = new SegmentGenerator(); - final PlannerConfig plannerConfig = new PlannerConfig(); + final QueryableIndex index = segmentGenerator.generate(dataSegment, schemaInfo, rowsPerSegment); final QueryRunnerFactoryConglomerate conglomerate = CalciteTests.queryRunnerFactoryConglomerate(); - final 
QueryableIndex index = IndexBuilder.create() - .tmpDir(new File(tmpDir, "1")) - .indexMerger(TestHelper.getTestIndexMergerV9()) - .rows(rows) - .buildMMappedIndex(); - - this.walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add( - DataSegment.builder() - .dataSource("foo") - .interval(index.getDataInterval()) - .version("1") - .shardSpec(new LinearShardSpec(0)) - .build(), - index - ); + final PlannerConfig plannerConfig = new PlannerConfig(); + + this.walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(dataSegment, index); final Map tableMap = ImmutableMap.of( "foo", @@ -211,6 +181,11 @@ public void tearDown() throws Exception walker = null; } + if (segmentGenerator != null) { + segmentGenerator.close(); + segmentGenerator = null; + } + if (tmpDir != null) { FileUtils.deleteDirectory(tmpDir); } diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/TimeseriesBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/TimeseriesBenchmark.java index 5ea9e85bf1ed..f0a6c7b36983 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/TimeseriesBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/TimeseriesBenchmark.java @@ -100,7 +100,7 @@ import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) -@Fork(jvmArgsPrepend = "-server", value = 1) +@Fork(value = 1) @Warmup(iterations = 10) @Measurement(iterations = 25) public class TimeseriesBenchmark diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/TopNBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/TopNBenchmark.java index 7b8db2997dce..b2d337c6e4e0 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/TopNBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/TopNBenchmark.java @@ -97,7 +97,7 @@ import java.util.concurrent.TimeUnit; @State(Scope.Benchmark) -@Fork(jvmArgsPrepend = "-server", value = 1) +@Fork(value = 1) @Warmup(iterations = 10) @Measurement(iterations = 25) public class 
TopNBenchmark