Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.joda.time.Interval;
Expand Down Expand Up @@ -227,10 +228,10 @@ public void tearDown() throws IOException

private IncrementalIndex makeIncIndex()
{
return new IncrementalIndex.Builder()
return new OnheapIncrementalIndex.Builder()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this method take a parameter to decide which type of index to return?
Or is this the default builder?
If so, maybe name it buildDefaultIncIndex, and make the default a hard-coded value that can be changed over time.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. But for the sake of reducing the diff size, I'd prefer to avoid this refactor.

.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
.build();
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.benchmark;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.benchmark.query.QueryBenchmarkUtil;
Expand Down Expand Up @@ -68,13 +69,17 @@
import org.apache.druid.segment.generator.DataGenerator;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexCreator;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
Expand All @@ -88,7 +93,6 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand All @@ -113,22 +117,22 @@ public class FilteredAggregatorBenchmark
@Param({"false", "true"})
private String vectorize;

@Param({"true", "false"})
private boolean descending;

private static final Logger log = new Logger(FilteredAggregatorBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;
private IncrementalIndex incIndex;
private IncrementalIndex incIndexFilteredAgg;
private AggregatorFactory[] filteredMetrics;
private QueryableIndex qIndex;
private File indexFile;

private AppendableIndexSpec appendableIndexSpec;
private AggregatorFactory filteredMetric;
private DimFilter filter;
private List<InputRow> inputRows;
private DataGenerator generator;
private QueryRunnerFactory factory;
private GeneratorSchemaInfo schemaInfo;
private TimeseriesQuery query;
private File tmpDir;

static {
JSON_MAPPER = new DefaultObjectMapper();
Expand All @@ -146,24 +150,25 @@ public int columnCacheSizeBytes()
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
}

/**
* Set up everything common for benchmarking both the incremental-index and the queryable-index.
*/
@Setup
public void setup() throws IOException
public void setup()
{
log.info("SETUP CALLED AT " + System.currentTimeMillis());

ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());

schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schema);

DataGenerator gen = new DataGenerator(
generator = new DataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED,
schemaInfo.getDataInterval(),
rowsPerSegment
);

incIndex = makeIncIndex(schemaInfo.getAggsArray());

filter = new OrDimFilter(
Arrays.asList(
new BoundDimFilter("dimSequential", "-1", "-1", true, true, null, null, StringComparators.ALPHANUMERIC),
Expand All @@ -172,30 +177,7 @@ public void setup() throws IOException
new InDimFilter("dimSequential", Collections.singletonList("X"), null)
)
);
filteredMetrics = new AggregatorFactory[1];
filteredMetrics[0] = new FilteredAggregatorFactory(new CountAggregatorFactory("rows"), filter);
incIndexFilteredAgg = makeIncIndex(filteredMetrics);

inputRows = new ArrayList<>();
for (int j = 0; j < rowsPerSegment; j++) {
InputRow row = gen.nextRow();
if (j % 10000 == 0) {
log.info(j + " rows generated.");
}
incIndex.add(row);
inputRows.add(row);
}

tmpDir = FileUtils.createTempDir();
log.info("Using temp dir: " + tmpDir.getAbsolutePath());

indexFile = INDEX_MERGER_V9.persist(
incIndex,
tmpDir,
new IndexSpec(),
null
);
qIndex = INDEX_IO.loadIndex(indexFile);
filteredMetric = new FilteredAggregatorFactory(new CountAggregatorFactory("rows"), filter);

factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(),
Expand All @@ -205,30 +187,127 @@ public void setup() throws IOException

GeneratorSchemaInfo basicSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
List<AggregatorFactory> queryAggs = new ArrayList<>();
queryAggs.add(filteredMetrics[0]);
List<AggregatorFactory> queryAggs = Collections.singletonList(filteredMetric);

query = Druids.newTimeseriesQueryBuilder()
.dataSource("blah")
.granularity(Granularities.ALL)
.intervals(intervalSpec)
.aggregators(queryAggs)
.descending(false)
.descending(descending)
.build();
}

@TearDown
public void tearDown() throws IOException
/**
* Setup/teardown everything specific for benchmarking the incremental-index.
*/
@State(Scope.Benchmark)
public static class IncrementalIndexState
{
@Param({"onheap", "offheap"})
private String indexType;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there is now a new extension point for the incremental index, shouldn't the type be extensible as well?
Use an enum instead of a string, with names like defaultOnHeap and OakOffHeap, so that additional on/off-heap implementations can be added in the future.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea here is indeed to allow a future index to be tested with the same code.
Using an enum would force this enumeration to list all existing index types in the core Druid package, even though an index may exist only as an extension.
This way (using string), the user can choose any indexType name in the command line without it having to be pre-defined in the code.


IncrementalIndex<?> incIndex;

@Setup
public void setup(FilteredAggregatorBenchmark global) throws JsonProcessingException
{
// Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
// It is used in {@code global.makeIncIndex()} to instantiate an incremental-index of the specified type.
global.appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worth mentioning that this is where the type of the index is set in the spec, and later used by the factory

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

incIndex = global.makeIncIndex(global.schemaInfo.getAggsArray());
global.generator.addToIndex(incIndex, global.rowsPerSegment);
}

/**
 * Closes the incremental index assigned by {@code setup}, if there is one.
 */
@TearDown
public void tearDown()
{
  if (incIndex == null) {
    return;
  }
  incIndex.close();
}
}

/**
 * State specific to benchmarking ingestion into the incremental-index.
 * Before every invocation a list of input rows is obtained from the generator and a new
 * index is created; after every invocation that index is closed.
 */
