Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions common/src/main/java/io/druid/collections/StupidPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,24 @@ public class StupidPool<T>

private final Queue<T> objects = new ConcurrentLinkedQueue<>();

//note that this is just the max entries in the cache, pool can still create as many buffers as needed.
private final int objectsCacheMaxCount;

public StupidPool(
Supplier<T> generator
)
{
this.generator = generator;
this.objectsCacheMaxCount = Integer.MAX_VALUE;
}

public StupidPool(
Supplier<T> generator,
int objectsCacheMaxCount
)
{
this.generator = generator;
this.objectsCacheMaxCount = objectsCacheMaxCount;
}

public ResourceHolder<T> take()
Expand Down Expand Up @@ -80,8 +93,12 @@ public void close() throws IOException
log.warn(new ISE("Already Closed!"), "Already closed");
return;
}
if (!objects.offer(object)) {
log.warn(new ISE("Queue offer failed"), "Could not offer object [%s] back into the queue", object);
if (objects.size() < objectsCacheMaxCount) {
if (!objects.offer(object)) {
log.warn(new ISE("Queue offer failed"), "Could not offer object [%s] back into the queue", object);
}
} else {
log.debug("cache num entries is exceeding max limit [%s]", objectsCacheMaxCount);
}
}

Expand Down
1 change: 1 addition & 0 deletions docs/content/configuration/broker.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ The broker uses processing configs for nested groupBy queries. And, optionally,
|Property|Description|Default|
|--------|-----------|-------|
|`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)|
|`druid.processing.buffer.poolCacheMaxCount`|processing buffer pool caches the buffers for later use, this is the maximum count cache will grow to. note that pool can create more buffers than it can cache if necessary.|Integer.MAX_VALUE|
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
Expand Down
1 change: 1 addition & 0 deletions docs/content/configuration/historical.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ Druid uses Jetty to serve HTTP requests.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)|
|`druid.processing.buffer.poolCacheMaxCount`|processing buffer pool caches the buffers for later use, this is the maximum count cache will grow to. note that pool can create more buffers than it can cache if necessary.|Integer.MAX_VALUE|
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ public int intermediateComputeSizeBytes()
return 1024 * 1024 * 1024;
}

@Config({"druid.computation.buffer.poolCacheMaxCount", "${base_path}.buffer.poolCacheMaxCount"})
public int poolCacheMaxCount()
{
return Integer.MAX_VALUE;
}

@Override @Config(value = "${base_path}.numThreads")
public int getNumThreads()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;

import java.nio.ByteBuffer;
Expand All @@ -45,7 +46,6 @@ public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> creat
final GroupByQuery query,
final GroupByQueryConfig config,
StupidPool<ByteBuffer> bufferPool

)
{
final QueryGranularity gran = query.getGranularity();
Expand Down Expand Up @@ -77,15 +77,30 @@ public String apply(DimensionSpec input)
}
}
);
final IncrementalIndex index = new OnheapIncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
config.getMaxResults()
);
final IncrementalIndex index;

if (query.getContextValue("useOffheap", false)) {
index = new OffheapIncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
config.getMaxResults(),
bufferPool
);
} else {
index = new OnheapIncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
config.getMaxResults()
);
}

Accumulator<IncrementalIndex, T> accumulator = new Accumulator<IncrementalIndex, T>()
{
Expand Down
Loading