Please note that this is an experimental module and the development/testing still at early stage. Feel free to try it and give us your feedback.
+
+## Description
+This module provides a per-lookup caching mechanism for JDBC data sources.
+The main goal of this cache is to speed up the access to a high latency lookup sources and to provide a caching isolation for every lookup source.
+Thus user can define various caching strategies or and implementation per lookup, even if the source is the same.
+This module can be used side to side with other lookup module like the global cached lookup module.
+
+To use this extension please make sure to [include](../../operations/including-extensions.html) `druid-lookups-cached-single` as an extension.
+
+## Architecture
+Generally speaking this module can be divided into two main component, namely, the data fetcher layer and caching layer.
+
+### Data Fetcher layer
+
+First part is the data fetcher layer API `DataFetcher`, that exposes a set of fetch methods to fetch data from the actual Lookup dimension source.
+For instance `JdbcDataFetcher` provides an implementation of `DataFetcher` that can be used to fetch key/value from a RDBMS via JDBC driver.
+If you need new type of data fetcher, all you need to do, is to implement the interface `DataFetcher` and load it via another druid module.
+### Caching layer
+
+This extension comes with two different caching strategies. First strategy is a poll based and the second is a load based.
+#### Poll lookup cache
+
+The poll strategy cache strategy will fetch and swap all the pair of key/values periodically from the lookup source.
+Hence, user should make sure that the cache can fit all the data.
+The current implementation provides 2 type of poll cache, the first is onheap (uses immutable map), while the second uses MapBD based offheap map.
+User can also implement a different lookup polling cache by implementing `PollingCacheFactory` and `PollingCache` interfaces.
+
+#### Loading lookup
+Loading cache strategy will load the key\value pair upon request on the key it self, the general algorithm is load key if absent.
+Once the key/value pair is loaded eviction will occur according to the cache eviction policy.
+This module comes with two loading lookup implementation, the first is onheap backed by a Guava cache implementation, the second is MapDB offheap implementation.
+Both implementations offer various eviction strategies.
+Same for Loading cache, developer can implement a new type of loading cache by implementing `LookupLoadingCache` interface.
+
+## Configuration and Operation:
+
+
+### Polling Lookup
+
+**Note that the current implementation of `offHeapPolling` and `onHeapPolling` will create two caches one to lookup value based on key and the other to reverse lookup the key from value**
+
+|Field|Type|Description|Required|default|
+|-----|----|-----------|--------|-------|
+|dataFetcher|Json object|Specifies the lookup data fetcher type to use in order to fetch data|yes|null|
+|cacheFactory|Json Object|Cache factory implementation|no |onHeapPolling|
+|pollPeriod|Period|polling period |no |null (poll once)|
+
+
+##### Example of Polling On-heap Lookup
+This example demonstrates a polling cache that will update its on-heap cache every 10 minutes
+```json
+{
+ "type":"pollingLookup",
+ "pollPeriod":"PT10M",
+ "dataFetcher":{ "type":"jdbcDataFetcher", "connectorConfig":"jdbc://mysql://localhost:3306/my_data_base", "table":"lookup_table_name", "keyColumn":"key_column_name", "valueColumn": "value_column_name"},
+ "cacheFactory":{"type":"onHeapPolling"}
+}
+
+```
+
+##### Example Polling Off-heap Lookup
+This example demonstrates an off-heap lookup that will be cached once and never swapped `(pollPeriod == null)`
+
+```json
+{
+ "type":"pollingLookup",
+ "dataFetcher":{ "type":"jdbcDataFetcher", "connectorConfig":"jdbc://mysql://localhost:3306/my_data_base", "table":"lookup_table_name", "keyColumn":"key_column_name", "valueColumn": "value_column_name"},
+ "cacheFactory":{"type":"offHeapPolling"}
+}
+
+```
+
+
+### Loading lookup
+
+|Field|Type|Description|Required|default|
+|-----|----|-----------|--------|-------|
+|dataFetcher|Json object|Specifies the lookup data fetcher type to use in order to fetch data|yes|null|
+|loadingCacheSpec|Json Object|Lookup cache spec implementation|yes |null|
+|reverseLoadingCacheSpec|Json Object| Reverse lookup cache implementation|yes |null|
+
+
+##### Example Loading On-heap Guava
+
+Guava cache configuration spec.
+
+|Field|Type|Description|Required|default|
+|-----|----|-----------|--------|-------|
+|concurrencyLevel|int|Allowed concurrency among update operations|no|4|
+|initialCapacity|int|Initial capacity size|no |null|
+|maximumSize|long| Specifies the maximum number of entries the cache may contain.|no |null (infinite capacity)|
+|expireAfterAccess|long| Specifies the eviction time after last read in milliseconds.|no |null (No read-time-based eviction when set to null)|
+|expireAfterWrite|long| Specifies the eviction time after last write in milliseconds.|no |null (No write-time-based eviction when set to null)|
+
+```json
+{
+ "type":"loadingLookup",
+ "dataFetcher":{ "type":"jdbcDataFetcher", "connectorConfig":"jdbc://mysql://localhost:3306/my_data_base", "table":"lookup_table_name", "keyColumn":"key_column_name", "valueColumn": "value_column_name"},
+ "loadingCacheSpec":{"type":"guava"},
+ "reverseLoadingCacheSpec":{"type":"guava", "maximumSize":500000, "expireAfterAccess":100000, "expireAfterAccess":10000}
+}
+```
+
+##### Example Loading Off-heap MapDB
+
+Off heap cache is backed by [MapDB](http://www.mapdb.org/) implementation. MapDB is using direct memory as memory pool, please take that into account when limiting the JVM direct memory setup.
+
+|Field|Type|Description|Required|default|
+|-----|----|-----------|--------|-------|
+|maxStoreSize|double|maximal size of store in GB, if store is larger entries will start expiring|no |0|
+|maxEntriesSize|long| Specifies the maximum number of entries the cache may contain.|no |0 (infinite capacity)|
+|expireAfterAccess|long| Specifies the eviction time after last read in milliseconds.|no |0 (No read-time-based eviction when set to null)|
+|expireAfterWrite|long| Specifies the eviction time after last write in milliseconds.|no |0 (No write-time-based eviction when set to null)|
+
+
+```json
+{
+ "type":"loadingLookup",
+ "dataFetcher":{ "type":"jdbcDataFetcher", "connectorConfig":"jdbc://mysql://localhost:3306/my_data_base", "table":"lookup_table_name", "keyColumn":"key_column_name", "valueColumn": "value_column_name"},
+ "loadingCacheSpec":{"type":"mapDb", "maxEntriesSize":100000},
+ "reverseLoadingCacheSpec":{"type":"mapDb", "maxStoreSize":5, "expireAfterAccess":100000, "expireAfterAccess":10000}
+}
+```
diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md
index 97ce5429c469..91ac0add648a 100644
--- a/docs/content/development/extensions.md
+++ b/docs/content/development/extensions.md
@@ -29,6 +29,7 @@ Core extensions are maintained by Druid committers.
|druid-kafka-eight|Kafka ingest firehose (high level consumer).|[link](../development/extensions-core/kafka-eight-firehose.html)|
|druid-kafka-extraction-namespace|Kafka-based namespaced lookup. Requires namespace lookup extension.|[link](../development/extensions-core/kafka-extraction-namespace.html)|
|druid-lookups-cached-global|A module for [lookups](../querying/lookups.html) providing a jvm-global eager caching for lookups. It provides JDBC and URI implementations for fetching lookup data.|[link](../development/extensions-core/lookups-cached-global.html)|
+|druid-lookups-cached-single| Per lookup caching module to support the use cases where a lookup need to be isolated from the global pool of lookups |[link](../development/extensions-core/druid-lookups.html)|
|druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/s3.html)|
|druid-stats|Statistics related module including variance and standard deviation.|[link](../development/extensions-core/stats.html)|
|mysql-metadata-storage|MySQL metadata store.|[link](../development/extensions-core/mysql.html)|
diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/JDBCExtractionNamespaceCacheFactory.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/JDBCExtractionNamespaceCacheFactory.java
index 7a7ad0abba82..58887aa0902b 100644
--- a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/JDBCExtractionNamespaceCacheFactory.java
+++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/JDBCExtractionNamespaceCacheFactory.java
@@ -169,6 +169,6 @@ public Timestamp withHandle(Handle handle) throws Exception
}
);
return update.getTime();
- }
+ }
}
diff --git a/extensions-core/lookups-cached-single/pom.xml b/extensions-core/lookups-cached-single/pom.xml
new file mode 100644
index 000000000000..465ba4823687
--- /dev/null
+++ b/extensions-core/lookups-cached-single/pom.xml
@@ -0,0 +1,96 @@
+
+
+
+
+ 4.0.0
+ io.druid.extensions
+ druid-lookups-cached-single
+ druid-lookups-cached-single
+ Extension to rename Druid dimension values using lookups
+
+
+ io.druid
+ druid
+ 0.9.2-SNAPSHOT
+ ../../pom.xml
+
+
+
+
+ io.druid
+ druid-api
+ ${project.parent.version}
+ provided
+
+
+ io.druid
+ druid-processing
+ ${project.parent.version}
+ provided
+
+
+ io.druid
+ druid-server
+ ${project.parent.version}
+ provided
+
+
+ org.mapdb
+ mapdb
+
+
+ io.dropwizard
+ dropwizard-jdbi
+ 0.9.2
+
+
+ org.antlr
+ stringtemplate
+ 3.2
+
+
+
+ junit
+ junit
+ test
+
+
+ io.druid
+ druid-server
+ ${project.parent.version}
+ test-jar
+ test
+
+
+ org.easymock
+ easymock
+ test
+
+
+ io.druid
+ druid-processing
+ ${project.parent.version}
+ test-jar
+ test
+
+
+
+
diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/DataFetcher.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/DataFetcher.java
new file mode 100644
index 000000000000..001d36d73cfb
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/DataFetcher.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package io.druid.server.lookup;
+
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.druid.server.lookup.jdbc.JdbcDataFetcher;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class will be used to pull data from a given lookup table.
+ *
+ * @param Keys type
+ * @param Values type
+ */
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "jdbcDataFetcher", value = JdbcDataFetcher.class)
+})
+public interface DataFetcher
+{
+ /**
+ * Function used to fetch all the pair of key-values from the lookup
+ * One use case of this method is to populate a polling cache {@link io.druid.server.lookup.cache.polling.PollingCache}
+ *
+ * @return Returns an {@link Iterable} of key-value pairs.
+ */
+ Iterable> fetchAll();
+
+ /**
+ * Function to perform a one item lookup.
+ * For instance this can be used by a loading cache to fetch the value in case of the cache doesn't have it
+ *
+ * @param key non-null key used to lookup the value
+ *
+ * @return Returns null if the key doesn't have an associated value, or the value if it exists
+ */
+ @Nullable V fetch(K key);
+
+ /**
+ * Function used to perform a bulk lookup
+ *
+ * @param keys used to lookup the respective values
+ *
+ * @return Returns a {@link Iterable} containing the pair of existing key-value.
+ */
+ Iterable> fetch(Iterable keys);
+
+ /**
+ * Function used to perform reverse lookup
+ *
+ * @param value use to fetch it's keys from the lookup table
+ *
+ * @return Returns a list of keys of the given {@code value} or an empty list {@link java.util.Collections.EmptyList}
+ */
+ List reverseFetchKeys(V value);
+
+}
diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LoadingLookup.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LoadingLookup.java
new file mode 100644
index 000000000000..4a6b19b27ab1
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LoadingLookup.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.lookup;
+
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.metamx.common.logger.Logger;
+import io.druid.query.lookup.LookupExtractor;
+import io.druid.server.lookup.cache.loading.LoadingCache;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Loading lookup will load the key\value pair upon request on the key it self, the general algorithm is load key if absent.
+ * Once the key/value pair is loaded eviction will occur according to the cache eviction policy.
+ * This module comes with two loading cache implementations, the first {@link io.druid.server.lookup.cache.loading.OnHeapLoadingCache}is onheap backed by a Guava cache implementation, the second {@link io.druid.server.lookup.cache.loading.OffHeapLoadingCache}is MapDB offheap implementation.
+ * Both implementations offer various eviction strategies.
+ */
+public class LoadingLookup extends LookupExtractor
+{
+ private static final Logger LOGGER = new Logger(LoadingLookup.class);
+
+ private final DataFetcher dataFetcher;
+ private final LoadingCache loadingCache;
+ private final LoadingCache> reverseLoadingCache;
+ private final AtomicBoolean isOpen;
+ private final String id = Integer.toHexString(System.identityHashCode(this));
+
+ public LoadingLookup(
+ DataFetcher dataFetcher,
+ LoadingCache loadingCache,
+ LoadingCache> reverseLoadingCache
+ )
+ {
+ this.dataFetcher = Preconditions.checkNotNull(dataFetcher, "lookup must have a DataFetcher");
+ this.loadingCache = Preconditions.checkNotNull(loadingCache, "loading lookup need a cache");
+ this.reverseLoadingCache = Preconditions.checkNotNull(reverseLoadingCache, "loading lookup need reverse cache");
+ this.isOpen = new AtomicBoolean(true);
+ }
+
+
+ @Override
+ public String apply(final String key)
+ {
+ if (key == null) {
+ return null;
+ }
+ final String presentVal;
+ try {
+ presentVal = loadingCache.get(key, new applyCallable(key));
+ return Strings.emptyToNull(presentVal);
+ }
+ catch (ExecutionException e) {
+ LOGGER.debug("value not found for key [%s]", key);
+ return null;
+ }
+ }
+
+ @Override
+ public List unapply(final String value)
+ {
+ // null value maps to empty list
+ if (value == null) {
+ return Collections.EMPTY_LIST;
+ }
+ final List retList;
+ try {
+ retList = reverseLoadingCache.get(value, new unapplyCallable(value));
+ return retList;
+ }
+ catch (ExecutionException e) {
+ LOGGER.debug("list of keys not found for value [%s]", value);
+ return Collections.EMPTY_LIST;
+ }
+ }
+
+ public synchronized void close()
+ {
+ if (isOpen.getAndSet(false)) {
+ LOGGER.info("Closing loading cache [%s]", id);
+ loadingCache.close();
+ reverseLoadingCache.close();
+ } else {
+ LOGGER.info("Closing already closed lookup");
+ return;
+ }
+ }
+
+ public boolean isOpen()
+ {
+ return isOpen.get();
+ }
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return LookupExtractionModule.getRandomCacheKey();
+ }
+
+ private class applyCallable implements Callable
+ {
+ private final String key;
+
+ public applyCallable(String key) {this.key = key;}
+
+ @Override
+ public String call() throws Exception
+ {
+ // avoid returning null and return an empty string to cache it.
+ return Strings.nullToEmpty(dataFetcher.fetch(key));
+ }
+ }
+
+ private class unapplyCallable implements Callable>
+ {
+ private final String value;
+
+ public unapplyCallable(String value) {this.value = value;}
+
+ @Override
+ public List call() throws Exception
+ {
+ return dataFetcher.reverseFetchKeys(value);
+ }
+ }
+}
diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LoadingLookupFactory.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LoadingLookupFactory.java
new file mode 100644
index 000000000000..5a9648789854
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LoadingLookupFactory.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.lookup;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.metamx.common.logger.Logger;
+import io.druid.query.lookup.LookupExtractorFactory;
+import io.druid.query.lookup.LookupIntrospectHandler;
+import io.druid.server.lookup.cache.loading.LoadingCache;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@JsonTypeName("loadingLookup")
+public class LoadingLookupFactory implements LookupExtractorFactory
+{
+ private final static Logger LOGGER = new Logger(LoadingLookupFactory.class);
+
+ @JsonProperty("dataFetcher")
+ private final DataFetcher dataFetcher;
+
+ @JsonProperty("loadingCacheSpec")
+ private final LoadingCache loadingCache;
+
+ @JsonProperty("reverseLoadingCacheSpec")
+ private final LoadingCache> reverseLoadingCache;
+
+ private final String id = Integer.toHexString(System.identityHashCode(this));
+ private final LoadingLookup loadingLookup;
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ public LoadingLookupFactory(
+ @JsonProperty("dataFetcher") DataFetcher dataFetcher,
+ @JsonProperty("loadingCacheSpec") LoadingCache loadingCache,
+ @JsonProperty("reverseLoadingCacheSpec") LoadingCache> reverseLoadingCache
+ )
+ {
+ this(dataFetcher, loadingCache, reverseLoadingCache, new LoadingLookup(dataFetcher,loadingCache,reverseLoadingCache));
+ }
+
+ protected LoadingLookupFactory(
+ DataFetcher dataFetcher,
+ LoadingCache loadingCache,
+ LoadingCache> reverseLoadingCache,
+ LoadingLookup loadingLookup
+ )
+ {
+ this.dataFetcher = Preconditions.checkNotNull(dataFetcher);
+ this.loadingCache = Preconditions.checkNotNull(loadingCache);
+ this.reverseLoadingCache = Preconditions.checkNotNull(reverseLoadingCache);
+ this.loadingLookup = loadingLookup;
+ }
+
+ @Override
+ public synchronized boolean start()
+ {
+ if (!started.get()) {
+ started.set(loadingLookup.isOpen());
+ LOGGER.info("created loading lookup with id [%s]", id);
+ }
+ return started.get();
+ }
+
+ @Override
+ public synchronized boolean close()
+ {
+ if (started.getAndSet(false)) {
+ LOGGER.info("closing loading lookup [%s]", id);
+ loadingLookup.close();
+ }
+ return !started.get();
+ }
+
+ @Override
+ public boolean replaces(
+ @Nullable LookupExtractorFactory lookupExtractorFactory
+ )
+ {
+ if (lookupExtractorFactory == null) return true;
+ return !this.equals(lookupExtractorFactory);
+ }
+
+ @Nullable
+ @Override
+ public LookupIntrospectHandler getIntrospectHandler()
+ {
+ //not supported yet
+ return null;
+ }
+
+ @Override
+ public LoadingLookup get()
+ {
+ return loadingLookup;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof LoadingLookupFactory)) {
+ return false;
+ }
+
+ LoadingLookupFactory that = (LoadingLookupFactory) o;
+
+ if (dataFetcher != null ? !dataFetcher.equals(that.dataFetcher) : that.dataFetcher != null) {
+ return false;
+ }
+ if (loadingCache != null ? !loadingCache.equals(that.loadingCache) : that.loadingCache != null) {
+ return false;
+ }
+ return reverseLoadingCache != null
+ ? reverseLoadingCache.equals(that.reverseLoadingCache)
+ : that.reverseLoadingCache == null;
+
+ }
+}
diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LookupExtractionModule.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LookupExtractionModule.java
new file mode 100644
index 000000000000..afffd8b97900
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LookupExtractionModule.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.lookup;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import com.metamx.common.StringUtils;
+import io.druid.initialization.DruidModule;
+
+import java.util.List;
+import java.util.UUID;
+
+public class LookupExtractionModule implements DruidModule
+{
+ @Override
+ public List extends Module> getJacksonModules()
+ {
+ return ImmutableList.of(
+ new SimpleModule("SingleCached-LoadingOrPolling-Lookup-Module")
+ {
+ @Override
+ public void setupModule(SetupContext context)
+ {
+ context.registerSubtypes(LoadingLookupFactory.class);
+ context.registerSubtypes(PollingLookupFactory.class);
+ }
+ }
+ );
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ }
+
+ public static byte[] getRandomCacheKey()
+ {
+ return StringUtils.toUtf8(UUID.randomUUID().toString());
+ }
+}
diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/PollingLookup.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/PollingLookup.java
new file mode 100644
index 000000000000..20744eacabd6
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/PollingLookup.java
@@ -0,0 +1,243 @@
+/*
+ *
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * /
+ */
+
+package io.druid.server.lookup;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.metamx.common.ISE;
+import com.metamx.common.logger.Logger;
+import io.druid.concurrent.Execs;
+import io.druid.query.lookup.LookupExtractor;
+import io.druid.server.lookup.cache.polling.OnHeapPollingCache;
+import io.druid.server.lookup.cache.polling.PollingCache;
+import io.druid.server.lookup.cache.polling.PollingCacheFactory;
+
+import javax.validation.constraints.NotNull;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class PollingLookup extends LookupExtractor
+{
+ private static final Logger LOGGER = new Logger(PollingLookup.class);
+
+ private final long pollPeriodMs;
+
+ private final DataFetcher dataFetcher;
+
+ private final PollingCacheFactory cacheFactory;
+
+ private final AtomicReference refOfCacheKeeper = new AtomicReference<>();
+
+
+ private final ListeningScheduledExecutorService scheduledExecutorService;
+
+ private final AtomicBoolean isOpen = new AtomicBoolean(false);
+
+ private final ListenableFuture> pollFuture;
+
+ private final String id = Integer.toHexString(System.identityHashCode(this));
+
+ public PollingLookup(
+ long pollPeriodMs,
+ DataFetcher dataFetcher,
+ PollingCacheFactory cacheFactory
+ )
+ {
+ this.pollPeriodMs = pollPeriodMs;
+ this.dataFetcher = Preconditions.checkNotNull(dataFetcher);
+ this.cacheFactory = cacheFactory == null ? new OnHeapPollingCache.OnHeapPollingCacheProvider() : cacheFactory;
+ refOfCacheKeeper.set(new CacheRefKeeper(this.cacheFactory.makeOf(dataFetcher.fetchAll())));
+ if (pollPeriodMs > 0) {
+ scheduledExecutorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(
+ Execs.makeThreadFactory("PollingLookup-" + id, Thread.MIN_PRIORITY)
+ ));
+ pollFuture = scheduledExecutorService.scheduleWithFixedDelay(
+ pollAndSwap(),
+ pollPeriodMs,
+ pollPeriodMs,
+ TimeUnit.MILLISECONDS
+ );
+ } else {
+ scheduledExecutorService = null;
+ pollFuture = null;
+ }
+ this.isOpen.set(true);
+ }
+
+
+ public void close()
+ {
+ LOGGER.info("Closing polling lookup [%s]", id);
+ synchronized (isOpen) {
+ isOpen.getAndSet(false);
+ if (pollFuture != null) {
+ pollFuture.cancel(true);
+ scheduledExecutorService.shutdown();
+ }
+ CacheRefKeeper cacheRefKeeper = refOfCacheKeeper.getAndSet(null);
+ if (cacheRefKeeper != null) {
+ cacheRefKeeper.doneWithIt();
+ }
+ }
+ }
+
+ @Override
+ public String apply(@NotNull String key)
+ {
+ final CacheRefKeeper cacheRefKeeper = refOfCacheKeeper.get();
+ if (cacheRefKeeper == null) {
+ throw new ISE("Cache reference is null WTF");
+ }
+ final PollingCache cache = cacheRefKeeper.getAndIncrementRef();
+ try {
+ if (cache == null) {
+ // it must've been closed after swapping while I was getting it. Try again.
+ return this.apply(key);
+ }
+ return Strings.emptyToNull((String) cache.get(key));
+ }
+ finally {
+ if (cacheRefKeeper != null && cache != null) {
+ cacheRefKeeper.doneWithIt();
+ }
+ }
+ }
+
+ @Override
+ public List unapply(final String value)
+ {
+ CacheRefKeeper cacheRefKeeper = refOfCacheKeeper.get();
+ if (cacheRefKeeper == null) {
+ throw new ISE("pollingLookup id [%s] is closed", id);
+ }
+ PollingCache cache = cacheRefKeeper.getAndIncrementRef();
+ try {
+ if (cache == null) {
+ // it must've been closed after swapping while I was getting it. Try again.
+ return this.unapply(value);
+ }
+ return cache.getKeys(value);
+ }
+ finally {
+ if (cacheRefKeeper != null && cache != null) {
+ cacheRefKeeper.doneWithIt();
+ }
+ }
+ }
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return LookupExtractionModule.getRandomCacheKey();
+ }
+
+ private Runnable pollAndSwap()
+ {
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ LOGGER.debug("Polling and swapping of PollingLookup [%s]", id);
+ CacheRefKeeper newCacheKeeper = new CacheRefKeeper(cacheFactory.makeOf(dataFetcher.fetchAll()));
+ CacheRefKeeper oldCacheKeeper = refOfCacheKeeper.getAndSet(newCacheKeeper);
+ if (oldCacheKeeper != null) {
+ oldCacheKeeper.doneWithIt();
+ }
+ }
+ };
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = (int) (pollPeriodMs ^ (pollPeriodMs >>> 32));
+ result = 31 * result + dataFetcher.hashCode();
+ result = 31 * result + cacheFactory.hashCode();
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof PollingLookup)) {
+ return false;
+ }
+
+ PollingLookup that = (PollingLookup) o;
+
+ if (pollPeriodMs != that.pollPeriodMs) {
+ return false;
+ }
+ if (!dataFetcher.equals(that.dataFetcher)) {
+ return false;
+ }
+ return cacheFactory.equals(that.cacheFactory);
+
+ }
+
+ public boolean isOpen()
+ {
+ return isOpen.get();
+ }
+
+
+ protected static class CacheRefKeeper
+ {
+ private final PollingCache pollingCache;
+ private final AtomicLong refCounts = new AtomicLong(0L);
+
+ CacheRefKeeper(PollingCache pollingCache) {this.pollingCache = pollingCache;}
+
+ PollingCache getAndIncrementRef()
+ {
+ synchronized (refCounts) {
+ if (refCounts.get() < 0) {
+ return null;
+ }
+ refCounts.incrementAndGet();
+ return pollingCache;
+ }
+ }
+
+ void doneWithIt()
+ {
+ synchronized (refCounts) {
+ if (refCounts.get() == 0) {
+ pollingCache.close();
+ }
+ refCounts.decrementAndGet();
+ }
+ }
+ }
+
+}
diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/PollingLookupFactory.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/PollingLookupFactory.java
new file mode 100644
index 000000000000..1c93d61d9cd9
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/PollingLookupFactory.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.lookup;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.annotations.VisibleForTesting;
+import com.metamx.common.logger.Logger;
+import io.druid.query.lookup.LookupExtractorFactory;
+import io.druid.query.lookup.LookupIntrospectHandler;
+import io.druid.server.lookup.cache.polling.PollingCacheFactory;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@JsonTypeName("pollingLookup")
+public class PollingLookupFactory implements LookupExtractorFactory
+{
+ private static final Logger LOGGER = new Logger(PollingLookupFactory.class);
+
+ @JsonProperty("pollPeriod")
+ private final Period pollPeriod;
+
+ @JsonProperty("dataFetcher")
+ private final DataFetcher dataFetcher;
+
+ @JsonProperty("cacheProvider")
+ private final PollingCacheFactory cacheFactory;
+
+ private final PollingLookup pollingLookup;
+
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ private final String id = Integer.toHexString(System.identityHashCode(this));
+
+ @JsonCreator
+ public PollingLookupFactory(
+ @JsonProperty("pollPeriod") Period pollPeriod,
+ @JsonProperty("dataFetcher") DataFetcher dataFetcher,
+ @JsonProperty("cacheFactory") PollingCacheFactory cacheFactory
+ )
+ {
+ this.pollPeriod = pollPeriod == null ? Period.ZERO : pollPeriod;
+ this.dataFetcher = dataFetcher;
+ this.cacheFactory = cacheFactory;
+ this.pollingLookup = new PollingLookup(this.pollPeriod.getMillis(), this.dataFetcher, this.cacheFactory);
+ }
+
+ @VisibleForTesting
+ protected PollingLookupFactory(
+ Period pollPeriod,
+ DataFetcher dataFetcher,
+ PollingCacheFactory pollingCacheFactory,
+ PollingLookup pollingLookup
+ )
+ {
+ this.pollPeriod = pollPeriod;
+ this.dataFetcher = dataFetcher;
+ this.cacheFactory = pollingCacheFactory;
+ this.pollingLookup = pollingLookup;
+ }
+
+ @Override
+ public boolean start()
+ {
+ synchronized (started) {
+ if (!started.get()) {
+ LOGGER.info("started polling lookup [%s]", id);
+ started.set(pollingLookup.isOpen());
+ }
+ return started.get();
+ }
+ }
+
+ @Override
+ public boolean close()
+ {
+ synchronized (started) {
+ if (started.getAndSet(false)) {
+ LOGGER.info("closing polling lookup [%s]", id);
+ pollingLookup.close();
+ }
+ return !started.get();
+ }
+ }
+
+ @Override
+ public boolean replaces(
+ @Nullable LookupExtractorFactory lookupExtractorFactory
+ )
+ {
+ if (lookupExtractorFactory == null) {
+ return true;
+ }
+
+ return !this.equals(lookupExtractorFactory);
+ }
+
+ @Nullable
+ @Override
+ public LookupIntrospectHandler getIntrospectHandler()
+ {
+ //not supported yet
+ return null;
+ }
+
+ @Override
+ public PollingLookup get()
+ {
+ return pollingLookup;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof PollingLookupFactory)) {
+ return false;
+ }
+
+ PollingLookupFactory that = (PollingLookupFactory) o;
+
+ if (pollPeriod != null ? !pollPeriod.equals(that.pollPeriod) : that.pollPeriod != null) {
+ return false;
+ }
+ if (dataFetcher != null ? !dataFetcher.equals(that.dataFetcher) : that.dataFetcher != null) {
+ return false;
+ }
+ return cacheFactory != null
+ ? (that.cacheFactory != null
+ && cacheFactory.getClass() == (that.cacheFactory).getClass())
+ : that.cacheFactory == null;
+
+ }
+}
diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/LoadingCache.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/LoadingCache.java
new file mode 100644
index 000000000000..2abde26b6f0d
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/LoadingCache.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.lookup.cache.loading;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.google.common.cache.CacheLoader;
+import com.google.common.util.concurrent.ExecutionError;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A semi-persistent mapping from keys to values. Cache entries are added using
+ * {@link #get(Object, Callable)} and stored in the cache until either evicted or manually invalidated.
+ *
+ *
Implementations of this interface are expected to be thread-safe, and can be safely accessed
+ * by multiple concurrent threads.
+ *
+ * This interface borrows ideas (and in some cases methods and javadoc) from Guava and JCache cache interface.
+ * Thanks Guava and JSR !
+ * We elected to make this as close as possible to JSR API like that users can build bridges between all the existing implementations of JSR.
+ */
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "guava", value = OnHeapLoadingCache.class),
+ @JsonSubTypes.Type(name = "mapDb", value = OffHeapLoadingCache.class)
+})
+public interface LoadingCache extends Closeable
+{
+ /**
+ * @param key must not be null
+ * Returns the value associated with {@code key} in this cache, or {@code null} if there is no
+ * cached value for {@code key}.
+ * a cache miss should be counted if the key is missing.
+ */
+ @Nullable
+ V getIfPresent(K key);
+
+ /**
+ * Returns a map of the values associated with {@code keys} in this cache. The returned map will
+ * only contain entries which are already present in the cache.
+ */
+
+ Map getAllPresent(Iterable keys);
+
+ /**
+ * Returns the value associated with {@code key} in this cache, obtaining that value from
+ * {@code valueLoader} if necessary. No observable state associated with this cache is modified
+ * until loading completes.
+ *
+ *
Warning: as with {@link CacheLoader#load}, {@code valueLoader} must not return
+ * {@code null}; it may either return a non-null value or throw an exception.
+ *
+ * @throws ExecutionException if a checked exception was thrown while loading the value
+ * @throws UncheckedExecutionException if an unchecked exception was thrown while loading the
+ * value
+ * @throws ExecutionError if an error was thrown while loading the value
+ *
+ */
+ V get(K key, Callable extends V> valueLoader) throws ExecutionException;
+
+ /**
+ * Copies all of the mappings from the specified map to the cache. This method is used for bulk put.
+ *
+ * @throws IllegalStateException if the cache is {@link #isClosed()}
+ */
+
+ void putAll(Map extends K, ? extends V> m);
+
+ /**
+ * Discards any cached value for key {@code key}. Eviction can be lazy or eager.
+ *
+ * @throws IllegalStateException if the cache is {@link #isClosed()}
+ */
+ void invalidate(K key);
+
+ /**
+ * Discards any cached values for keys {@code keys}. Eviction can be lazy or eager.
+ *
+ * @throws IllegalStateException if the cache is {@link #isClosed()}
+ */
+ void invalidateAll(Iterable keys);
+
+ /**
+ * Clears the contents of the cache.
+ *
+ * @throws IllegalStateException if the cache is {@link #isClosed()}
+ */
+ void invalidateAll();
+
+ /**
+ * @return Stats of the cache.
+ *
+ * @throws IllegalStateException if the cache is {@link #isClosed()}
+ */
+ LookupCacheStats getStats();
+
+ /**
+ * @return true if the Cache is closed
+ */
+ boolean isClosed();
+
+ /**
+ * Clean the used resources of the cache. Still not sure about cache lifecycle but as an initial design
+ * the namespace deletion event should call this method to clean up resources.
+ */
+
+ void close();
+}
diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/LookupCacheStats.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/LookupCacheStats.java
new file mode 100644
index 000000000000..cf9335a76112
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/LookupCacheStats.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.lookup.cache.loading;
+
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Statistics about the performance of a {@link LoadingCache}. Instances of this class are immutable.
+ * Cache statistics are incremented according to the following rules:
+ *
+ *
When a cache lookup encounters an existing cache entry {@code hitCount} is incremented.
+ *
When a cache lookup first encounters a missing cache entry {@code missCount} is incremented.
+ *
When an entry is evicted from the cache, {@code evictionCount} is incremented.
+ *
+ */
+public class LookupCacheStats
+{
+ private final long hitCount;
+ private final long missCount;
+ private final long evictionCount;
+
+ /**
+ * Constructs a new {@code CacheStats} instance.
+ */
+ public LookupCacheStats(
+ long hitCount, long missCount,
+ long evictionCount
+ )
+ {
+ checkArgument(hitCount >= 0);
+ checkArgument(missCount >= 0);
+ checkArgument(evictionCount >= 0);
+
+ this.hitCount = hitCount;
+ this.missCount = missCount;
+ this.evictionCount = evictionCount;
+ }
+
+ /**
+ * Returns the number of times {@link LoadingCache} lookup methods have returned either a cached or
+ * uncached value. This is defined as {@code hitCount + missCount}.
+ */
+ public long requestCount()
+ {
+ return hitCount + missCount;
+ }
+
+ /**
+ * Returns the number of times {@link LoadingCache} lookup methods have returned a cached value.
+ */
+ public long hitCount()
+ {
+ return hitCount;
+ }
+
+ /**
+ * Returns the ratio of cache requests which were hits. This is defined as
+ * {@code hitCount / requestCount}, or {@code 1.0} when {@code requestCount == 0}.
+ * Note that {@code hitRate + missRate =~ 1.0}.
+ */
+ public double hitRate()
+ {
+ long requestCount = requestCount();
+ return (requestCount == 0) ? 1.0 : (double) hitCount / requestCount;
+ }
+
+ /**
+ * Returns the number of times {@link LoadingCache} lookup methods have returned an uncached (newly
+ * loaded) value, or null. Multiple concurrent calls to {@link LoadingCache} lookup methods on an absent
+ * value can result in multiple misses, all returning the results of a single cache load
+ * operation.
+ */
+ public long missCount()
+ {
+ return missCount;
+ }
+
+ /**
+ * Returns the ratio of cache requests which were misses. This is defined as
+ * {@code missCount / requestCount}, or {@code 0.0} when {@code requestCount == 0}.
+ * Note that {@code hitRate + missRate =~ 1.0}. Cache misses include all requests which
+ * weren't cache hits, including requests which resulted in either successful or failed loading
+ * attempts, and requests which waited for other threads to finish loading. It is thus the case
+ * that {@code missCount >= loadSuccessCount + loadExceptionCount}. Multiple
+ * concurrent misses for the same key will result in a single load operation.
+ */
+ public double missRate()
+ {
+ long requestCount = requestCount();
+ return (requestCount == 0) ? 0.0 : (double) missCount / requestCount;
+ }
+
+
+ /**
+ * Returns the number of times an entry has been evicted. This count does not include manual
+ * {@linkplain LoadingCache#invalidate invalidations}.
+ */
+ public long evictionCount()
+ {
+ return evictionCount;
+ }
+
+ /**
+ * Returns a new {@code CacheStats} representing the difference between this {@code CacheStats}
+ * and {@code other}. Negative values, which aren't supported by {@code CacheStats} will be
+ * rounded up to zero.
+ */
+ public LookupCacheStats minus(LookupCacheStats other)
+ {
+ return new LookupCacheStats(
+ Math.max(0, hitCount - other.hitCount),
+ Math.max(0, missCount - other.missCount),
+ Math.max(0, evictionCount - other.evictionCount)
+ );
+ }
+
+ /**
+ * Returns a new {@code CacheStats} representing the sum of this {@code CacheStats}
+ * and {@code other}.
+ */
+ public LookupCacheStats plus(LookupCacheStats other)
+ {
+ return new LookupCacheStats(
+ hitCount + other.hitCount,
+ missCount + other.missCount,
+ evictionCount + other.evictionCount
+ );
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof LookupCacheStats)) {
+ return false;
+ }
+
+ LookupCacheStats that = (LookupCacheStats) o;
+
+ if (hitCount != that.hitCount) {
+ return false;
+ }
+ if (missCount != that.missCount) {
+ return false;
+ }
+
+ return evictionCount == that.evictionCount;
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = (int) (hitCount ^ (hitCount >>> 32));
+ result = 31 * result + (int) (missCount ^ (missCount >>> 32));
+ result = 31 * result + (int) (evictionCount ^ (evictionCount >>> 32));
+ return result;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "LookupCacheStats{" +
+ "hitCount=" + hitCount +
+ ", missCount=" + missCount +
+ ", evictionCount=" + evictionCount +
+ '}';
+ }
+}
diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/OffHeapLoadingCache.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/OffHeapLoadingCache.java
new file mode 100644
index 000000000000..9bf22c5ab3ca
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/OffHeapLoadingCache.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.lookup.cache.loading;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableMap;
+import com.metamx.common.ISE;
+import org.mapdb.Bind;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.HTreeMap;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class OffHeapLoadingCache implements LoadingCache
+{
+ private static final DB DB = DBMaker.newMemoryDirectDB().transactionDisable().closeOnJvmShutdown().make();
+
+ private final HTreeMap cache;
+ private final AtomicLong hitCount = new AtomicLong(0);
+ private final AtomicLong missCount = new AtomicLong(0);
+ private final AtomicLong evictionCount = new AtomicLong(0);
+ private final AtomicBoolean closed = new AtomicBoolean(true);
+
+ /**
+ * Sets store size limit. Disk or memory space consumed be storage should not grow over this space.
+ * maximal size of store in GB, if store is larger entries will start expiring
+ */
+ @JsonProperty
+ private final double maxStoreSize;
+
+ /**
+ * Sets maximal number of entries in this map.
+ * Less used entries will be expired and removed to make collection smaller
+ */
+ @JsonProperty
+ private final long maxEntriesSize;
+
+ /**
+ * Specifies that each entry should be automatically removed from the map once a fixed duration has elapsed after the entry's creation, or the most recent replacement of its value.
+ */
+ @JsonProperty
+ private final long expireAfterWrite;
+
+ /**
+ * Specifies that each entry should be automatically removed from the map once a fixed duration has elapsed after the entry's creation, the most recent replacement of its value, or its last access. Access time is reset by all map read and write operations
+ */
+ @JsonProperty
+ private final long expireAfterAccess;
+
+ private final String name = UUID.randomUUID().toString();
+
+ @JsonCreator
+ public OffHeapLoadingCache(
+ @JsonProperty("maxStoreSize") double maxStoreSize,
+ @JsonProperty("maxEntriesSize") long maxEntriesSize,
+ @JsonProperty("expireAfterWrite") long expireAfterWrite,
+ @JsonProperty("expireAfterAccess") long expireAfterAccess
+ )
+ {
+ this.maxStoreSize = maxStoreSize;
+ this.maxEntriesSize = maxEntriesSize;
+ this.expireAfterWrite = expireAfterWrite;
+ this.expireAfterAccess = expireAfterAccess;
+ this.cache = DB.createHashMap(name)
+ .expireStoreSize(this.maxStoreSize)
+ .expireAfterWrite(this.expireAfterWrite, TimeUnit.MILLISECONDS)
+ .expireAfterAccess(this.expireAfterAccess, TimeUnit.MILLISECONDS)
+ .expireMaxSize(this.maxEntriesSize)
+ .make();
+ cache.modificationListenerAdd(new Bind.MapListener()
+ {
+ @Override
+ public void update(K key, V oldVal, V newVal)
+ {
+ if (oldVal != null && newVal == null) {
+ // eviction or remove call
+ evictionCount.getAndIncrement();
+ }
+ }
+ });
+ this.closed.set(false);
+ }
+
+ @Override
+ public V getIfPresent(K key)
+ {
+ V value = cache.get(key);
+ if (value == null) {
+ missCount.getAndIncrement();
+ } else {
+ hitCount.getAndIncrement();
+ }
+ return value;
+ }
+
+ @Override
+ public Map getAllPresent(final Iterable keys)
+ {
+ ImmutableMap.Builder builder = ImmutableMap.builder();
+ for (K key : keys
+ ) {
+ V value = getIfPresent(key);
+ if (value != null) {
+ builder.put(key, value);
+ }
+ }
+ return builder.build();
+ }
+
+ @Override
+ public V get(K key, final Callable extends V> valueLoader) throws ExecutionException
+ {
+ synchronized (key) {
+ V value = cache.get(key);
+ if (value != null) {
+ return value;
+ }
+ try {
+ value = valueLoader.call();
+ cache.put(key, value);
+ return value;
+ }
+ catch (Exception e) {
+ throw new ISE(e, "got an exception while loading key [%s]", key);
+ }
+ }
+ }
+
+
+ @Override
+ public void putAll(Map extends K, ? extends V> m)
+ {
+ cache.putAll(m);
+ }
+
+ @Override
+ public void invalidate(K key)
+ {
+ cache.remove(key);
+ }
+
+ @Override
+ public void invalidateAll(Iterable keys)
+ {
+ for (K key : keys) {
+ invalidate(key);
+ }
+ }
+
+ @Override
+ public void invalidateAll()
+ {
+ cache.clear();
+ }
+
+ @Override
+ public LookupCacheStats getStats()
+ {
+ return new LookupCacheStats(hitCount.get(), missCount.get(), evictionCount.get());
+ }
+
+ @Override
+ public boolean isClosed()
+ {
+ return closed.get();
+ }
+
+ @Override
+ public void close()
+ {
+ if (!closed.getAndSet(true)) {
+ DB.delete(String.valueOf(name));
+ }
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof OffHeapLoadingCache)) {
+ return false;
+ }
+
+ OffHeapLoadingCache, ?> that = (OffHeapLoadingCache, ?>) o;
+
+ if (Double.compare(that.maxStoreSize, maxStoreSize) != 0) {
+ return false;
+ }
+ if (maxEntriesSize != that.maxEntriesSize) {
+ return false;
+ }
+ if (expireAfterWrite != that.expireAfterWrite) {
+ return false;
+ }
+ return expireAfterAccess == that.expireAfterAccess;
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result;
+ long temp;
+ temp = Double.doubleToLongBits(maxStoreSize);
+ result = (int) (temp ^ (temp >>> 32));
+ result = 31 * result + (int) (maxEntriesSize ^ (maxEntriesSize >>> 32));
+ result = 31 * result + (int) (expireAfterWrite ^ (expireAfterWrite >>> 32));
+ result = 31 * result + (int) (expireAfterAccess ^ (expireAfterAccess >>> 32));
+ return result;
+ }
+}
diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/OnHeapLoadingCache.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/OnHeapLoadingCache.java
new file mode 100644
index 000000000000..9fd8fb3aab81
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/OnHeapLoadingCache.java
@@ -0,0 +1,218 @@
+/*
+ *
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * /
+ */
+
+package io.druid.server.lookup.cache.loading;
+
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.metamx.common.logger.Logger;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+public class OnHeapLoadingCache implements LoadingCache
+{
+ private final static Logger log = new Logger(OnHeapLoadingCache.class);
+ private static final int DEFAULT_INITIAL_CAPACITY = 16;
+ //See com.google.common.cache.CacheBuilder#DEFAULT_CONCURRENCY_LEVEL
+ private static final int DEFAULT_CONCURRENCY_LEVEL = 4;
+
+ private final Cache cache;
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+ @JsonProperty
+ private final int concurrencyLevel;
+ @JsonProperty
+ private final int initialCapacity;
+ @JsonProperty
+ private final Long maximumSize;
+ @JsonProperty
+ private final Long expireAfterAccess;
+ @JsonProperty
+ private final Long expireAfterWrite;
+
+
+ /**
+ * @param concurrencyLevel default to {@code DEFAULT_CONCURRENCY_LEVEL}
+ * @param initialCapacity default to {@code DEFAULT_INITIAL_CAPACITY}
+ * @param maximumSize Max number of entries that the cache can hold, When set to zero, elements will be evicted immediately after being loaded into the
+ * cache.
+ * When set to null, cache maximum size is infinity
+ * @param expireAfterAccess Specifies that each entry should be automatically removed from the cache once a fixed duration
+ * has elapsed after the entry's creation, the most recent replacement of its value, or its last
+ * access. Access time is reset by all cache read and write operations.
+ * No read-time-based eviction when set to null.
+ * @param expireAfterWrite Specifies that each entry should be automatically removed from the cache once a fixed duration
+ * has elapsed after the entry's creation, or the most recent replacement of its value.
+ * No write-time-based eviction when set to null.
+ */
+ @JsonCreator
+ public OnHeapLoadingCache(
+ @JsonProperty("concurrencyLevel") int concurrencyLevel,
+ @JsonProperty("initialCapacity") int initialCapacity,
+ @JsonProperty("maximumSize") Long maximumSize,
+ @JsonProperty("expireAfterAccess") Long expireAfterAccess,
+ @JsonProperty("expireAfterWrite") Long expireAfterWrite
+ )
+ {
+ this.concurrencyLevel = concurrencyLevel <= 0 ? DEFAULT_CONCURRENCY_LEVEL : concurrencyLevel;
+ this.initialCapacity = initialCapacity <= 0 ? DEFAULT_INITIAL_CAPACITY : initialCapacity;
+ this.maximumSize = maximumSize;
+ this.expireAfterAccess = expireAfterAccess;
+ this.expireAfterWrite = expireAfterWrite;
+ CacheBuilder builder = CacheBuilder.newBuilder()
+ .concurrencyLevel(this.concurrencyLevel)
+ .initialCapacity(this.initialCapacity)
+ .recordStats();
+ if (this.expireAfterAccess != null) {
+ builder.expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS);
+ }
+ if (this.expireAfterWrite != null) {
+ builder.expireAfterWrite(this.expireAfterWrite, TimeUnit.MILLISECONDS);
+ }
+ if (this.maximumSize != null) {
+ builder.maximumSize(this.maximumSize);
+ }
+
+ this.cache = builder.build();
+
+ if (isClosed.getAndSet(false)) {
+ log.info("Guava Based OnHeapCache started with spec [%s]", cache.toString());
+ }
+ }
+
+
+ @Override
+ public V getIfPresent(K key)
+ {
+ return cache.getIfPresent(key);
+ }
+
+ @Override
+ public void putAll(Map extends K, ? extends V> m)
+ {
+ cache.putAll(m);
+ }
+
+
+ @Override
+ public Map getAllPresent(Iterable keys)
+ {
+ return cache.getAllPresent(keys);
+ }
+
+ @Override
+ public V get(K key, Callable extends V> valueLoader) throws ExecutionException
+ {
+ return cache.get(key, valueLoader);
+ }
+
+ @Override
+ public void invalidate(K key)
+ {
+ cache.invalidate(key);
+ }
+
+ @Override
+ public void invalidateAll(Iterable keys)
+ {
+ cache.invalidateAll(keys);
+ }
+
+ @Override
+ public void invalidateAll()
+ {
+ cache.invalidateAll();
+ cache.cleanUp();
+ }
+
+ @Override
+ public LookupCacheStats getStats()
+ {
+ return new LookupCacheStats(
+ cache.stats().hitCount(),
+ cache.stats().missCount(),
+ cache.stats().evictionCount()
+ );
+ }
+
+ @Override
+ public boolean isClosed()
+ {
+ return isClosed.get();
+ }
+
+ @Override
+ public void close()
+ {
+ if (!isClosed.getAndSet(true)) {
+ cache.cleanUp();
+ }
+ }
+
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof OnHeapLoadingCache)) {
+ return false;
+ }
+
+ OnHeapLoadingCache, ?> that = (OnHeapLoadingCache, ?>) o;
+
+ if (concurrencyLevel != that.concurrencyLevel) {
+ return false;
+ }
+ if (initialCapacity != that.initialCapacity) {
+ return false;
+ }
+ if (maximumSize != null ? !maximumSize.equals(that.maximumSize) : that.maximumSize != null) {
+ return false;
+ }
+ if (expireAfterAccess != null
+ ? !expireAfterAccess.equals(that.expireAfterAccess)
+ : that.expireAfterAccess != null) {
+ return false;
+ }
+ return expireAfterWrite != null ? expireAfterWrite.equals(that.expireAfterWrite) : that.expireAfterWrite == null;
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = concurrencyLevel;
+ result = 31 * result + initialCapacity;
+ result = 31 * result + (maximumSize != null ? maximumSize.hashCode() : 0);
+ result = 31 * result + (expireAfterAccess != null ? expireAfterAccess.hashCode() : 0);
+ result = 31 * result + (expireAfterWrite != null ? expireAfterWrite.hashCode() : 0);
+ return result;
+ }
+}
diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/OffHeapPollingCache.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/OffHeapPollingCache.java
new file mode 100644
index 000000000000..52025e633dd0
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/OffHeapPollingCache.java
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * /
+ */
+
+package io.druid.server.lookup.cache.polling;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.HTreeMap;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+public class OffHeapPollingCache implements PollingCache
+{
+ private static final DB DB = DBMaker.newMemoryDirectDB().transactionDisable().closeOnJvmShutdown().make();
+
+ private final HTreeMap mapCache;
+ private final HTreeMap> reverseCache;
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private final String cacheName;
+ private final String reverseCacheName;
+
+ public OffHeapPollingCache(final Iterable> entries)
+ {
+ synchronized (started) {
+ this.cacheName = String.format("cache-%s", UUID.randomUUID());
+ this.reverseCacheName = String.format("reverseCache-%s", UUID.randomUUID());
+ mapCache = DB.createHashMap(cacheName).make();
+ reverseCache = DB.createHashMap(reverseCacheName).make();
+ ImmutableSet.Builder setOfValuesBuilder = ImmutableSet.builder();
+ for (Map.Entry entry : entries) {
+ mapCache.put(entry.getKey(), entry.getValue());
+ setOfValuesBuilder.add(entry.getValue());
+ }
+
+ final Set setOfValues = setOfValuesBuilder.build();
+ reverseCache.putAll(Maps.asMap(
+ setOfValues, new Function>()
+ {
+ @Override
+ public List apply(final V input)
+ {
+ return Lists.newArrayList(Maps.filterKeys(mapCache, new Predicate()
+ {
+ @Override
+ public boolean apply(K key)
+ {
+ V retVal = mapCache.get(key);
+ if (retVal == null) {
+ return false;
+ }
+ return retVal.equals(input);
+ }
+ }).keySet());
+ }
+ }));
+ started.getAndSet(true);
+ }
+ }
+
+ @Override
+ public V get(K key)
+ {
+ return mapCache.get(key);
+ }
+
+ @Override
+ public List getKeys(V value)
+ {
+ final List listOfKey = reverseCache.get(value);
+ if (listOfKey == null) {
+ return Collections.emptyList();
+ }
+ return listOfKey;
+ }
+
+ @Override
+ public void close()
+ {
+ synchronized (started) {
+ if (started.getAndSet(false)) {
+ DB.delete(cacheName);
+ DB.delete(reverseCacheName);
+ }
+ }
+ }
+
+ public static class OffHeapPollingCacheProvider implements PollingCacheFactory
+ {
+ @Override
+ public PollingCache makeOf(Iterable> entries)
+ {
+ return new OffHeapPollingCache<>(entries);
+ }
+ }
+}
diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/OnHeapPollingCache.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/OnHeapPollingCache.java
new file mode 100644
index 000000000000..c40534599a60
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/OnHeapPollingCache.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.lookup.cache.polling;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class OnHeapPollingCache implements PollingCache
+{
+ private final ImmutableMap immutableMap;
+ private final ImmutableMap> immutableReverseMap;
+
+
+ public OnHeapPollingCache(Iterable> entries)
+ {
+
+ if (entries == null) {
+ immutableMap = ImmutableMap.of();
+ immutableReverseMap = ImmutableMap.of();
+ } else {
+ ImmutableSet.Builder setOfValuesBuilder = ImmutableSet.builder();
+ ImmutableMap.Builder mapBuilder = ImmutableMap.builder();
+ for (Map.Entry entry: entries
+ ) {
+ setOfValuesBuilder.add(entry.getValue());
+ mapBuilder.put(entry.getKey(), entry.getValue());
+ }
+ final Set setOfValues = setOfValuesBuilder.build();
+ immutableMap = mapBuilder.build();
+ immutableReverseMap = ImmutableMap.copyOf(Maps.asMap(
+ setOfValues, new Function>()
+ {
+ @Override
+ public List apply(final V input)
+ {
+ return Lists.newArrayList(Maps.filterKeys(immutableMap, new Predicate()
+ {
+ @Override
+ public boolean apply(K key)
+ {
+ V retVal = immutableMap.get(key);
+ if (retVal == null) {
+ return false;
+ }
+ return retVal.equals(input);
+ }
+ }).keySet());
+ }
+ }));
+ }
+
+ }
+
+ @Override
+ public V get(K key)
+ {
+ return immutableMap.get(key);
+ }
+
+ @Override
+ public List getKeys(final V value)
+ {
+ final List listOfKeys = immutableReverseMap.get(value);
+ if (listOfKeys == null) {
+ return Collections.emptyList();
+ }
+ return listOfKeys;
+ }
+
+ @Override
+ public void close()
+ {
+ //noop
+ }
+
+
+ public static class OnHeapPollingCacheProvider implements PollingCacheFactory
+ {
+ @Override
+ public PollingCache makeOf(Iterable> entries)
+ {
+ return new OnHeapPollingCache(entries);
+ }
+ }
+}
diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/PollingCache.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/PollingCache.java
new file mode 100644
index 000000000000..f9ac3414fea2
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/PollingCache.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.lookup.cache.polling;
+
+import java.util.List;
+
+public interface PollingCache
+{
+ /**
+ * @param key Given key to lookup its value from the cache.
+ *
+ * @return Returns the value associated to the {@code key} or {@code null} if no value exist.
+ */
+ V get(K key);
+
+ /**
+ * @param value Given value to reverse lookup its keys.
+ *
+ * @return Returns a {@link List} of keys associated to the given {@code value} otherwise {@link java.util.Collections.EmptyList}
+ */
+ List getKeys(V value);
+
+ /**
+ * close and clean the resources used by the cache
+ */
+ void close();
+
+}
diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/PollingCacheFactory.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/PollingCacheFactory.java
new file mode 100644
index 000000000000..18789351c100
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/PollingCacheFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.lookup.cache.polling;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import java.util.Map;
+
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = OnHeapPollingCache.OnHeapPollingCacheProvider.class)
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "onHeapPolling", value = OnHeapPollingCache.OnHeapPollingCacheProvider.class),
+ @JsonSubTypes.Type(name = "offHeapPolling", value = OffHeapPollingCache.OffHeapPollingCacheProvider.class)
+})
+public interface PollingCacheFactory
+{
+ /**
+ * @param entries of keys and values used to populate the cache
+ *
+ * @return Returns a new {@link PollingCache} containing all the entries of {@code map}
+ */
+
+ PollingCache makeOf(Iterable> entries);
+}
diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/JdbcDataFetcher.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/JdbcDataFetcher.java
new file mode 100644
index 000000000000..0c6b7b75eda9
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/JdbcDataFetcher.java
@@ -0,0 +1,230 @@
+/*
+ *
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * /
+ */
+
+package io.druid.server.lookup.jdbc;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.metamx.common.logger.Logger;
+import io.druid.metadata.MetadataStorageConnectorConfig;
+import io.druid.server.lookup.DataFetcher;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.TransactionCallback;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.skife.jdbi.v2.util.StringMapper;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+public class JdbcDataFetcher implements DataFetcher
+{
+ private static final Logger LOGGER = new Logger(JdbcDataFetcher.class);
+ private static final int DEFAULT_STREAMING_FETCH_SIZE = 1000;
+
+ @JsonProperty
+ private final MetadataStorageConnectorConfig connectorConfig;
+ @JsonProperty
+ private final String table;
+ @JsonProperty
+ private final String keyColumn;
+ @JsonProperty
+ private final String valueColumn;
+ @JsonProperty
+ private final int streamingFetchSize;
+
+ private final String fetchAllQuery;
+ private final String fetchQuery;
+ private final String reverseFetchQuery;
+ private final DBI dbi;
+
+ public JdbcDataFetcher(
+ @JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig,
+ @JsonProperty("table") String table,
+ @JsonProperty("keyColumn") String keyColumn,
+ @JsonProperty("valueColumn") String valueColumn,
+ @JsonProperty("streamingFetchSize") Integer streamingFetchSize
+ )
+ {
+ this.connectorConfig = Preconditions.checkNotNull(connectorConfig, "connectorConfig");
+ this.streamingFetchSize = streamingFetchSize == null ? DEFAULT_STREAMING_FETCH_SIZE : streamingFetchSize;
+ Preconditions.checkNotNull(connectorConfig.getConnectURI(), "connectorConfig.connectURI");
+ this.table = Preconditions.checkNotNull(table, "table");
+ this.keyColumn = Preconditions.checkNotNull(keyColumn, "keyColumn");
+ this.valueColumn = Preconditions.checkNotNull(valueColumn, "valueColumn");
+
+ this.fetchAllQuery = String.format(
+ "SELECT %s, %s FROM %s",
+ this.keyColumn,
+ this.valueColumn,
+ this.table
+ );
+ this.fetchQuery = String.format(
+ "SELECT %s FROM %s WHERE %s = :val",
+ this.valueColumn,
+ this.table,
+ this.keyColumn
+ );
+ this.reverseFetchQuery = String.format(
+ "SELECT %s FROM %s WHERE %s = :val",
+ this.keyColumn,
+ this.table,
+ this.valueColumn
+ );
+ dbi = new DBI(
+ connectorConfig.getConnectURI(),
+ connectorConfig.getUser(),
+ connectorConfig.getPassword()
+ );
+ dbi.registerMapper(new KeyValueResultSetMapper(keyColumn, valueColumn));
+ }
+
+ @Override
+ public Iterable> fetchAll()
+ {
+ return inReadOnlyTransaction(new TransactionCallback>>()
+ {
+ @Override
+ public List> inTransaction(
+ Handle handle,
+ TransactionStatus status
+ ) throws Exception
+ {
+ return handle.createQuery(fetchAllQuery)
+ .setFetchSize(streamingFetchSize)
+ .map(new KeyValueResultSetMapper(keyColumn, valueColumn))
+ .list();
+ }
+
+ }
+ );
+ }
+
+ @Override
+ public String fetch(final String key)
+ {
+ List pairs = inReadOnlyTransaction(
+ new TransactionCallback>()
+ {
+ @Override
+ public List inTransaction(Handle handle, TransactionStatus status) throws Exception
+ {
+ return handle.createQuery(fetchQuery)
+ .bind("val", key)
+ .map(StringMapper.FIRST)
+ .list();
+ }
+ }
+ );
+ if (pairs.isEmpty()) {
+ return null;
+ }
+ return Strings.nullToEmpty(pairs.get(0));
+ }
+
+ @Override
+ public Iterable> fetch(final Iterable keys)
+ {
+ QueryKeys queryKeys = dbi.onDemand(QueryKeys.class);
+ return queryKeys.findNamesForIds(Lists.newArrayList(keys), table, keyColumn, valueColumn);
+ }
+
+ @Override
+ public List reverseFetchKeys(final String value)
+ {
+ List results = inReadOnlyTransaction(new TransactionCallback>()
+ {
+ @Override
+ public List inTransaction(Handle handle, TransactionStatus status) throws Exception
+ {
+ return handle.createQuery(reverseFetchQuery)
+ .bind("val", value)
+ .map(StringMapper.FIRST)
+ .list();
+ }
+ });
+ return results;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof JdbcDataFetcher)) {
+ return false;
+ }
+
+ JdbcDataFetcher that = (JdbcDataFetcher) o;
+
+ if (!connectorConfig.equals(that.connectorConfig)) {
+ return false;
+ }
+ if (!table.equals(that.table)) {
+ return false;
+ }
+ if (!keyColumn.equals(that.keyColumn)) {
+ return false;
+ }
+ return valueColumn.equals(that.valueColumn);
+
+ }
+
+ private DBI getDbi()
+ {
+ return dbi;
+ }
+
+ private T inReadOnlyTransaction(final TransactionCallback callback)
+ {
+ return getDbi().withHandle(
+ new HandleCallback()
+ {
+ @Override
+ public T withHandle(Handle handle) throws Exception
+ {
+ final Connection connection = handle.getConnection();
+ final boolean readOnly = connection.isReadOnly();
+ connection.setReadOnly(true);
+ try {
+ return handle.inTransaction(callback);
+ }
+ finally {
+ try {
+ connection.setReadOnly(readOnly);
+ }
+ catch (SQLException e) {
+ // at least try to log it so we don't swallow exceptions
+ LOGGER.error(e, "Unable to reset connection read-only state");
+ }
+ }
+ }
+ }
+ );
+ }
+
+}
diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/KeyValueResultSetMapper.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/KeyValueResultSetMapper.java
new file mode 100644
index 000000000000..14d27939ad29
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/KeyValueResultSetMapper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.lookup.jdbc;
+
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.tweak.ResultSetMapper;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.AbstractMap;
+import java.util.Map;
+
+
+public class KeyValueResultSetMapper implements ResultSetMapper>
+{
+ private final String keyColumn;
+ private final String valueColumn;
+
+ public KeyValueResultSetMapper(String keyColumn, String valueColumn)
+ {
+ this.keyColumn = keyColumn;
+ this.valueColumn = valueColumn;
+ }
+
+ @Override
+ public Map.Entry map(int index, ResultSet r, StatementContext ctx) throws SQLException
+ {
+ return new AbstractMap.SimpleEntry<>(r.getString(keyColumn), r.getString(valueColumn));
+ }
+}
diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/QueryKeys.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/QueryKeys.java
new file mode 100644
index 000000000000..dce03904dd3b
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/QueryKeys.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.lookup.jdbc;
+
+
+import com.google.common.collect.ImmutableSet;
+import io.dropwizard.jdbi.ImmutableSetContainerFactory;
+import org.skife.jdbi.v2.sqlobject.SqlQuery;
+import org.skife.jdbi.v2.sqlobject.customizers.Define;
+import org.skife.jdbi.v2.sqlobject.customizers.RegisterContainerMapper;
+import org.skife.jdbi.v2.sqlobject.stringtemplate.UseStringTemplate3StatementLocator;
+import org.skife.jdbi.v2.unstable.BindIn;
+
+import java.util.List;
+import java.util.Map;
+
+
+@UseStringTemplate3StatementLocator()
+@RegisterContainerMapper(ImmutableSetContainerFactory.class)
+public interface QueryKeys
+{
+ @SqlQuery("SELECT , FROM
WHERE IN ()")
+ ImmutableSet> findNamesForIds(
+ @BindIn("keys") List keys,
+ @Define("table") String table,
+ @Define("keyColumn") String keyColumn,
+ @Define("valueColumn") String valueColumn
+ );
+}
+
diff --git a/extensions-core/lookups-cached-single/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-core/lookups-cached-single/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
new file mode 100644
index 000000000000..09592acd62f8
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
@@ -0,0 +1,22 @@
+#
+# /*
+# * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+# * or more contributor license agreements. See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership. Metamarkets licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing,
+# * software distributed under the License is distributed on an
+# * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# * KIND, either express or implied. See the License for the
+# * specific language governing permissions and limitations
+# * under the License.
+# */
+#
+
+io.druid.server.lookup.LookupExtractionModule
diff --git a/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/CacheRefKeeperTest.java b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/CacheRefKeeperTest.java
new file mode 100644
index 000000000000..94f2927c8f1a
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/CacheRefKeeperTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.lookup;
+
+import io.druid.server.lookup.cache.polling.PollingCache;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CacheRefKeeperTest
+{
+
+ @Test
+ public void testGet()
+ {
+ PollingCache mockPollingCache = EasyMock.createStrictMock(PollingCache.class);
+ PollingLookup.CacheRefKeeper cacheRefKeeper = new PollingLookup.CacheRefKeeper(mockPollingCache);
+ Assert.assertEquals(mockPollingCache, cacheRefKeeper.getAndIncrementRef());
+ }
+
+ @Test
+ public void testDoneWithIt()
+ {
+ PollingCache mockPollingCache = EasyMock.createStrictMock(PollingCache.class);
+ PollingLookup.CacheRefKeeper cacheRefKeeper = new PollingLookup.CacheRefKeeper(mockPollingCache);
+ mockPollingCache.close();
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.replay(mockPollingCache);
+ cacheRefKeeper.doneWithIt();
+ EasyMock.verify(mockPollingCache);
+ }
+
+ @Test
+ public void testGetAfterDoneWithIt()
+ {
+ PollingCache mockPollingCache = EasyMock.createStrictMock(PollingCache.class);
+ PollingLookup.CacheRefKeeper cacheRefKeeper = new PollingLookup.CacheRefKeeper(mockPollingCache);
+ mockPollingCache.close();
+ EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.replay(mockPollingCache);
+ cacheRefKeeper.doneWithIt();
+ EasyMock.verify(mockPollingCache);
+ Assert.assertEquals(null, cacheRefKeeper.getAndIncrementRef());
+ Assert.assertEquals(null, cacheRefKeeper.getAndIncrementRef());
+ }
+
+}
diff --git a/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/LoadingLookupFactoryTest.java b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/LoadingLookupFactoryTest.java
new file mode 100644
index 000000000000..06e17b47f5f7
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/LoadingLookupFactoryTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.lookup;
+
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.druid.query.lookup.LookupExtractorFactory;
+import io.druid.segment.TestHelper;
+import io.druid.server.lookup.cache.loading.LoadingCache;
+import io.druid.server.lookup.cache.loading.OffHeapLoadingCache;
+import io.druid.server.lookup.cache.loading.OnHeapLoadingCache;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public class LoadingLookupFactoryTest
+{
+ DataFetcher dataFetcher = EasyMock.createMock(DataFetcher.class);
+ LoadingCache lookupCache = EasyMock.createStrictMock(LoadingCache.class);
+ LoadingCache reverseLookupCache = EasyMock.createStrictMock(LoadingCache.class);
+ LoadingLookup loadingLookup = EasyMock.createMock(LoadingLookup.class);
+ LoadingLookupFactory loadingLookupFactory = new LoadingLookupFactory(
+ dataFetcher,
+ lookupCache,
+ reverseLookupCache,
+ loadingLookup
+ );
+
+ @Test
+ public void testStartStop()
+ {
+ EasyMock.expect(loadingLookup.isOpen()).andReturn(true).once();
+ loadingLookup.close();
+ EasyMock.expectLastCall().once();
+ EasyMock.replay(loadingLookup);
+ Assert.assertTrue(loadingLookupFactory.start());
+ Assert.assertTrue(loadingLookupFactory.close());
+ EasyMock.verify(loadingLookup);
+
+ }
+
+ @Test
+ public void testReplacesWithNull()
+ {
+ Assert.assertTrue(loadingLookupFactory.replaces(null));
+ }
+
+ @Test
+ public void testReplacesWithSame()
+ {
+ Assert.assertFalse(loadingLookupFactory.replaces(loadingLookupFactory));
+ }
+
+ @Test
+ public void testReplacesWithDifferent()
+ {
+ Assert.assertTrue(loadingLookupFactory.replaces(new LoadingLookupFactory(
+ EasyMock.createMock(DataFetcher.class),
+ lookupCache,
+ reverseLookupCache
+ )));
+ Assert.assertTrue(loadingLookupFactory.replaces(new LoadingLookupFactory(
+ dataFetcher,
+ EasyMock.createMock(LoadingCache.class),
+ reverseLookupCache
+ )));
+ Assert.assertTrue(loadingLookupFactory.replaces(new LoadingLookupFactory(
+ dataFetcher,
+ lookupCache,
+ EasyMock.createMock(LoadingCache.class)
+ )));
+ }
+
+
+ @Test
+ public void testGet()
+ {
+ Assert.assertEquals(loadingLookup, loadingLookupFactory.get());
+ }
+
+ @Test
+ public void testSerDeser() throws IOException
+ {
+ ObjectMapper mapper = TestHelper.getObjectMapper();
+ LoadingLookupFactory loadingLookupFactory = new LoadingLookupFactory(
+ new MockDataFetcher(),
+ new OnHeapLoadingCache(
+ 0,
+ 100,
+ 100L,
+ 0L,
+ 0L
+ ),
+ new OffHeapLoadingCache>(
+ 100,
+ 100L,
+ 0L,
+ 0L
+ )
+ );
+
+ mapper.registerSubtypes(MockDataFetcher.class);
+ mapper.registerSubtypes(LoadingLookupFactory.class);
+ Assert.assertEquals(
+ loadingLookupFactory,
+ mapper.reader(LookupExtractorFactory.class)
+ .readValue(mapper.writeValueAsString(loadingLookupFactory))
+ );
+ }
+
+
+ @JsonTypeName("mock")
+ private static class MockDataFetcher implements DataFetcher
+ {
+ @JsonCreator
+ public MockDataFetcher()
+ {
+ }
+
+ @Override
+ public Iterable fetchAll()
+ {
+ return Collections.emptyMap().entrySet();
+ }
+
+ @Override
+ public Object fetch(Object key)
+ {
+ return null;
+ }
+
+ @Override
+ public Iterable fetch(Iterable keys)
+ {
+ return null;
+ }
+
+ @Override
+ public List reverseFetchKeys(Object value)
+ {
+ return null;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ return obj instanceof MockDataFetcher;
+ }
+ }
+
+}
diff --git a/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/LoadingLookupTest.java b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/LoadingLookupTest.java
new file mode 100644
index 000000000000..9e648dd82dc0
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/LoadingLookupTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.lookup;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import io.druid.server.lookup.cache.loading.LoadingCache;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+public class LoadingLookupTest
+{
+ DataFetcher dataFetcher = EasyMock.createMock(DataFetcher.class);
+ LoadingCache lookupCache = EasyMock.createStrictMock(LoadingCache.class);
+ LoadingCache reverseLookupCache = EasyMock.createStrictMock(LoadingCache.class);
+ LoadingLookup loadingLookup = new LoadingLookup(dataFetcher, lookupCache, reverseLookupCache);
+
+ @Test
+ public void testApplyEmptyOrNull()
+ {
+ Assert.assertEquals(null, loadingLookup.apply(null));
+ Assert.assertEquals(null, loadingLookup.apply(""));
+ }
+
+ @Test
+ public void testUnapplyNull()
+ {
+ Assert.assertEquals(Collections.EMPTY_LIST, loadingLookup.unapply(null));
+ }
+
+ @Test
+ public void testApply() throws ExecutionException
+ {
+ EasyMock.expect(lookupCache.get(EasyMock.eq("key"), EasyMock.anyObject(Callable.class))).andReturn("value").once();
+ EasyMock.replay(lookupCache);
+ Assert.assertEquals(ImmutableMap.of("key","value"), loadingLookup.applyAll(ImmutableSet.of("key")));
+ EasyMock.verify(lookupCache);
+ }
+
+ @Test
+ public void testUnapplyAll() throws ExecutionException
+ {
+ EasyMock.expect(reverseLookupCache.get(EasyMock.eq("value"), EasyMock.anyObject(Callable.class)))
+ .andReturn(Lists.newArrayList("key"))
+ .once();
+ EasyMock.replay(reverseLookupCache);
+ Assert.assertEquals(ImmutableMap.of("value",Lists.newArrayList("key")), loadingLookup.unapplyAll(ImmutableSet.of("value")));
+ EasyMock.verify(reverseLookupCache);
+ }
+
+ @Test
+ public void testClose()
+ {
+ lookupCache.close();
+ reverseLookupCache.close();
+ EasyMock.replay(lookupCache, reverseLookupCache);
+ loadingLookup.close();
+ EasyMock.verify(lookupCache, reverseLookupCache);
+ }
+
+ @Test
+ public void testApplyWithExecutionError() throws ExecutionException
+ {
+ EasyMock.expect(lookupCache.get(EasyMock.eq("key"), EasyMock.anyObject(Callable.class)))
+ .andThrow(new ExecutionException(null))
+ .once();
+ EasyMock.replay(lookupCache);
+ Assert.assertEquals(null, loadingLookup.apply("key"));
+ EasyMock.verify(lookupCache);
+ }
+
+ @Test
+ public void testUnApplyWithExecutionError() throws ExecutionException
+ {
+ EasyMock.expect(reverseLookupCache.get(EasyMock.eq("value"), EasyMock.anyObject(Callable.class)))
+ .andThrow(new ExecutionException(null))
+ .once();
+ EasyMock.replay(reverseLookupCache);
+ Assert.assertEquals(Collections.EMPTY_LIST, loadingLookup.unapply("value"));
+ EasyMock.verify(reverseLookupCache);
+ }
+
+ @Test
+ public void testGetCacheKey()
+ {
+ Assert.assertFalse(Arrays.equals(loadingLookup.getCacheKey(), loadingLookup.getCacheKey()));
+ }
+}
diff --git a/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/PollingLookupFactoryTest.java b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/PollingLookupFactoryTest.java
new file mode 100644
index 000000000000..e59c3cabff85
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/PollingLookupFactoryTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.lookup;
+
+import org.easymock.EasyMock;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PollingLookupFactoryTest
+{
+
+ PollingLookup pollingLookup = EasyMock.createMock(PollingLookup.class);
+ PollingLookupFactory pollingLookupFactory = new PollingLookupFactory(Period.ZERO, null, null, pollingLookup);
+
+ @Test
+ public void testStart()
+ {
+ EasyMock.expect(pollingLookup.isOpen()).andReturn(true).once();
+ EasyMock.replay(pollingLookup);
+ Assert.assertTrue(pollingLookupFactory.start());
+ EasyMock.verify(pollingLookup);
+ }
+
+ @Test
+ public void testClose()
+ {
+ Assert.assertTrue(pollingLookupFactory.close());
+ }
+
+ @Test
+ public void testReplacesWithNull()
+ {
+ Assert.assertTrue(pollingLookupFactory.replaces(null));
+ }
+
+ @Test
+ public void testReplaces()
+ {
+ Assert.assertTrue(pollingLookupFactory.replaces(new PollingLookupFactory(
+ Period.millis(1),
+ null,
+ null,
+ pollingLookup
+ )));
+ }
+
+ @Test
+ public void testGet(){
+ Assert.assertEquals(pollingLookup, pollingLookupFactory.get());
+ }
+
+}
diff --git a/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/PollingLookupSerDeserTest.java b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/PollingLookupSerDeserTest.java
new file mode 100644
index 000000000000..df994502ee49
--- /dev/null
+++ b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/PollingLookupSerDeserTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.lookup;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.lookup.LookupExtractorFactory;
+import io.druid.server.lookup.cache.polling.OffHeapPollingCache;
+import io.druid.server.lookup.cache.polling.OnHeapPollingCache;
+import io.druid.server.lookup.cache.polling.PollingCacheFactory;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+
+@RunWith(Parameterized.class)
+public class PollingLookupSerDeserTest
+{
+ @Parameterized.Parameters
+ public static Collection