diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md
index 28d434614c39..073b512157fd 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -1176,6 +1176,7 @@ You can optionally configure caching to be enabled on the peons by setting cachi
 |`druid.realtime.cache.useCache`|true, false|Enable the cache on the realtime.|false|
 |`druid.realtime.cache.populateCache`|true, false|Populate the cache on the realtime.|false|
 |`druid.realtime.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`|
+|`druid.realtime.cache.maxEntrySize`|positive integer (0 disables the limit)|Maximum cache entry size in bytes.|1_000_000|
 
 See [cache configuration](#cache-configuration) for how to configure cache settings.
 
@@ -1319,6 +1320,7 @@ You can optionally only configure caching to be enabled on the Historical by set
 |`druid.historical.cache.useCache`|true, false|Enable the cache on the Historical.|false|
 |`druid.historical.cache.populateCache`|true, false|Populate the cache on the Historical.|false|
 |`druid.historical.cache.unCacheable`|All druid query types|All query types to not cache.|["groupBy", "select"]|
+|`druid.historical.cache.maxEntrySize`|positive integer (0 disables the limit)|Maximum cache entry size in bytes.|1_000_000|
 
 See [cache configuration](#cache-configuration) for how to configure cache settings.
 
@@ -1452,6 +1454,7 @@ You can optionally only configure caching to be enabled on the Broker by setting
 |`druid.broker.cache.resultLevelCacheLimit`|positive integer|Maximum size of query response that can be cached.|`Integer.MAX_VALUE`|
 |`druid.broker.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`|
 |`druid.broker.cache.cacheBulkMergeLimit`|positive integer or 0|Queries with more segments than this number will not attempt to fetch from cache at the broker level, leaving potential caching fetches (and cache result merging) to the Historicals|`Integer.MAX_VALUE`|
+|`druid.broker.cache.maxEntrySize`|positive integer (0 disables the limit)|Maximum cache entry size in bytes.|1_000_000|
 
 See [cache configuration](#cache-configuration) for how to configure cache settings.
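Taken together with the populator changes below, the new `maxEntrySize` rows cap how large a single serialized cache entry may grow. A minimal sketch of the semantics, grounded only in the `maxEntrySize > 0 && bytes.size() > maxEntrySize` guard visible later in this patch; the class and method names here are illustrative, not Druid code:

```java
/**
 * Illustrates the documented maxEntrySize behavior: a cap of 0 (or less)
 * disables the limit; otherwise an entry whose serialized size exceeds the
 * cap is skipped and counted as "oversized" instead of being cached.
 */
class MaxEntrySizeGuard
{
  private final long maxEntrySize; // e.g. the documented default, 1_000_000

  MaxEntrySizeGuard(long maxEntrySize)
  {
    this.maxEntrySize = maxEntrySize;
  }

  boolean isCacheable(long serializedSizeBytes)
  {
    // Same predicate the patch uses: "too big" iff the cap is enabled and exceeded.
    return !(maxEntrySize > 0 && serializedSizeBytes > maxEntrySize);
  }
}
```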
diff --git a/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java
index fc4e5a69a18f..0832fb13ccae 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryRunnerFactory.java
@@ -25,42 +25,44 @@
 import java.util.concurrent.ExecutorService;
 
 /**
- * An interface that defines the nitty gritty implementation detauls of a Query on a Segment
+ * An interface that defines the nitty gritty implementation details of a Query on a Segment
  */
 @ExtensionPoint
 public interface QueryRunnerFactory<T, QueryType extends Query<T>>
 {
   /**
-   * Given a specific segment, this method will create a QueryRunner.
+   * Given a specific segment, this method will create a {@link QueryRunner}.
    *
-   * The QueryRunner, when asked, will generate a Sequence of results based on the given segment. This
-   * is the meat of the query processing and is where the results are actually generated. Everything else
-   * is just merging and reduction logic.
+   * The {@link QueryRunner}, when asked, will generate a {@link org.apache.druid.java.util.common.guava.Sequence} of
+   * results based on the given segment. This is the meat of the {@link Query} processing and is where the results are
+   * actually generated. Everything else is just merging and reduction logic.
    *
-   * @param segment The segment to process
-   * @return A QueryRunner that, when asked, will generate a Sequence of results based on the given segment
+   * @param segment The segment to process
+   * @return A {@link QueryRunner} that, when asked, will generate a
+   *         {@link org.apache.druid.java.util.common.guava.Sequence} of results based on the given segment
    */
   QueryRunner<T> createRunner(Segment segment);
 
   /**
    * Runners generated with createRunner() and combined into an Iterable in (time,shardId) order are passed
-   * along to this method with an ExecutorService. The method should then return a QueryRunner that, when
-   * asked, will use the ExecutorService to run the base QueryRunners in some fashion.
+   * along to this method with an {@link ExecutorService}. The method should then return a {@link QueryRunner} that,
+   * when asked, will use the {@link ExecutorService} to run the base QueryRunners in some fashion.
    *
-   * The vast majority of the time, this should be implemented with
+   * The vast majority of the time, this should be implemented with {@link ChainedExecutionQueryRunner}:
    *
    *     return new ChainedExecutionQueryRunner<>(queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners);
    *
-   * Which will allow for parallel execution up to the maximum number of processing threads allowed.
+   * which will allow for parallel execution up to the maximum number of processing threads allowed.
    *
-   * @param queryExecutor ExecutorService to be used for parallel processing
-   * @param queryRunners Individual QueryRunner objects that produce some results
-   * @return a QueryRunner that, when asked, will use the ExecutorService to run the base QueryRunners
+   * @param queryExecutor {@link ExecutorService} to be used for parallel processing
+   * @param queryRunners  Individual {@link QueryRunner} objects that produce some results
+   * @return a {@link QueryRunner} that, when asked, will use the {@link ExecutorService} to run the base
+   *         {@link QueryRunner} collection.
    */
   QueryRunner<T> mergeRunners(ExecutorService queryExecutor, Iterable<QueryRunner<T>> queryRunners);
 
   /**
-   * Provides access to the toolchest for this specific query type.
+   * Provides access to the {@link QueryToolChest} for this specific {@link Query} type.
    *
    * @return an instance of the toolchest for this specific query type.
    */
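For illustration, a skeletal factory following the javadoc above; `MyQuery`, `MyResult`, `MyToolChest`, and `MySegmentRunner` are hypothetical placeholders, and the merge call simply repeats the javadoc's own `ChainedExecutionQueryRunner` example rather than asserting the current constructor signature:

```java
// Sketch of a QueryRunnerFactory for a hypothetical query type; not Druid code.
public class MyQueryRunnerFactory implements QueryRunnerFactory<MyResult, MyQuery>
{
  private final MyToolChest toolChest;
  private final QueryWatcher queryWatcher;

  public MyQueryRunnerFactory(MyToolChest toolChest, QueryWatcher queryWatcher)
  {
    this.toolChest = toolChest;
    this.queryWatcher = queryWatcher;
  }

  @Override
  public QueryRunner<MyResult> createRunner(Segment segment)
  {
    // Per-segment runner: produces the Sequence of results for this one segment.
    return new MySegmentRunner(segment);
  }

  @Override
  public QueryRunner<MyResult> mergeRunners(ExecutorService queryExecutor, Iterable<QueryRunner<MyResult>> queryRunners)
  {
    // The parallel merge recommended by the javadoc above.
    return new ChainedExecutionQueryRunner<>(queryExecutor, toolChest.getOrdering(), queryWatcher, queryRunners);
  }

  @Override
  public QueryToolChest<MyResult, MyQuery> getToolchest()
  {
    return toolChest;
  }
}
```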
diff --git a/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java b/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java
index 0e78e994fd04..3b7f30dac676 100644
--- a/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java
+++ b/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java
@@ -38,6 +38,10 @@
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 
+/**
+ * {@link CachePopulator} implementation that uses an {@link ExecutorService} thread pool to populate a cache in the
+ * background. Used if the config "druid.*.cache.numBackgroundThreads" is greater than 0.
+ */
 public class BackgroundCachePopulator implements CachePopulator
 {
   private static final Logger log = new Logger(BackgroundCachePopulator.class);
diff --git a/server/src/main/java/org/apache/druid/client/cache/CachePopulator.java b/server/src/main/java/org/apache/druid/client/cache/CachePopulator.java
index 0d67cb9a0eca..dbbade1cf6a0 100644
--- a/server/src/main/java/org/apache/druid/client/cache/CachePopulator.java
+++ b/server/src/main/java/org/apache/druid/client/cache/CachePopulator.java
@@ -23,6 +23,25 @@
 import java.util.function.Function;
 
+/**
+ * Abstraction of the mechanism for populating a {@link Cache} by wrapping a {@link Sequence} and providing a
+ * function to extract cacheable values from it. At runtime, the {@link CachePopulator} implementation is used as a
+ * singleton and injected where needed, shared between all cacheables, which requires the {@link Cache} itself to be
+ * thread-safe.
+ *
+ * Consumers of the {@link Sequence} will either be a processing thread (in the case of a Historical or a realtime
+ * task) or an HTTP thread (in the case of a Broker). See the following for additional details:
+ *
+ * Historicals: {@link org.apache.druid.server.coordination.ServerManager} and
+ * {@link org.apache.druid.client.CachingQueryRunner}
+ *
+ * realtime tasks: {@link org.apache.druid.segment.realtime.appenderator.SinkQuerySegmentWalker} and
+ * {@link org.apache.druid.client.CachingQueryRunner}
+ *
+ * Brokers: {@link org.apache.druid.server.ClientQuerySegmentWalker} and
+ * {@link org.apache.druid.client.CachingClusteredClient}
+ */
 public interface CachePopulator
 {
   <T, CacheType> Sequence<T> wrap(
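A hypothetical caller-side sketch of the `wrap()` contract described above; `queryRunner`, `queryPlus`, `responseContext`, `toCacheValue`, `cache`, and `cacheKey` are stand-ins for what a caller such as `CachingQueryRunner` would supply:

```java
// The populator hands back a Sequence that looks identical to the input; as the
// consumer drains it, each element is also passed through cacheFn and serialized
// into a pending cache entry.
Sequence<Result> results = queryRunner.run(queryPlus, responseContext);
Sequence<Result> wrapped = cachePopulator.wrap(
    results,
    result -> toCacheValue(result), // function extracting the cacheable value
    cache,                          // the shared, thread-safe Cache
    cacheKey                        // Cache.NamedKey identifying this segment and query
);
// The cache entry is committed only once the wrapped sequence is fully consumed.
```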
diff --git a/server/src/main/java/org/apache/druid/client/cache/CachePopulatorStats.java b/server/src/main/java/org/apache/druid/client/cache/CachePopulatorStats.java
index cfe4587fef7d..e3e768b1c42b 100644
--- a/server/src/main/java/org/apache/druid/client/cache/CachePopulatorStats.java
+++ b/server/src/main/java/org/apache/druid/client/cache/CachePopulatorStats.java
@@ -22,6 +22,11 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
+ * Thread-safe collector of {@link CachePopulator} statistics, used by {@link CacheMonitor} to emit cache metrics.
+ * Like the {@link CachePopulator}, this is used as a singleton.
+ *
+ * See {@link org.apache.druid.guice.DruidProcessingModule#getCachePopulator}, which supplies either a
+ * {@link ForegroundCachePopulator} or a {@link BackgroundCachePopulator} as configured, for more details.
  */
 public class CachePopulatorStats
 {
diff --git a/server/src/main/java/org/apache/druid/client/cache/ForegroundCachePopulator.java b/server/src/main/java/org/apache/druid/client/cache/ForegroundCachePopulator.java
index 5ace591c099c..0bdeb3a49125 100644
--- a/server/src/main/java/org/apache/druid/client/cache/ForegroundCachePopulator.java
+++ b/server/src/main/java/org/apache/druid/client/cache/ForegroundCachePopulator.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.SequenceWrapper;
 import org.apache.druid.java.util.common.guava.Sequences;
@@ -29,14 +30,16 @@
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
+/**
+ * {@link CachePopulator} implementation that populates a cache on the same thread that is processing the
+ * {@link Sequence}. Used if the config "druid.*.cache.numBackgroundThreads" is 0 (the default).
+ */
 public class ForegroundCachePopulator implements CachePopulator
 {
   private static final Logger log = new Logger(ForegroundCachePopulator.class);
 
-  private final Object lock = new Object();
   private final ObjectMapper objectMapper;
   private final CachePopulatorStats cachePopulatorStats;
   private final long maxEntrySize;
@@ -61,7 +64,7 @@ public <T, CacheType> Sequence<T> wrap(
   )
   {
     final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-    final AtomicBoolean tooBig = new AtomicBoolean(false);
+    final MutableBoolean tooBig = new MutableBoolean(false);
     final JsonGenerator jsonGenerator;
 
     try {
@@ -75,21 +78,19 @@
         Sequences.map(
             sequence,
             input -> {
-              if (!tooBig.get()) {
-                synchronized (lock) {
-                  try {
-                    jsonGenerator.writeObject(cacheFn.apply(input));
-
-                    // Not flushing jsonGenerator before checking this, but should be ok since Jackson buffers are
-                    // typically just a few KB, and we don't want to waste cycles flushing.
-                    if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
-                      tooBig.set(true);
-                    }
-                  }
-                  catch (IOException e) {
-                    throw new RuntimeException(e);
-                  }
-                }
-              }
+              if (!tooBig.isTrue()) {
+                try {
+                  jsonGenerator.writeObject(cacheFn.apply(input));
+
+                  // Not flushing jsonGenerator before checking this, but should be ok since Jackson buffers are
+                  // typically just a few KB, and we don't want to waste cycles flushing.
+                  if (maxEntrySize > 0 && bytes.size() > maxEntrySize) {
+                    tooBig.setValue(true);
+                  }
+                }
+                catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              }
 
               return input;
@@ -100,24 +101,22 @@
           @Override
           public void after(final boolean isDone, final Throwable thrown) throws Exception
           {
-            synchronized (lock) {
-              jsonGenerator.close();
-
-              if (isDone) {
-                // Check tooBig, then check maxEntrySize one more time, after closing/flushing jsonGenerator.
-                if (tooBig.get() || (maxEntrySize > 0 && bytes.size() > maxEntrySize)) {
-                  cachePopulatorStats.incrementOversized();
-                  return;
-                }
-
-                try {
-                  cache.put(cacheKey, bytes.toByteArray());
-                  cachePopulatorStats.incrementOk();
-                }
-                catch (Exception e) {
-                  log.warn(e, "Unable to write to cache");
-                  cachePopulatorStats.incrementError();
-                }
-              }
-            }
+            jsonGenerator.close();
+
+            if (isDone) {
+              // Check tooBig, then check maxEntrySize one more time, after closing/flushing jsonGenerator.
+              if (tooBig.isTrue() || (maxEntrySize > 0 && bytes.size() > maxEntrySize)) {
+                cachePopulatorStats.incrementOversized();
+                return;
+              }
+
+              try {
+                cache.put(cacheKey, bytes.toByteArray());
+                cachePopulatorStats.incrementOk();
+              }
+              catch (Exception e) {
+                log.warn(e, "Unable to write to cache");
+                cachePopulatorStats.incrementError();
+              }
+            }
           }
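Finally, a hypothetical end-to-end sketch of foreground population tying the pieces above together; `objectMapper`, `cachePopulatorStats`, `results`, `toCacheValue`, `cache`, and `key` are stand-ins, and the constructor argument order is assumed from the fields shown in this patch:

```java
// Because the entry is built on the single thread draining the Sequence, the patch
// can drop the lock and swap AtomicBoolean for the cheaper MutableBoolean.
CachePopulator populator = new ForegroundCachePopulator(
    objectMapper,        // Jackson mapper used to serialize cache values
    cachePopulatorStats, // singleton stats collector (ok / error / oversized counts)
    1_000_000            // maxEntrySize: entries serializing larger than this are dropped
);

Sequence<Result> wrapped = populator.wrap(results, this::toCacheValue, cache, key);

// Draining the sequence yields results to the caller and, as a side effect, writes
// the cache entry (or bumps the "oversized" counter if it exceeded maxEntrySize).
List<Result> materialized = Sequences.toList(wrapped, new ArrayList<>());
```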