Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/content/configuration/caching.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ layout: doc_page

# Caching

Caching can optionally be enabled on the broker and / or historical nodes.
See the [broker](broker.html#caching) and [historical](historical.html#caching)
Caching can optionally be enabled on the broker, historical, and realtime
nodes, as well as realtime index tasks. See [broker](broker.html#caching),
[historical](historical.html#caching), and [realtime](realtime.html#caching)
configuration options for how to enable it for individual node types.

Druid uses a local in-memory cache by default, unless a different type of cache is specified.
Expand Down
8 changes: 4 additions & 4 deletions docs/content/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,14 @@ This deep storage is used to interface with Cassandra.

### Caching

You can enable caching of results at the broker/historical using following configurations.
You can enable caching of results at the broker, historical, or realtime level using the following configurations.

|Property|Description|Default|
|--------|-----------|-------|
|`druid.cache.type`|`local`, `memcached`|The type of cache to use for queries.|`local`|
|`druid.(broker/historical).cache.unCacheable`|All druid query types|All query types to not cache.|["groupBy", "select"]|
|`druid.(broker/historical).cache.useCache`|Whether to use cache for getting query results.|false|
|`druid.(broker/historical).cache.populateCache`|Whether to populate cache.|false|
|`druid.(broker\|historical\|realtime).cache.unCacheable`|All druid query types|All query types to not cache.|["groupBy", "select"]|
|`druid.(broker\|historical\|realtime).cache.useCache`|Whether to use cache for getting query results.|false|
|`druid.(broker\|historical\|realtime).cache.populateCache`|Whether to populate cache.|false|

#### Local Cache

Expand Down
9 changes: 9 additions & 0 deletions docs/content/configuration/realtime.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,14 @@ The realtime node uses several of the global configs in [Configuration](../confi
|--------|-----------|-------|
|`druid.query.search.maxSearchLimit`|Maximum number of search results to return.|1000|

### Caching

You can optionally configure caching to be enabled on the realtime node by setting caching configs here.

|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.realtime.cache.useCache`|true, false|Enable the cache on the realtime node.|false|
|`druid.realtime.cache.populateCache`|true, false|Populate the cache on the realtime node.|false|
|`druid.realtime.cache.unCacheable`|All druid query types|All query types to not cache.|`["groupBy", "select"]`|

See [cache configuration](caching.html) for how to configure cache settings.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.FilteredServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
Expand Down Expand Up @@ -75,6 +77,9 @@ public class TaskToolbox
private final File taskWorkDir;
private final IndexMerger indexMerger;
private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;


public TaskToolbox(
TaskConfig config,
Expand All @@ -94,7 +99,9 @@ public TaskToolbox(
ObjectMapper objectMapper,
File taskWorkDir,
IndexMerger indexMerger,
IndexIO indexIO
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig
)
{
this.config = config;
Expand All @@ -115,6 +122,8 @@ public TaskToolbox(
this.taskWorkDir = taskWorkDir;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
}

public TaskConfig getConfig()
Expand Down Expand Up @@ -227,4 +236,14 @@ public IndexMerger getIndexMerger()
{
return indexMerger;
}

/**
 * Returns the query-result {@link Cache} handed to this toolbox for realtime index tasks.
 * NOTE(review): may be null — the constructor stores it without a null check
 * (unlike indexMerger/indexIO), and some callers pass null when caching is unused.
 */
public Cache getCache()
{
return cache;
}

/**
 * Returns the {@link CacheConfig} governing cache use/population for this toolbox.
 * NOTE(review): may be null — stored without a null check, and some callers pass null.
 */
public CacheConfig getCacheConfig()
{
return cacheConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.FilteredServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.guice.annotations.Processing;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
Expand Down Expand Up @@ -60,6 +62,8 @@ public class TaskToolboxFactory
private final ObjectMapper objectMapper;
private final IndexMerger indexMerger;
private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;

@Inject
public TaskToolboxFactory(
Expand All @@ -78,7 +82,9 @@ public TaskToolboxFactory(
SegmentLoaderFactory segmentLoaderFactory,
ObjectMapper objectMapper,
IndexMerger indexMerger,
IndexIO indexIO
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig
)
{
this.config = config;
Expand All @@ -97,6 +103,8 @@ public TaskToolboxFactory(
this.objectMapper = objectMapper;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
}

public TaskToolbox build(Task task)
Expand All @@ -121,7 +129,9 @@ public TaskToolbox build(Task task)
objectMapper,
taskWorkDir,
indexMerger,
indexIO
indexIO,
cache,
cacheConfig
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,10 @@ public String getVersion(final Interval interval)
toolbox.getNewSegmentServerView(),
toolbox.getQueryExecutorService(),
toolbox.getIndexMerger(),
toolbox.getIndexIO()
toolbox.getIndexIO(),
toolbox.getCache(),
toolbox.getCacheConfig(),
toolbox.getObjectMapper()
);

this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, fireDepartment.getMetrics());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.FilteredServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
Expand Down Expand Up @@ -74,6 +76,8 @@ public class TaskToolboxTest
private Task task = EasyMock.createMock(Task.class);
private IndexMerger mockIndexMerger = EasyMock.createMock(IndexMerger.class);
private IndexIO mockIndexIO = EasyMock.createMock(IndexIO.class);
private Cache mockCache = EasyMock.createMock(Cache.class);
private CacheConfig mockCacheConfig = EasyMock.createMock(CacheConfig.class);

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
Expand All @@ -100,7 +104,9 @@ public void setUp() throws IOException
new SegmentLoaderFactory(mockSegmentLoaderLocalCacheManager),
ObjectMapper,
mockIndexMerger,
mockIndexIO
mockIndexIO,
mockCache,
mockCacheConfig
);
}

Expand Down Expand Up @@ -180,4 +186,16 @@ public void testGetDataSegmentMover()
{
Assert.assertEquals(mockDataSegmentMover, taskToolbox.build(task).getDataSegmentMover());
}

// Verifies the Cache injected into TaskToolboxFactory is propagated unchanged
// to the TaskToolbox it builds.
@Test
public void testGetCache() throws Exception
{
Assert.assertEquals(mockCache, taskToolbox.build(task).getCache());
}

// Verifies the CacheConfig injected into TaskToolboxFactory is propagated
// unchanged to the TaskToolbox it builds.
@Test
public void testGetCacheConfig() throws Exception
{
Assert.assertEquals(mockCacheConfig, taskToolbox.build(task).getCacheConfig());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public DataSegment push(File file, DataSegment segment) throws IOException
return segment;
}
}, null, null, null, null, null, null, null, null, null, null, temporaryFolder.newFolder(),
indexMerger, indexIO
indexMerger, indexIO, null, null
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,9 @@ public List<StorageLocationConfig> getLocations()
),
MAPPER,
INDEX_MERGER,
INDEX_IO
INDEX_IO,
null,
null
);
Collection<Object[]> values = new LinkedList<>();
for (InputRowParser parser : Arrays.<InputRowParser>asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,9 @@ public List<StorageLocationConfig> getLocations()
),
MAPPER,
INDEX_MERGER,
INDEX_IO
INDEX_IO,
null,
null
);
final Injector injector = Guice.createInjector(
new Module()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.FilteredServerView;
import io.druid.client.ServerView;
import io.druid.client.cache.MapCache;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
Expand All @@ -67,6 +68,7 @@
import io.druid.indexing.common.task.IndexTask;
import io.druid.indexing.common.task.KillTask;
import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.RealtimeIndexTaskTest;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.overlord.config.TaskQueueConfig;
Expand Down Expand Up @@ -94,6 +96,7 @@
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentTest;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
Expand Down Expand Up @@ -484,7 +487,9 @@ public List<StorageLocationConfig> getLocations()
),
MAPPER,
INDEX_MERGER,
INDEX_IO
INDEX_IO,
MapCache.create(0),
FireDepartmentTest.NO_CACHE_CONFIG
);
tr = new ThreadPoolTaskRunner(tb, null);
tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ public List<StorageLocationConfig> getLocations()
),
jsonMapper,
indexMerger,
indexIO
indexIO,
null,
null
),
null
),
Expand Down
13 changes: 9 additions & 4 deletions processing/src/test/java/io/druid/segment/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ public class TestHelper
private static final IndexMerger INDEX_MERGER;
private static final IndexMaker INDEX_MAKER;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();

static {
ObjectMapper jsonMapper = new DefaultObjectMapper();
INDEX_IO = new IndexIO(
jsonMapper,
JSON_MAPPER,
new ColumnConfig()
{
@Override
Expand All @@ -50,10 +50,11 @@ public int columnCacheSizeBytes()
}
}
);
INDEX_MERGER = new IndexMerger(jsonMapper, INDEX_IO);
INDEX_MAKER = new IndexMaker(jsonMapper, INDEX_IO);
INDEX_MERGER = new IndexMerger(JSON_MAPPER, INDEX_IO);
INDEX_MAKER = new IndexMaker(JSON_MAPPER, INDEX_IO);
}


public static IndexMerger getTestIndexMerger()
{
return INDEX_MERGER;
Expand All @@ -69,6 +70,10 @@ public static IndexIO getTestIndexIO()
return INDEX_IO;
}

/**
 * Returns the shared {@code JSON_MAPPER} used to build the test IndexIO,
 * IndexMerger, and IndexMaker instances, so tests serialize with the exact
 * same mapper configuration as those components.
 */
public static ObjectMapper getObjectMapper()
{
  return JSON_MAPPER;
}

public static <T> void assertExpectedResults(Iterable<Result<T>> expectedResults, Sequence<Result<T>> results)
{
assertResults(expectedResults, Sequences.toList(results, Lists.<Result<T>>newArrayList()), "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

package io.druid.segment.realtime.plumber;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.concurrent.Execs;
import io.druid.query.QueryRunnerFactoryConglomerate;
Expand Down Expand Up @@ -63,7 +66,11 @@ public FlushingPlumber(
DataSegmentAnnouncer segmentAnnouncer,
ExecutorService queryExecutorService,
IndexMerger indexMerger,
IndexIO indexIO
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig,
ObjectMapper objectMapper

)
{
super(
Expand All @@ -78,7 +85,10 @@ public FlushingPlumber(
null,
null,
indexMerger,
indexIO
indexIO,
cache,
cacheConfig,
objectMapper
);

this.flushDuration = flushDuration;
Expand Down
Loading