Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.druid.benchmark.query;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.Lists;
Expand All @@ -32,13 +33,16 @@
import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.collections.BlockingPool;
import io.druid.collections.StupidPool;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.offheap.OffheapBufferPool;
import io.druid.offheap.OffheapBufferGenerator;
import io.druid.query.DruidProcessingConfig;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
Expand All @@ -54,6 +58,9 @@
import io.druid.query.groupby.GroupByQueryEngine;
import io.druid.query.groupby.GroupByQueryQueryToolChest;
import io.druid.query.groupby.GroupByQueryRunnerFactory;
import io.druid.query.groupby.strategy.GroupByStrategySelector;
import io.druid.query.groupby.strategy.GroupByStrategyV1;
import io.druid.query.groupby.strategy.GroupByStrategyV2;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.segment.IncrementalIndexSegment;
Expand All @@ -67,7 +74,6 @@
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand All @@ -83,6 +89,7 @@

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
Expand All @@ -100,12 +107,21 @@ public class GroupByBenchmark
@Param({"4"})
private int numSegments;

@Param({"4"})
private int numProcessingThreads;

@Param({"-1"})
private int initialBuckets;

@Param({"100000"})
private int rowsPerSegment;

@Param({"basic.A"})
private String schemaAndQuery;

@Param({"v1", "v2"})
private String defaultStrategy;

private static final Logger log = new Logger(GroupByBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMergerV9 INDEX_MERGER_V9;
Expand Down Expand Up @@ -186,7 +202,7 @@ public void setup() throws IOException
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
}
executorService = Execs.multiThreaded(numSegments, "GroupByThreadPool");
executorService = Execs.multiThreaded(numProcessingThreads, "GroupByThreadPool[%d]");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most formats are with the thread enumeration after a dash rather than in brackets. Suggest keeping that here.


setupQueries();

Expand Down Expand Up @@ -237,25 +253,75 @@ public void setup() throws IOException
qIndexes.add(qIndex);
}

OffheapBufferPool bufferPool = new OffheapBufferPool(250000000, Integer.MAX_VALUE);
OffheapBufferPool bufferPool2 = new OffheapBufferPool(250000000, Integer.MAX_VALUE);
final GroupByQueryConfig config = new GroupByQueryConfig();
StupidPool<ByteBuffer> bufferPool = new StupidPool<>(
new OffheapBufferGenerator("compute", 250000000),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can these be more descriptive, and are those values required to be fixed? Are the changes in direct memory needs documented?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is benchmark, nevermind

Integer.MAX_VALUE
);
BlockingPool<ByteBuffer> mergePool = new BlockingPool<>(
new OffheapBufferGenerator("merge", 250000000),
1
);
final GroupByQueryConfig config = new GroupByQueryConfig()
{
@Override
public String getDefaultStrategy()
{
return defaultStrategy;
}

@Override
public int getBufferGrouperInitialBuckets()
{
return initialBuckets;
}
};
config.setSingleThreaded(false);
config.setMaxIntermediateRows(1000000);
config.setMaxResults(1000000);
config.setMaxIntermediateRows(Integer.MAX_VALUE);
config.setMaxResults(Integer.MAX_VALUE);

DruidProcessingConfig druidProcessingConfig = new DruidProcessingConfig()
{
@Override
public int getNumThreads()
{
// Used by "v2" strategy for concurrencyHint
return numProcessingThreads;
}

@Override
public String getFormatString()
{
return null;
}
};

final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final GroupByQueryEngine engine = new GroupByQueryEngine(configSupplier, bufferPool);
final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
configSupplier,
new GroupByStrategyV1(
configSupplier,
new GroupByQueryEngine(configSupplier, bufferPool),
QueryBenchmarkUtil.NOOP_QUERYWATCHER,
bufferPool
),
new GroupByStrategyV2(
druidProcessingConfig,
configSupplier,
bufferPool,
mergePool,
new ObjectMapper(new SmileFactory()),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
)
);

factory = new GroupByQueryRunnerFactory(
engine,
QueryBenchmarkUtil.NOOP_QUERYWATCHER,
configSupplier,
strategySelector,
new GroupByQueryQueryToolChest(
configSupplier, JSON_MAPPER, engine, bufferPool2,
configSupplier,
strategySelector,
bufferPool,
QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()
),
bufferPool2
)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@
import io.druid.benchmark.datagen.BenchmarkDataGenerator;
import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
import io.druid.benchmark.datagen.BenchmarkSchemas;
import io.druid.collections.StupidPool;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.offheap.OffheapBufferPool;
import io.druid.offheap.OffheapBufferGenerator;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
Expand Down Expand Up @@ -236,7 +237,7 @@ public void setup() throws IOException
}

factory = new TopNQueryRunnerFactory(
new OffheapBufferPool(250000000, Integer.MAX_VALUE),
new StupidPool<>(new OffheapBufferGenerator("compute", 250000000), Integer.MAX_VALUE),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);
Expand Down
119 changes: 119 additions & 0 deletions common/src/main/java/io/druid/collections/BlockingPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.collections;

