3 changes: 3 additions & 0 deletions docs/content/configuration/index.md
@@ -1176,6 +1176,7 @@ You can optionally configure caching to be enabled on the peons by setting cachi
|`druid.realtime.cache.useCache`|true, false|Enable the cache on the realtime.|false|
|`druid.realtime.cache.populateCache`|true, false|Populate the cache on the realtime.|false|
|`druid.realtime.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`|
|`druid.realtime.cache.maxEntrySize`|positive integer|Maximum cache entry size in bytes.|1_000_000|
Member

This description (or the more general description on the common page) should explain what happens if the serialized form of a query result is bigger than this size. I suppose it is "the result is not recorded in the cache; XXX/put/oversized metric is incremented" (I don't know what should go into XXX; this question should be researched).

Contributor

@gianm, Jul 22, 2019

What happens is "the result is not recorded in the cache; cache */put/oversized metrics are incremented."

The */put/oversized syntax would match how they are described in metrics.md, so I think that'd be the right way to write it in user-facing docs. The * could be query/cache/delta or query/cache/total, also mentioned in metrics.md.
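
The behavior described in this reply can be sketched roughly as follows. This is a minimal illustration only, assuming a plain in-memory map in place of Druid's cache and two hypothetical counter fields in place of `CachePopulatorStats`; `OversizedEntrySketch` and its members are invented for this example and are not part of Druid.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for illustration only; not Druid's CachePopulator API.
final class OversizedEntrySketch
{
  final Map<String, byte[]> cache = new HashMap<>();
  long okCount;        // plays the role of cachePopulatorStats.incrementOk()
  long oversizedCount; // plays the role of cachePopulatorStats.incrementOversized()

  private final long maxEntrySize;

  OversizedEntrySketch(long maxEntrySize)
  {
    this.maxEntrySize = maxEntrySize;
  }

  /** Serializes the rows and stores them only if the result fits within maxEntrySize bytes. */
  void populate(String cacheKey, List<String> rows)
  {
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    for (String row : rows) {
      final byte[] encoded = row.getBytes(StandardCharsets.UTF_8);
      bytes.write(encoded, 0, encoded.length);
    }
    // A limit of 0 or less disables the check, mirroring the `maxEntrySize > 0` guard in the diff.
    if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
      oversizedCount++;
      return; // the result is not recorded in the cache
    }
    cache.put(cacheKey, bytes.toByteArray());
    okCount++;
  }

  public static void main(String[] args)
  {
    final OversizedEntrySketch sketch = new OversizedEntrySketch(8);
    sketch.populate("small", Arrays.asList("abc", "de"));      // 5 bytes, fits
    sketch.populate("large", Arrays.asList("abcdef", "ghij")); // 10 bytes, skipped
    System.out.println("cached keys: " + sketch.cache.keySet());
    System.out.println("ok=" + sketch.okCount + " oversized=" + sketch.oversizedCount);
  }
}
```

In this sketch the oversized result is dropped silently from the caller's point of view; the only visible trace is the counter, which is why the thread suggests naming the metric in the user-facing docs.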


See [cache configuration](#cache-configuration) for how to configure cache settings.

@@ -1319,6 +1320,7 @@ You can optionally only configure caching to be enabled on the Historical by set
|`druid.historical.cache.useCache`|true, false|Enable the cache on the Historical.|false|
|`druid.historical.cache.populateCache`|true, false|Populate the cache on the Historical.|false|
|`druid.historical.cache.unCacheable`|All druid query types|All query types to not cache.|["groupBy", "select"]|
|`druid.historical.cache.maxEntrySize`|positive integer|Maximum cache entry size in bytes.|1_000_000|

See [cache configuration](#cache-configuration) for how to configure cache settings.

@@ -1452,6 +1454,7 @@ You can optionally only configure caching to be enabled on the Broker by setting
|`druid.broker.cache.resultLevelCacheLimit`|positive integer|Maximum size of query response that can be cached.|`Integer.MAX_VALUE`|
|`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`|
|`druid.broker.cache.cacheBulkMergeLimit`|positive integer or 0|Queries with more segments than this number will not attempt to fetch from cache at the broker level, leaving potential caching fetches (and cache result merging) to the Historicals|`Integer.MAX_VALUE`|
|`druid.broker.cache.maxEntrySize`|positive integer|Maximum cache entry size in bytes.|1_000_000|

See [cache configuration](#cache-configuration) for how to configure cache settings.

@@ -25,42 +25,44 @@
import java.util.concurrent.ExecutorService;

/**
* An interface that defines the nitty gritty implementation detauls of a Query on a Segment
* An interface that defines the nitty gritty implementation details of a Query on a Segment
*/
@ExtensionPoint
public interface QueryRunnerFactory<T, QueryType extends Query<T>>
{
/**
* Given a specific segment, this method will create a QueryRunner.
* Given a specific segment, this method will create a {@link QueryRunner}.
*
* The QueryRunner, when asked, will generate a Sequence of results based on the given segment. This
* is the meat of the query processing and is where the results are actually generated. Everything else
* is just merging and reduction logic.
* The {@link QueryRunner}, when asked, will generate a {@link org.apache.druid.java.util.common.guava.Sequence} of
* results based on the given segment. This is the meat of the {@link Query} processing and is where the results are
* actually generated. Everything else is just merging and reduction logic.
*
* @param segment The segment to process
* @return A QueryRunner that, when asked, will generate a Sequence of results based on the given segment
* @param segment The segment to process
* @return A {@link QueryRunner} that, when asked, will generate a
* {@link org.apache.druid.java.util.common.guava.Sequence} of results based on the given segment
*/
QueryRunner<T> createRunner(Segment segment);

/**
* Runners generated with createRunner() and combined into an Iterable in (time,shardId) order are passed
* along to this method with an ExecutorService. The method should then return a QueryRunner that, when
* asked, will use the ExecutorService to run the base QueryRunners in some fashion.
* along to this method with an {@link ExecutorService}. The method should then return a {@link QueryRunner} that,
* when asked, will use the {@link ExecutorService} to run the base QueryRunners in some fashion.
*
* The vast majority of the time, this should be implemented with
* The vast majority of the time, this should be implemented with {@link ChainedExecutionQueryRunner}:
*
* return new ChainedExecutionQueryRunner<>(queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners);
*
* Which will allow for parallel execution up to the maximum number of processing threads allowed.
*
* @param queryExecutor ExecutorService to be used for parallel processing
* @param queryRunners Individual QueryRunner objects that produce some results
* @return a QueryRunner that, when asked, will use the ExecutorService to run the base QueryRunners
* @param queryExecutor {@link ExecutorService} to be used for parallel processing
* @param queryRunners Individual {@link QueryRunner} objects that produce some results
* @return a {@link QueryRunner} that, when asked, will use the {@link ExecutorService} to run the base
* {@link QueryRunner} collection.
*/
QueryRunner<T> mergeRunners(ExecutorService queryExecutor, Iterable<QueryRunner<T>> queryRunners);

/**
* Provides access to the toolchest for this specific query type.
* Provides access to the {@link QueryToolChest} for this specific {@link Query} type.
*
* @return an instance of the toolchest for this specific query type.
*/
@@ -38,6 +38,10 @@
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

/**
* {@link CachePopulator} implementation that uses an {@link ExecutorService} thread pool to populate a cache in the
* background. Used if config "druid.*.cache.numBackgroundThreads" is greater than 0.
*/
public class BackgroundCachePopulator implements CachePopulator
{
private static final Logger log = new Logger(BackgroundCachePopulator.class);
@@ -23,6 +23,25 @@

import java.util.function.Function;

/**
* Abstraction of mechanism for populating a {@link Cache} by wrapping a {@link Sequence} and providing a function to
* extract the values from it. At runtime, the {@link CachePopulator} implementation is used as a singleton and
* injected where needed to share between all cacheables, which requires the {@link Cache} itself to be thread-safe.
*
* Consumers of the {@link Sequence} will either be a processing thread (in the case of a historical or task), or
* an http thread in the case of a broker. See:
*
* historicals: {@link org.apache.druid.server.coordination.ServerManager} and
* {@link org.apache.druid.client.CachingQueryRunner}
*
* realtime tasks: {@link org.apache.druid.segment.realtime.appenderator.SinkQuerySegmentWalker} and
* {@link org.apache.druid.client.CachingQueryRunner}
*
* brokers: {@link org.apache.druid.server.ClientQuerySegmentWalker} and
* {@link org.apache.druid.client.CachingClusteredClient}
*
* for additional details
*/
public interface CachePopulator
{
<T, CacheType> Sequence<T> wrap(
@@ -22,6 +22,11 @@
import java.util.concurrent.atomic.AtomicLong;

/**
* Thread-safe collector of {@link CachePopulator} statistics, utilized by {@link CacheMonitor} to emit cache metrics.
* Like the {@link CachePopulator}, this is used as a singleton.
*
* See {@link org.apache.druid.guice.DruidProcessingModule#getCachePopulator} which supplies either
* {@link ForegroundCachePopulator} or {@link BackgroundCachePopulator}, as configured, for more details.
*/
public class CachePopulatorStats
{
@@ -22,21 +22,24 @@
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.SequenceWrapper;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/**
* {@link CachePopulator} implementation that populates a cache on the same thread that is processing the
* {@link Sequence}. Used if config "druid.*.cache.numBackgroundThreads" is 0 (the default).
*/
public class ForegroundCachePopulator implements CachePopulator
{
Member

Please provide concurrent access documentation for this class (or for CachePopulator in general, and make ForegroundCachePopulator's Javadoc just refer to its superclass's Javadoc), and/or describe the concurrent control or data flow.

In particular, this passage from the PR description:

> since CachePopulator is shared by all processing threads but the lock covers bytes and json generator that are tied to a specific segment which should only be happening in a single thread.

is not obvious, but it's not reflected anywhere in the code itself.

Member Author

I think there isn't much concurrent access left to document, since all concurrency primitives have been removed from ForegroundCachePopulator, which I think previously qualified for the excessive thread safety section. I have tried to update the Javadocs of CachePopulator to describe its usage and how the sequence is consumed, so hopefully I've at least left it better than I found it.
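
The reasoning in this thread (a populator shared by all threads, with per-call state touched only by the thread that consumes the sequence) can be illustrated with a small sketch. It assumes nothing about Druid beyond what the comments say: `PerCallStateSketch`, `wrap()` and `runConcurrently()` are hypothetical names, and a plain `Function` stands in for the mapped `Sequence`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical illustration; none of these names are part of Druid.
final class PerCallStateSketch
{
  // One instance shared by every thread; it has no mutable fields of its own.
  static final PerCallStateSketch SHARED = new PerCallStateSketch();

  /** Each call allocates its own buffer; only the thread applying the returned function touches it. */
  Function<String, Integer> wrap()
  {
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream(); // per-call, never shared
    return row -> {
      final byte[] encoded = row.getBytes(StandardCharsets.UTF_8);
      bytes.write(encoded, 0, encoded.length);
      return bytes.size(); // bytes written so far by this caller alone
    };
  }

  /** Runs several independent consumers against the shared instance and returns each final size. */
  static List<Integer> runConcurrently(int threads, int rowsPerThread)
  {
    final ExecutorService exec = Executors.newFixedThreadPool(threads);
    try {
      final List<Future<Integer>> futures = new ArrayList<>();
      for (int t = 0; t < threads; t++) {
        final Callable<Integer> task = () -> {
          final Function<String, Integer> fn = SHARED.wrap();
          int size = 0;
          for (int i = 0; i < rowsPerThread; i++) {
            size = fn.apply("x"); // one byte per row
          }
          return size;
        };
        futures.add(exec.submit(task));
      }
      final List<Integer> sizes = new ArrayList<>();
      for (Future<Integer> future : futures) {
        sizes.add(future.get());
      }
      return sizes;
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
    finally {
      exec.shutdown();
    }
  }

  public static void main(String[] args)
  {
    System.out.println(runConcurrently(4, 1000));
  }
}
```

No lock is needed in the sketch because the buffer never escapes the call that created it. The shared cache itself is a different matter, which is why the new `CachePopulator` Javadoc requires the `Cache` to be thread-safe.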

private static final Logger log = new Logger(ForegroundCachePopulator.class);

private final Object lock = new Object();
private final ObjectMapper objectMapper;
private final CachePopulatorStats cachePopulatorStats;
private final long maxEntrySize;
@@ -61,7 +64,7 @@ public <T, CacheType> Sequence<T> wrap(
)
{
final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
final AtomicBoolean tooBig = new AtomicBoolean(false);
final MutableBoolean tooBig = new MutableBoolean(false);
final JsonGenerator jsonGenerator;

try {
@@ -75,21 +78,19 @@ public <T, CacheType> Sequence<T> wrap(
Sequences.map(
sequence,
input -> {
if (!tooBig.get()) {
synchronized (lock) {
Contributor

This lock removal is the core change happening, and it looks good to me. The lock seems to have been pointless, since the only thing worth protecting is the ByteArrayOutputStream, and that's only being accessed by a single thread (the thread that walks the Sequence).

try {
jsonGenerator.writeObject(cacheFn.apply(input));
if (!tooBig.isTrue()) {
try {
jsonGenerator.writeObject(cacheFn.apply(input));

// Not flushing jsonGenerator before checking this, but should be ok since Jackson buffers are
// typically just a few KB, and we don't want to waste cycles flushing.
if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
tooBig.set(true);
}
}
catch (IOException e) {
throw new RuntimeException(e);
// Not flushing jsonGenerator before checking this, but should be ok since Jackson buffers are
// typically just a few KB, and we don't want to waste cycles flushing.
if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
tooBig.setValue(true);
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

return input;
@@ -100,24 +101,22 @@ public <T, CacheType> Sequence<T> wrap(
@Override
public void after(final boolean isDone, final Throwable thrown) throws Exception
{
synchronized (lock) {
jsonGenerator.close();
jsonGenerator.close();

if (isDone) {
// Check tooBig, then check maxEntrySize one more time, after closing/flushing jsonGenerator.
if (tooBig.get() || (maxEntrySize > 0 && bytes.size() > maxEntrySize)) {
cachePopulatorStats.incrementOversized();
return;
}
if (isDone) {
// Check tooBig, then check maxEntrySize one more time, after closing/flushing jsonGenerator.
if (tooBig.isTrue() || (maxEntrySize > 0 && bytes.size() > maxEntrySize)) {
cachePopulatorStats.incrementOversized();
return;
}

try {
cache.put(cacheKey, bytes.toByteArray());
cachePopulatorStats.incrementOk();
}
catch (Exception e) {
log.warn(e, "Unable to write to cache");
cachePopulatorStats.incrementError();
}
try {
cache.put(cacheKey, bytes.toByteArray());
cachePopulatorStats.incrementOk();
}
catch (Exception e) {
log.warn(e, "Unable to write to cache");
cachePopulatorStats.incrementError();
}
}
}
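
The code comments in this hunk note that the size is checked without flushing and then checked once more after the generator is closed. The reason for the second check can be sketched with the standard library alone. Here `BufferedOutputStream` is only an assumed stand-in for Jackson's `JsonGenerator` buffering, and `BufferedSizeCheckSketch` is a hypothetical name; Jackson's actual buffer sizes may differ.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration; BufferedOutputStream stands in for a buffering JSON generator.
final class BufferedSizeCheckSketch
{
  /** Returns {size observed before close, size observed after close} for a payload of the given length. */
  static int[] sizesBeforeAndAfterClose(int payloadLength)
  {
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    final BufferedOutputStream buffered = new BufferedOutputStream(bytes, 4096);
    try {
      buffered.write(new byte[payloadLength], 0, payloadLength);
      // Payloads smaller than the buffer have not reached `bytes` yet, so this can undercount.
      final int before = bytes.size();
      buffered.close(); // flushes whatever is still buffered
      return new int[]{before, bytes.size()};
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args)
  {
    final long maxEntrySize = 50;
    final int[] sizes = sizesBeforeAndAfterClose(100);
    System.out.println("too big before close: " + (sizes[0] > maxEntrySize));
    System.out.println("too big after close:  " + (sizes[1] > maxEntrySize));
  }
}
```

The mid-stream check is therefore a cheap early exit that may lag by up to one buffer, and the check after `close()` is the one that sees the exact size.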