@State(Scope.Benchmark)
public static class IncrementalIndexIngestState
{
  @Param({"onheap", "offheap"})
  private String indexType;

  IncrementalIndex<?> incIndex;
  List<InputRow> inputRows;

  @Setup(Level.Invocation)
  public void setup(FilteredAggregatorBenchmark global) throws JsonProcessingException
  {
    // Translate the indexType parameter into an AppendableIndexSpec and store it on the
    // enclosing benchmark: global.makeIncIndex() instantiates the index from that spec.
    global.appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
    inputRows = global.generator.toList(global.rowsPerSegment);

    // The index under test aggregates only the filtered metric.
    final AggregatorFactory[] metrics = {global.filteredMetric};
    incIndex = global.makeIncIndex(metrics);
  }

  @TearDown(Level.Invocation)
  public void tearDown()
  {
    if (incIndex == null) {
      return;
    }
    incIndex.close();
  }
}

/**
* Setup/teardown everything specific for benchmarking the queryable-index.
*/
@State(Scope.Benchmark)
public static class QueryableIndexState
{
FileUtils.deleteDirectory(tmpDir);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the diff here is very misleading - this line is part of a one line method tearDown that was deleted

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that it was not deleted. It just moved below to the teardown of the QueryableIndexState: qIndexesDir.delete();

private File qIndexesDir;
private QueryableIndex qIndex;

/**
 * Builds the queryable index used by the benchmark: creates an incremental index from an
 * {@code OnheapIncrementalIndex.Spec}, adds {@code global.rowsPerSegment} rows to it through
 * {@code global.generator}, persists it into a new temp directory and loads the persisted result.
 *
 * @param global the enclosing benchmark state; its {@code appendableIndexSpec} is overwritten here
 * @throws IOException declared for the persist and load steps
 */
@Setup
public void setup(FilteredAggregatorBenchmark global) throws IOException
{
  global.appendableIndexSpec = new OnheapIncrementalIndex.Spec();

  final File indexFile;
  IncrementalIndex<?> incIndex = global.makeIncIndex(global.schemaInfo.getAggsArray());
  try {
    global.generator.addToIndex(incIndex, global.rowsPerSegment);

    qIndexesDir = FileUtils.createTempDir();
    log.info("Using temp dir: " + qIndexesDir.getAbsolutePath());

    indexFile = INDEX_MERGER_V9.persist(
        incIndex,
        qIndexesDir,
        new IndexSpec(),
        null
    );
  }
  finally {
    // Close the intermediate index even when adding rows or persisting fails.
    incIndex.close();
  }

  qIndex = INDEX_IO.loadIndex(indexFile);
}

/**
 * Closes the loaded index and removes the temp directory that holds the persisted index.
 *
 * @throws IOException if the temp directory cannot be deleted
 */
@TearDown
public void tearDown() throws IOException
{
  try {
    if (qIndex != null) {
      qIndex.close();
    }
  }
  finally {
    if (qIndexesDir != null) {
      // File.delete() cannot remove a non-empty directory and reports failure only through
      // a return value that was being ignored, so delete the directory recursively instead.
      FileUtils.deleteDirectory(qIndexesDir);
    }
  }
}
}

private IncrementalIndex makeIncIndex(AggregatorFactory[] metrics)
private IncrementalIndex<?> makeIncIndex(AggregatorFactory[] metrics)
{
return new IncrementalIndex.Builder()
return appendableIndexSpec.builder()
.setSimpleTestingIndexSchema(metrics)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
.build();
}

private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query, String vectorize)
Expand All @@ -254,24 +333,23 @@ private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runn
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void ingest(Blackhole blackhole) throws Exception
public void ingest(Blackhole blackhole, IncrementalIndexIngestState state) throws Exception
{
incIndexFilteredAgg = makeIncIndex(filteredMetrics);
for (InputRow row : inputRows) {
int rv = incIndexFilteredAgg.add(row).getRowCount();
for (InputRow row : state.inputRows) {
int rv = state.incIndex.add(row).getRowCount();
blackhole.consume(rv);
}
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void querySingleIncrementalIndex(Blackhole blackhole)
public void querySingleIncrementalIndex(Blackhole blackhole, IncrementalIndexState state)
{
QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
SegmentId.dummy("incIndex"),
new IncrementalIndexSegment(incIndex, SegmentId.dummy("incIndex"))
new IncrementalIndexSegment(state.incIndex, SegmentId.dummy("incIndex"))
);

List<Result<TimeseriesResultValue>> results = FilteredAggregatorBenchmark.runQuery(
Expand All @@ -288,12 +366,12 @@ public void querySingleIncrementalIndex(Blackhole blackhole)
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void querySingleQueryableIndex(Blackhole blackhole)
public void querySingleQueryableIndex(Blackhole blackhole, QueryableIndexState state)
{
final QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
SegmentId.dummy("qIndex"),
new QueryableIndexSegment(qIndex, SegmentId.dummy("qIndex"))
new QueryableIndexSegment(state.qIndex, SegmentId.dummy("qIndex"))
);

List<Result<TimeseriesResultValue>> results = FilteredAggregatorBenchmark.runQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
Expand Down Expand Up @@ -413,11 +414,11 @@ public String getFormatString()

private IncrementalIndex makeIncIndex()
{
return new IncrementalIndex.Builder()
return new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setConcurrentEventAdd(true)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
.build();
}

@TearDown(Level.Trial)
Expand Down
Loading