import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Pool that pre-generates objects up to a limit, then permits possibly-blocking "take" operations.
*/
public class BlockingPool<T>
{
private static final Logger log = new Logger(BlockingPool.class);

private final BlockingQueue<T> objects;

public BlockingPool(
Supplier<T> generator,
int limit
)
{
this.objects = limit > 0 ? new ArrayBlockingQueue<T>(limit) : null;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

preconditions check that limit is set?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm i see it below


for (int i = 0; i < limit; i++) {
objects.add(generator.get());
}
}

/**
* Take a resource from the pool.
*
* @param timeout maximum time to wait for a resource, in milliseconds. Negative means do not use a timeout.
*
* @return a resource, or null if the timeout was reached
*
* @throws InterruptedException if interrupted while waiting for a resource to become available
*/
public ResourceHolder<T> take(final long timeout) throws InterruptedException
{
Preconditions.checkState(objects != null, "Pool was initialized with limit = 0, there are no objects to take.");
final T theObject = timeout >= 0 ? objects.poll(timeout, TimeUnit.MILLISECONDS) : objects.take();
return theObject == null ? null : new ObjectResourceHolder(theObject);
}

/**
* Similar to StupidPool.ObjectResourceHolder, except this one has no objectsCacheMaxCount, and it returns objects
* to the pool on finalize.
*/
private class ObjectResourceHolder implements ResourceHolder<T>
{
private AtomicBoolean closed = new AtomicBoolean(false);
private final T object;

public ObjectResourceHolder(final T object)
{
this.object = object;
}

// WARNING: it is entirely possible for a caller to hold onto the object and call "close", then still use that
// object even though it will be offered to someone else in BlockingPool.take
@Override
public T get()
{
if (closed.get()) {
throw new ISE("Already Closed!");
}

return object;
}

@Override
public void close()
{
if (!closed.compareAndSet(false, true)) {
log.warn(new ISE("Already Closed!"), "Already closed");
return;
}
if (!objects.offer(object)) {
throw new ISE("WTF?! Queue offer failed");
}
}

@Override
protected void finalize() throws Throwable
{
if (closed.compareAndSet(false, true)) {
log.warn("Not closed! Object was[%s]. Returning to pool.", object);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible here to try to return the same object to pool?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you mean?

if (!objects.offer(object)) {
log.error("WTF?! Queue offer failed during finalize, uh oh...");
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,9 @@ public T get()

/**
* Not idempotent, should only be called once when done using the resource
*
* @throws IOException
*/
@Override
public void close() throws IOException
public void close()
{
// ensures count always gets adjusted while item is removed from the queue
synchronized (this) {
Expand Down
3 changes: 3 additions & 0 deletions common/src/main/java/io/druid/collections/ResourceHolder.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@
/**
 * A handle to a managed (typically pooled) resource of type {@code T}. Closing the holder releases the
 * resource back to its owner; the held object must not be used after {@link #close()}.
 */
public interface ResourceHolder<T> extends Closeable
{
// Returns the held resource. Implementations may throw if the holder is already closed.
T get();

// Narrows Closeable.close(): implementations release the resource without declaring IOException,
// so callers need no checked-exception handling.
@Override
void close();
}
2 changes: 1 addition & 1 deletion common/src/main/java/io/druid/collections/StupidPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public T get()
}

@Override
public void close() throws IOException
public void close()
{
if (!closed.compareAndSet(false, true)) {
log.warn(new ISE("Already Closed!"), "Already closed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public T get()
}

@Override
public void close() throws IOException
public void close()
{
// Do nothing
}
Expand Down
12 changes: 7 additions & 5 deletions docs/content/configuration/broker.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,26 @@ Druid broker can optionally retry queries internally for transient errors.

The broker uses processing configs for nested groupBy queries. And, optionally, long-interval queries (of any type) can be broken into shorter interval queries and processed in parallel inside this thread pool. For more details, see "chunkPeriod" in [Query Context](../querying/query-context.html) doc.


|Property|Description|Default|
|--------|-----------|-------|
|`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)|
|`druid.processing.buffer.poolCacheMaxCount`|processing buffer pool caches the buffers for later use, this is the maximum count cache will grow to. note that pool can create more buffers than it can cache if necessary.|Integer.MAX_VALUE|
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. By default, no queries use these buffers, so the default pool size is zero.|0|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`|

The amount of direct memory needed by Druid is at least
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can
ensure at least this amount of direct memory is available by providing `-XX:MaxDirectMemorySize=<VALUE>` at the command
line.

#### General Query Configuration

##### GroupBy Query Config

|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.groupBy.maxIntermediateRows`|Maximum number of intermediate rows. This can be lowered at query time by `maxIntermediateRows` attribute in query context.|50000|
|`druid.query.groupBy.maxResults`|Maximum number of results. This can be lowered at query time by `maxResults` attribute in query context.|500000|
See [groupBy server configuration](../querying/groupbyquery.html#server-configuration).

##### Search Query Config

Expand Down
Loading