From a294244dd61a81c86f2c17c0c4283102d618d989 Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Fri, 12 Feb 2016 09:34:00 -0600 Subject: [PATCH] Cached lookup module. first cut implementing JDBC cache --- .../metadata/DefaultPasswordProvider.java | 21 ++ .../MetadataStorageConnectorConfig.java | 43 ++++ .../MetadataStorageConnectorConfigTest.java | 91 +++++-- .../extensions-core/druid-lookups.md | 129 ++++++++++ docs/content/development/extensions.md | 1 + .../JDBCExtractionNamespaceCacheFactory.java | 2 +- extensions-core/lookups-cached-single/pom.xml | 96 +++++++ .../io/druid/server/lookup/DataFetcher.java | 82 ++++++ .../io/druid/server/lookup/LoadingLookup.java | 148 +++++++++++ .../server/lookup/LoadingLookupFactory.java | 140 ++++++++++ .../server/lookup/LookupExtractionModule.java | 59 +++++ .../io/druid/server/lookup/PollingLookup.java | 243 ++++++++++++++++++ .../server/lookup/PollingLookupFactory.java | 156 +++++++++++ .../lookup/cache/loading/LoadingCache.java | 132 ++++++++++ .../cache/loading/LookupCacheStats.java | 189 ++++++++++++++ .../cache/loading/OffHeapLoadingCache.java | 238 +++++++++++++++++ .../cache/loading/OnHeapLoadingCache.java | 218 ++++++++++++++++ .../cache/polling/OffHeapPollingCache.java | 124 +++++++++ .../cache/polling/OnHeapPollingCache.java | 111 ++++++++ .../lookup/cache/polling/PollingCache.java | 45 ++++ .../cache/polling/PollingCacheFactory.java | 42 +++ .../server/lookup/jdbc/JdbcDataFetcher.java | 230 +++++++++++++++++ .../lookup/jdbc/KeyValueResultSetMapper.java | 47 ++++ .../druid/server/lookup/jdbc/QueryKeys.java | 47 ++++ .../io.druid.initialization.DruidModule | 22 ++ .../server/lookup/CacheRefKeeperTest.java | 64 +++++ .../lookup/LoadingLookupFactoryTest.java | 174 +++++++++++++ .../server/lookup/LoadingLookupTest.java | 112 ++++++++ .../lookup/PollingLookupFactoryTest.java | 70 +++++ .../lookup/PollingLookupSerDeserTest.java | 107 ++++++++ .../server/lookup/PollingLookupTest.java | 208 +++++++++++++++ .../cache/loading/LoadingCacheTest.java | 171 ++++++++++++ .../loading/OffHeapLoadingCacheTest.java | 37 +++ .../cache/loading/OnHeapLoadingCacheTest.java | 34 +++ .../lookup/jdbc/JdbcDataFetcherTest.java | 183 +++++++++++++ pom.xml | 1 + 36 files changed, 3796 insertions(+), 21 deletions(-) create mode 100644 docs/content/development/extensions-core/druid-lookups.md create mode 100644 extensions-core/lookups-cached-single/pom.xml create mode 100644 extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/DataFetcher.java create mode 100644 extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LoadingLookup.java create mode 100644 extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LoadingLookupFactory.java create mode 100644 extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LookupExtractionModule.java create mode 100644 extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/PollingLookup.java create mode 100644 extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/PollingLookupFactory.java create mode 100644 extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/LoadingCache.java create mode 100644 extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/LookupCacheStats.java create mode 100644 extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/OffHeapLoadingCache.java create mode 100644 extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/OnHeapLoadingCache.java create mode 100644 extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/OffHeapPollingCache.java create mode 100644 extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/OnHeapPollingCache.java create mode 100644 extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/PollingCache.java create mode 100644 extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/PollingCacheFactory.java create mode 100644 extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/JdbcDataFetcher.java create mode 100644 extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/KeyValueResultSetMapper.java create mode 100644 extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/QueryKeys.java create mode 100644 extensions-core/lookups-cached-single/src/main/resources/META-INF/services/io.druid.initialization.DruidModule create mode 100644 extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/CacheRefKeeperTest.java create mode 100644 extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/LoadingLookupFactoryTest.java create mode 100644 extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/LoadingLookupTest.java create mode 100644 extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/PollingLookupFactoryTest.java create mode 100644 extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/PollingLookupSerDeserTest.java create mode 100644 extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/PollingLookupTest.java create mode 100644 extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/cache/loading/LoadingCacheTest.java create mode 100644 extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/cache/loading/OffHeapLoadingCacheTest.java create mode 100644 extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/cache/loading/OnHeapLoadingCacheTest.java create mode 100644 extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/jdbc/JdbcDataFetcherTest.java diff --git a/common/src/main/java/io/druid/metadata/DefaultPasswordProvider.java b/common/src/main/java/io/druid/metadata/DefaultPasswordProvider.java index e6cfabe27bdc..8c15758b161e 100644 --- a/common/src/main/java/io/druid/metadata/DefaultPasswordProvider.java +++ b/common/src/main/java/io/druid/metadata/DefaultPasswordProvider.java @@ -49,5 +49,26 @@ public String getPassword() public String toString() { return this.getClass().getCanonicalName(); } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof DefaultPasswordProvider)) { + return false; + } + + DefaultPasswordProvider that = (DefaultPasswordProvider) o; + + return getPassword() != null ? getPassword().equals(that.getPassword()) : that.getPassword() == null; + } + + @Override + public int hashCode() + { + return getPassword() != null ? getPassword().hashCode() : 0; + } } diff --git a/common/src/main/java/io/druid/metadata/MetadataStorageConnectorConfig.java b/common/src/main/java/io/druid/metadata/MetadataStorageConnectorConfig.java index bcaeee5cd4ea..c837ceefb1af 100644 --- a/common/src/main/java/io/druid/metadata/MetadataStorageConnectorConfig.java +++ b/common/src/main/java/io/druid/metadata/MetadataStorageConnectorConfig.java @@ -86,4 +86,47 @@ public String toString() ", passwordProvider=" + passwordProvider + '}'; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof MetadataStorageConnectorConfig)) { + return false; + } + + MetadataStorageConnectorConfig that = (MetadataStorageConnectorConfig) o; + + if (isCreateTables() != that.isCreateTables()) { + return false; + } + if (getPort() != that.getPort()) { + return false; + } + if (getHost() != null ? !getHost().equals(that.getHost()) : that.getHost() != null) { + return false; + } + if (getConnectURI() != null ? !getConnectURI().equals(that.getConnectURI()) : that.getConnectURI() != null) { + return false; + } + if (getUser() != null ? !getUser().equals(that.getUser()) : that.getUser() != null) { + return false; + } + return passwordProvider != null ? passwordProvider.equals(that.passwordProvider) : that.passwordProvider == null; + + } + + @Override + public int hashCode() + { + int result = (isCreateTables() ? 1 : 0); + result = 31 * result + (getHost() != null ? getHost().hashCode() : 0); + result = 31 * result + getPort(); + result = 31 * result + (getConnectURI() != null ? getConnectURI().hashCode() : 0); + result = 31 * result + (getUser() != null ? getUser().hashCode() : 0); + result = 31 * result + (passwordProvider != null ? passwordProvider.hashCode() : 0); + return result; + } } diff --git a/common/src/test/java/io/druid/metadata/MetadataStorageConnectorConfigTest.java b/common/src/test/java/io/druid/metadata/MetadataStorageConnectorConfigTest.java index fd9751074859..8ad962fd9488 100644 --- a/common/src/test/java/io/druid/metadata/MetadataStorageConnectorConfigTest.java +++ b/common/src/test/java/io/druid/metadata/MetadataStorageConnectorConfigTest.java @@ -23,7 +23,55 @@ import org.junit.Assert; import org.junit.Test; -public class MetadataStorageConnectorConfigTest { +import java.io.IOException; + +public class MetadataStorageConnectorConfigTest +{ + + private MetadataStorageConnectorConfig createMetadataStorageConfig( + boolean createTables, + String host, + int port, + String connectURI, + String user, + String pwdString + ) + throws IOException + { + return jsonMapper.readValue( + "{" + + "\"createTables\": \"" + createTables + "\"," + + "\"host\": \"" + host + "\"," + + "\"port\": \"" + port + "\"," + + "\"connectURI\": \"" + connectURI + "\"," + + "\"user\": \"" + user + "\"," + + "\"password\": " + pwdString + + "}", + MetadataStorageConnectorConfig.class + ); + } + + @Test + public void testEquals() throws IOException + { + MetadataStorageConnectorConfig metadataStorageConnectorConfig = createMetadataStorageConfig( + true, + "testHost", + 4000, + "url", + "user", + "\"nothing\"" + ); + MetadataStorageConnectorConfig metadataStorageConnectorConfig2 = createMetadataStorageConfig( + true, + "testHost", + 4000, + "url", + "user", + "\"nothing\"" + ); + Assert.assertTrue(metadataStorageConnectorConfig.equals(metadataStorageConnectorConfig2)); + } private static final ObjectMapper jsonMapper = new ObjectMapper(); @@ -37,7 +85,8 @@ public void testMetadaStorageConnectionConfigSimplePassword() throws Exception "connectURI", "user", "\"nothing\"", - "nothing"); + "nothing" + ); } @Test @@ -50,29 +99,31 @@ public void testMetadaStorageConnectionConfigWithDefaultProviderPassword() throw "connectURI", "user", "{\"type\":\"default\",\"password\":\"nothing\"}", - "nothing"); + "nothing" + ); } private void testMetadataStorageConnectionConfig( - boolean createTables, - String host, - int port, - String connectURI, - String user, - String pwdString, - String pwd - ) throws Exception + boolean createTables, + String host, + int port, + String connectURI, + String user, + String pwdString, + String pwd + ) throws Exception { MetadataStorageConnectorConfig config = jsonMapper.readValue( - "{" + - "\"createTables\": \"" + createTables + "\"," + - "\"host\": \"" + host + "\"," + - "\"port\": \"" + port + "\"," + - "\"connectURI\": \"" + connectURI + "\"," + - "\"user\": \"" + user + "\"," + - "\"password\": " + pwdString + - "}", - MetadataStorageConnectorConfig.class); + "{" + + "\"createTables\": \"" + createTables + "\"," + + "\"host\": \"" + host + "\"," + + "\"port\": \"" + port + "\"," + + "\"connectURI\": \"" + connectURI + "\"," + + "\"user\": \"" + user + "\"," + + "\"password\": " + pwdString + + "}", + MetadataStorageConnectorConfig.class + ); Assert.assertEquals(host, config.getHost()); Assert.assertEquals(port, config.getPort()); diff --git a/docs/content/development/extensions-core/druid-lookups.md b/docs/content/development/extensions-core/druid-lookups.md new file mode 100644 index 000000000000..a0db2b13c8bb --- /dev/null +++ b/docs/content/development/extensions-core/druid-lookups.md @@ -0,0 +1,129 @@ +--- +layout: doc_page +--- +# Cached Lookup Module + +
Please note that this is an experimental module and the development/testing still at early stage. Feel free to try it and give us your feedback.
+ +## Description +This module provides a per-lookup caching mechanism for JDBC data sources. +The main goal of this cache is to speed up the access to a high latency lookup sources and to provide a caching isolation for every lookup source. +Thus user can define various caching strategies or and implementation per lookup, even if the source is the same. +This module can be used side to side with other lookup module like the global cached lookup module. + +To use this extension please make sure to [include](../../operations/including-extensions.html) `druid-lookups-cached-single` as an extension. + +## Architecture +Generally speaking this module can be divided into two main component, namely, the data fetcher layer and caching layer. + +### Data Fetcher layer + +First part is the data fetcher layer API `DataFetcher`, that exposes a set of fetch methods to fetch data from the actual Lookup dimension source. +For instance `JdbcDataFetcher` provides an implementation of `DataFetcher` that can be used to fetch key/value from a RDBMS via JDBC driver. +If you need new type of data fetcher, all you need to do, is to implement the interface `DataFetcher` and load it via another druid module. +### Caching layer + +This extension comes with two different caching strategies. First strategy is a poll based and the second is a load based. +#### Poll lookup cache + +The poll strategy cache strategy will fetch and swap all the pair of key/values periodically from the lookup source. +Hence, user should make sure that the cache can fit all the data. +The current implementation provides 2 type of poll cache, the first is onheap (uses immutable map), while the second uses MapBD based offheap map. +User can also implement a different lookup polling cache by implementing `PollingCacheFactory` and `PollingCache` interfaces. + +#### Loading lookup +Loading cache strategy will load the key\value pair upon request on the key it self, the general algorithm is load key if absent. +Once the key/value pair is loaded eviction will occur according to the cache eviction policy. +This module comes with two loading lookup implementation, the first is onheap backed by a Guava cache implementation, the second is MapDB offheap implementation. +Both implementations offer various eviction strategies. +Same for Loading cache, developer can implement a new type of loading cache by implementing `LookupLoadingCache` interface. + +## Configuration and Operation: + + +### Polling Lookup + +**Note that the current implementation of `offHeapPolling` and `onHeapPolling` will create two caches one to lookup value based on key and the other to reverse lookup the key from value** + +|Field|Type|Description|Required|default| +|-----|----|-----------|--------|-------| +|dataFetcher|Json object|Specifies the lookup data fetcher type to use in order to fetch data|yes|null| +|cacheFactory|Json Object|Cache factory implementation|no |onHeapPolling| +|pollPeriod|Period|polling period |no |null (poll once)| + + +##### Example of Polling On-heap Lookup +This example demonstrates a polling cache that will update its on-heap cache every 10 minutes +```json +{ + "type":"pollingLookup", + "pollPeriod":"PT10M", + "dataFetcher":{ "type":"jdbcDataFetcher", "connectorConfig":"jdbc://mysql://localhost:3306/my_data_base", "table":"lookup_table_name", "keyColumn":"key_column_name", "valueColumn": "value_column_name"}, + "cacheFactory":{"type":"onHeapPolling"} +} + +``` + +##### Example Polling Off-heap Lookup +This example demonstrates an off-heap lookup that will be cached once and never swapped `(pollPeriod == null)` + +```json +{ + "type":"pollingLookup", + "dataFetcher":{ "type":"jdbcDataFetcher", "connectorConfig":"jdbc://mysql://localhost:3306/my_data_base", "table":"lookup_table_name", "keyColumn":"key_column_name", "valueColumn": "value_column_name"}, + "cacheFactory":{"type":"offHeapPolling"} +} + +``` + + +### Loading lookup + +|Field|Type|Description|Required|default| +|-----|----|-----------|--------|-------| +|dataFetcher|Json object|Specifies the lookup data fetcher type to use in order to fetch data|yes|null| +|loadingCacheSpec|Json Object|Lookup cache spec implementation|yes |null| +|reverseLoadingCacheSpec|Json Object| Reverse lookup cache implementation|yes |null| + + +##### Example Loading On-heap Guava + +Guava cache configuration spec. + +|Field|Type|Description|Required|default| +|-----|----|-----------|--------|-------| +|concurrencyLevel|int|Allowed concurrency among update operations|no|4| +|initialCapacity|int|Initial capacity size|no |null| +|maximumSize|long| Specifies the maximum number of entries the cache may contain.|no |null (infinite capacity)| +|expireAfterAccess|long| Specifies the eviction time after last read in milliseconds.|no |null (No read-time-based eviction when set to null)| +|expireAfterWrite|long| Specifies the eviction time after last write in milliseconds.|no |null (No write-time-based eviction when set to null)| + +```json +{ + "type":"loadingLookup", + "dataFetcher":{ "type":"jdbcDataFetcher", "connectorConfig":"jdbc://mysql://localhost:3306/my_data_base", "table":"lookup_table_name", "keyColumn":"key_column_name", "valueColumn": "value_column_name"}, + "loadingCacheSpec":{"type":"guava"}, + "reverseLoadingCacheSpec":{"type":"guava", "maximumSize":500000, "expireAfterAccess":100000, "expireAfterAccess":10000} +} +``` + +##### Example Loading Off-heap MapDB + +Off heap cache is backed by [MapDB](http://www.mapdb.org/) implementation. MapDB is using direct memory as memory pool, please take that into account when limiting the JVM direct memory setup. + +|Field|Type|Description|Required|default| +|-----|----|-----------|--------|-------| +|maxStoreSize|double|maximal size of store in GB, if store is larger entries will start expiring|no |0| +|maxEntriesSize|long| Specifies the maximum number of entries the cache may contain.|no |0 (infinite capacity)| +|expireAfterAccess|long| Specifies the eviction time after last read in milliseconds.|no |0 (No read-time-based eviction when set to null)| +|expireAfterWrite|long| Specifies the eviction time after last write in milliseconds.|no |0 (No write-time-based eviction when set to null)| + + +```json +{ + "type":"loadingLookup", + "dataFetcher":{ "type":"jdbcDataFetcher", "connectorConfig":"jdbc://mysql://localhost:3306/my_data_base", "table":"lookup_table_name", "keyColumn":"key_column_name", "valueColumn": "value_column_name"}, + "loadingCacheSpec":{"type":"mapDb", "maxEntriesSize":100000}, + "reverseLoadingCacheSpec":{"type":"mapDb", "maxStoreSize":5, "expireAfterAccess":100000, "expireAfterAccess":10000} +} +``` diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md index 97ce5429c469..91ac0add648a 100644 --- a/docs/content/development/extensions.md +++ b/docs/content/development/extensions.md @@ -29,6 +29,7 @@ Core extensions are maintained by Druid committers. |druid-kafka-eight|Kafka ingest firehose (high level consumer).|[link](../development/extensions-core/kafka-eight-firehose.html)| |druid-kafka-extraction-namespace|Kafka-based namespaced lookup. Requires namespace lookup extension.|[link](../development/extensions-core/kafka-extraction-namespace.html)| |druid-lookups-cached-global|A module for [lookups](../querying/lookups.html) providing a jvm-global eager caching for lookups. It provides JDBC and URI implementations for fetching lookup data.|[link](../development/extensions-core/lookups-cached-global.html)| +|druid-lookups-cached-single| Per lookup caching module to support the use cases where a lookup need to be isolated from the global pool of lookups |[link](../development/extensions-core/druid-lookups.html)| |druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/s3.html)| |druid-stats|Statistics related module including variance and standard deviation.|[link](../development/extensions-core/stats.html)| |mysql-metadata-storage|MySQL metadata store.|[link](../development/extensions-core/mysql.html)| diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/JDBCExtractionNamespaceCacheFactory.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/JDBCExtractionNamespaceCacheFactory.java index 7a7ad0abba82..58887aa0902b 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/JDBCExtractionNamespaceCacheFactory.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/JDBCExtractionNamespaceCacheFactory.java @@ -169,6 +169,6 @@ public Timestamp withHandle(Handle handle) throws Exception } ); return update.getTime(); - } + } } diff --git a/extensions-core/lookups-cached-single/pom.xml b/extensions-core/lookups-cached-single/pom.xml new file mode 100644 index 000000000000..465ba4823687 --- /dev/null +++ b/extensions-core/lookups-cached-single/pom.xml @@ -0,0 +1,96 @@ + + + + + 4.0.0 + io.druid.extensions + druid-lookups-cached-single + druid-lookups-cached-single + Extension to rename Druid dimension values using lookups + + + io.druid + druid + 0.9.2-SNAPSHOT + ../../pom.xml + + + + + io.druid + druid-api + ${project.parent.version} + provided + + + io.druid + druid-processing + ${project.parent.version} + provided + + + io.druid + druid-server + ${project.parent.version} + provided + + + org.mapdb + mapdb + + + io.dropwizard + dropwizard-jdbi + 0.9.2 + + + org.antlr + stringtemplate + 3.2 + + + + junit + junit + test + + + io.druid + druid-server + ${project.parent.version} + test-jar + test + + + org.easymock + easymock + test + + + io.druid + druid-processing + ${project.parent.version} + test-jar + test + + + + diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/DataFetcher.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/DataFetcher.java new file mode 100644 index 000000000000..001d36d73cfb --- /dev/null +++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/DataFetcher.java @@ -0,0 +1,82 @@ +/* + * + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package io.druid.server.lookup; + + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.druid.server.lookup.jdbc.JdbcDataFetcher; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Map; + +/** + * This class will be used to pull data from a given lookup table. + * + * @param Keys type + * @param Values type + */ + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "jdbcDataFetcher", value = JdbcDataFetcher.class) +}) +public interface DataFetcher +{ + /** + * Function used to fetch all the pair of key-values from the lookup + * One use case of this method is to populate a polling cache {@link io.druid.server.lookup.cache.polling.PollingCache} + * + * @return Returns an {@link Iterable} of key-value pairs. + */ + Iterable> fetchAll(); + + /** + * Function to perform a one item lookup. + * For instance this can be used by a loading cache to fetch the value in case of the cache doesn't have it + * + * @param key non-null key used to lookup the value + * + * @return Returns null if the key doesn't have an associated value, or the value if it exists + */ + @Nullable V fetch(K key); + + /** + * Function used to perform a bulk lookup + * + * @param keys used to lookup the respective values + * + * @return Returns a {@link Iterable} containing the pair of existing key-value. + */ + Iterable> fetch(Iterable keys); + + /** + * Function used to perform reverse lookup + * + * @param value use to fetch it's keys from the lookup table + * + * @return Returns a list of keys of the given {@code value} or an empty list {@link java.util.Collections.EmptyList} + */ + List reverseFetchKeys(V value); + +} diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LoadingLookup.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LoadingLookup.java new file mode 100644 index 000000000000..4a6b19b27ab1 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LoadingLookup.java @@ -0,0 +1,148 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup; + + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.metamx.common.logger.Logger; +import io.druid.query.lookup.LookupExtractor; +import io.druid.server.lookup.cache.loading.LoadingCache; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Loading lookup will load the key\value pair upon request on the key it self, the general algorithm is load key if absent. + * Once the key/value pair is loaded eviction will occur according to the cache eviction policy. + * This module comes with two loading cache implementations, the first {@link io.druid.server.lookup.cache.loading.OnHeapLoadingCache}is onheap backed by a Guava cache implementation, the second {@link io.druid.server.lookup.cache.loading.OffHeapLoadingCache}is MapDB offheap implementation. + * Both implementations offer various eviction strategies. + */ +public class LoadingLookup extends LookupExtractor +{ + private static final Logger LOGGER = new Logger(LoadingLookup.class); + + private final DataFetcher dataFetcher; + private final LoadingCache loadingCache; + private final LoadingCache> reverseLoadingCache; + private final AtomicBoolean isOpen; + private final String id = Integer.toHexString(System.identityHashCode(this)); + + public LoadingLookup( + DataFetcher dataFetcher, + LoadingCache loadingCache, + LoadingCache> reverseLoadingCache + ) + { + this.dataFetcher = Preconditions.checkNotNull(dataFetcher, "lookup must have a DataFetcher"); + this.loadingCache = Preconditions.checkNotNull(loadingCache, "loading lookup need a cache"); + this.reverseLoadingCache = Preconditions.checkNotNull(reverseLoadingCache, "loading lookup need reverse cache"); + this.isOpen = new AtomicBoolean(true); + } + + + @Override + public String apply(final String key) + { + if (key == null) { + return null; + } + final String presentVal; + try { + presentVal = loadingCache.get(key, new applyCallable(key)); + return Strings.emptyToNull(presentVal); + } + catch (ExecutionException e) { + LOGGER.debug("value not found for key [%s]", key); + return null; + } + } + + @Override + public List unapply(final String value) + { + // null value maps to empty list + if (value == null) { + return Collections.EMPTY_LIST; + } + final List retList; + try { + retList = reverseLoadingCache.get(value, new unapplyCallable(value)); + return retList; + } + catch (ExecutionException e) { + LOGGER.debug("list of keys not found for value [%s]", value); + return Collections.EMPTY_LIST; + } + } + + public synchronized void close() + { + if (isOpen.getAndSet(false)) { + LOGGER.info("Closing loading cache [%s]", id); + loadingCache.close(); + reverseLoadingCache.close(); + } else { + LOGGER.info("Closing already closed lookup"); + return; + } + } + + public boolean isOpen() + { + return isOpen.get(); + } + + @Override + public byte[] getCacheKey() + { + return LookupExtractionModule.getRandomCacheKey(); + } + + private class applyCallable implements Callable + { + private final String key; + + public applyCallable(String key) {this.key = key;} + + @Override + public String call() throws Exception + { + // avoid returning null and return an empty string to cache it. + return Strings.nullToEmpty(dataFetcher.fetch(key)); + } + } + + private class unapplyCallable implements Callable> + { + private final String value; + + public unapplyCallable(String value) {this.value = value;} + + @Override + public List call() throws Exception + { + return dataFetcher.reverseFetchKeys(value); + } + } +} diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LoadingLookupFactory.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LoadingLookupFactory.java new file mode 100644 index 000000000000..5a9648789854 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LoadingLookupFactory.java @@ -0,0 +1,140 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.metamx.common.logger.Logger; +import io.druid.query.lookup.LookupExtractorFactory; +import io.druid.query.lookup.LookupIntrospectHandler; +import io.druid.server.lookup.cache.loading.LoadingCache; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +@JsonTypeName("loadingLookup") +public class LoadingLookupFactory implements LookupExtractorFactory +{ + private final static Logger LOGGER = new Logger(LoadingLookupFactory.class); + + @JsonProperty("dataFetcher") + private final DataFetcher dataFetcher; + + @JsonProperty("loadingCacheSpec") + private final LoadingCache loadingCache; + + @JsonProperty("reverseLoadingCacheSpec") + private final LoadingCache> reverseLoadingCache; + + private final String id = Integer.toHexString(System.identityHashCode(this)); + private final LoadingLookup loadingLookup; + private final AtomicBoolean started = new AtomicBoolean(false); + + public LoadingLookupFactory( + @JsonProperty("dataFetcher") DataFetcher dataFetcher, + @JsonProperty("loadingCacheSpec") LoadingCache loadingCache, + @JsonProperty("reverseLoadingCacheSpec") LoadingCache> reverseLoadingCache + ) + { + this(dataFetcher, loadingCache, reverseLoadingCache, new LoadingLookup(dataFetcher,loadingCache,reverseLoadingCache)); + } + + protected LoadingLookupFactory( + DataFetcher dataFetcher, + LoadingCache loadingCache, + LoadingCache> reverseLoadingCache, + LoadingLookup loadingLookup + ) + { + this.dataFetcher = Preconditions.checkNotNull(dataFetcher); + this.loadingCache = Preconditions.checkNotNull(loadingCache); + this.reverseLoadingCache = Preconditions.checkNotNull(reverseLoadingCache); + this.loadingLookup = loadingLookup; + } + + @Override + public synchronized boolean start() + { + if (!started.get()) { + started.set(loadingLookup.isOpen()); + LOGGER.info("created loading lookup with id [%s]", id); + } + return started.get(); + } + + @Override + public synchronized boolean close() + { + if (started.getAndSet(false)) { + LOGGER.info("closing loading lookup [%s]", id); + loadingLookup.close(); + } + return !started.get(); + } + + @Override + public boolean replaces( + @Nullable LookupExtractorFactory lookupExtractorFactory + ) + { + if (lookupExtractorFactory == null) return true; + return !this.equals(lookupExtractorFactory); + } + + @Nullable + @Override + public LookupIntrospectHandler getIntrospectHandler() + { + //not supported yet + return null; + } + + @Override + public LoadingLookup get() + { + return loadingLookup; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof LoadingLookupFactory)) { + return false; + } + + LoadingLookupFactory that = (LoadingLookupFactory) o; + + if (dataFetcher != null ? !dataFetcher.equals(that.dataFetcher) : that.dataFetcher != null) { + return false; + } + if (loadingCache != null ? !loadingCache.equals(that.loadingCache) : that.loadingCache != null) { + return false; + } + return reverseLoadingCache != null + ? reverseLoadingCache.equals(that.reverseLoadingCache) + : that.reverseLoadingCache == null; + + } +} diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LookupExtractionModule.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LookupExtractionModule.java new file mode 100644 index 000000000000..afffd8b97900 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/LookupExtractionModule.java @@ -0,0 +1,59 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.metamx.common.StringUtils; +import io.druid.initialization.DruidModule; + +import java.util.List; +import java.util.UUID; + +public class LookupExtractionModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule("SingleCached-LoadingOrPolling-Lookup-Module") + { + @Override + public void setupModule(SetupContext context) + { + context.registerSubtypes(LoadingLookupFactory.class); + context.registerSubtypes(PollingLookupFactory.class); + } + } + ); + } + + @Override + public void configure(Binder binder) + { + } + + public static byte[] getRandomCacheKey() + { + return StringUtils.toUtf8(UUID.randomUUID().toString()); + } +} diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/PollingLookup.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/PollingLookup.java new file mode 100644 index 000000000000..20744eacabd6 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/PollingLookup.java @@ -0,0 +1,243 @@ +/* + * + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * / + */ + +package io.druid.server.lookup; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import io.druid.concurrent.Execs; +import io.druid.query.lookup.LookupExtractor; +import io.druid.server.lookup.cache.polling.OnHeapPollingCache; +import io.druid.server.lookup.cache.polling.PollingCache; +import io.druid.server.lookup.cache.polling.PollingCacheFactory; + +import javax.validation.constraints.NotNull; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +public class PollingLookup extends LookupExtractor +{ + private static final Logger LOGGER = new Logger(PollingLookup.class); + + private final long pollPeriodMs; + + private final DataFetcher dataFetcher; + + private final PollingCacheFactory cacheFactory; + + private final AtomicReference refOfCacheKeeper = new AtomicReference<>(); + + + private final ListeningScheduledExecutorService scheduledExecutorService; + + private final AtomicBoolean isOpen = new AtomicBoolean(false); + + private final ListenableFuture pollFuture; + + private final String id = Integer.toHexString(System.identityHashCode(this)); + + public PollingLookup( + long pollPeriodMs, + DataFetcher dataFetcher, + PollingCacheFactory cacheFactory + ) + { + this.pollPeriodMs = pollPeriodMs; + this.dataFetcher = Preconditions.checkNotNull(dataFetcher); + this.cacheFactory = cacheFactory == null ? new OnHeapPollingCache.OnHeapPollingCacheProvider() : cacheFactory; + refOfCacheKeeper.set(new CacheRefKeeper(this.cacheFactory.makeOf(dataFetcher.fetchAll()))); + if (pollPeriodMs > 0) { + scheduledExecutorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor( + Execs.makeThreadFactory("PollingLookup-" + id, Thread.MIN_PRIORITY) + )); + pollFuture = scheduledExecutorService.scheduleWithFixedDelay( + pollAndSwap(), + pollPeriodMs, + pollPeriodMs, + TimeUnit.MILLISECONDS + ); + } else { + scheduledExecutorService = null; + pollFuture = null; + } + this.isOpen.set(true); + } + + + public void close() + { + LOGGER.info("Closing polling lookup [%s]", id); + synchronized (isOpen) { + isOpen.getAndSet(false); + if (pollFuture != null) { + pollFuture.cancel(true); + scheduledExecutorService.shutdown(); + } + CacheRefKeeper cacheRefKeeper = refOfCacheKeeper.getAndSet(null); + if (cacheRefKeeper != null) { + cacheRefKeeper.doneWithIt(); + } + } + } + + @Override + public String apply(@NotNull String key) + { + final CacheRefKeeper cacheRefKeeper = refOfCacheKeeper.get(); + if (cacheRefKeeper == null) { + throw new ISE("Cache reference is null WTF"); + } + final PollingCache cache = cacheRefKeeper.getAndIncrementRef(); + try { + if (cache == null) { + // it must've been closed after swapping while I was getting it. Try again. + return this.apply(key); + } + return Strings.emptyToNull((String) cache.get(key)); + } + finally { + if (cacheRefKeeper != null && cache != null) { + cacheRefKeeper.doneWithIt(); + } + } + } + + @Override + public List unapply(final String value) + { + CacheRefKeeper cacheRefKeeper = refOfCacheKeeper.get(); + if (cacheRefKeeper == null) { + throw new ISE("pollingLookup id [%s] is closed", id); + } + PollingCache cache = cacheRefKeeper.getAndIncrementRef(); + try { + if (cache == null) { + // it must've been closed after swapping while I was getting it. Try again. + return this.unapply(value); + } + return cache.getKeys(value); + } + finally { + if (cacheRefKeeper != null && cache != null) { + cacheRefKeeper.doneWithIt(); + } + } + } + + @Override + public byte[] getCacheKey() + { + return LookupExtractionModule.getRandomCacheKey(); + } + + private Runnable pollAndSwap() + { + return new Runnable() + { + @Override + public void run() + { + LOGGER.debug("Polling and swapping of PollingLookup [%s]", id); + CacheRefKeeper newCacheKeeper = new CacheRefKeeper(cacheFactory.makeOf(dataFetcher.fetchAll())); + CacheRefKeeper oldCacheKeeper = refOfCacheKeeper.getAndSet(newCacheKeeper); + if (oldCacheKeeper != null) { + oldCacheKeeper.doneWithIt(); + } + } + }; + } + + @Override + public int hashCode() + { + int result = (int) (pollPeriodMs ^ (pollPeriodMs >>> 32)); + result = 31 * result + dataFetcher.hashCode(); + result = 31 * result + cacheFactory.hashCode(); + return result; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof PollingLookup)) { + return false; + } + + PollingLookup that = (PollingLookup) o; + + if (pollPeriodMs != that.pollPeriodMs) { + return false; + } + if (!dataFetcher.equals(that.dataFetcher)) { + return false; + } + return cacheFactory.equals(that.cacheFactory); + + } + + public boolean isOpen() + { + return isOpen.get(); + } + + + protected static class CacheRefKeeper + { + private final PollingCache pollingCache; + private final AtomicLong refCounts = new AtomicLong(0L); + + CacheRefKeeper(PollingCache pollingCache) {this.pollingCache = pollingCache;} + + PollingCache getAndIncrementRef() + { + synchronized (refCounts) { + if (refCounts.get() < 0) { + return null; + } + refCounts.incrementAndGet(); + return pollingCache; + } + } + + void doneWithIt() + { + synchronized (refCounts) { + if (refCounts.get() == 0) { + pollingCache.close(); + } + refCounts.decrementAndGet(); + } + } + } + +} diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/PollingLookupFactory.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/PollingLookupFactory.java new file mode 100644 index 000000000000..1c93d61d9cd9 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/PollingLookupFactory.java @@ -0,0 +1,156 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.annotations.VisibleForTesting; +import com.metamx.common.logger.Logger; +import io.druid.query.lookup.LookupExtractorFactory; +import io.druid.query.lookup.LookupIntrospectHandler; +import io.druid.server.lookup.cache.polling.PollingCacheFactory; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import java.util.concurrent.atomic.AtomicBoolean; + +@JsonTypeName("pollingLookup") +public class PollingLookupFactory implements LookupExtractorFactory +{ + private static final Logger LOGGER = new Logger(PollingLookupFactory.class); + + @JsonProperty("pollPeriod") + private final Period pollPeriod; + + @JsonProperty("dataFetcher") + private final DataFetcher dataFetcher; + + @JsonProperty("cacheProvider") + private final PollingCacheFactory cacheFactory; + + private final PollingLookup pollingLookup; + + private final AtomicBoolean started = new AtomicBoolean(false); + + private final String id = Integer.toHexString(System.identityHashCode(this)); + + @JsonCreator + public PollingLookupFactory( + @JsonProperty("pollPeriod") Period pollPeriod, + @JsonProperty("dataFetcher") DataFetcher dataFetcher, + @JsonProperty("cacheFactory") PollingCacheFactory cacheFactory + ) + { + this.pollPeriod = pollPeriod == null ? Period.ZERO : pollPeriod; + this.dataFetcher = dataFetcher; + this.cacheFactory = cacheFactory; + this.pollingLookup = new PollingLookup(this.pollPeriod.getMillis(), this.dataFetcher, this.cacheFactory); + } + + @VisibleForTesting + protected PollingLookupFactory( + Period pollPeriod, + DataFetcher dataFetcher, + PollingCacheFactory pollingCacheFactory, + PollingLookup pollingLookup + ) + { + this.pollPeriod = pollPeriod; + this.dataFetcher = dataFetcher; + this.cacheFactory = pollingCacheFactory; + this.pollingLookup = pollingLookup; + } + + @Override + public boolean start() + { + synchronized (started) { + if (!started.get()) { + LOGGER.info("started polling lookup [%s]", id); + started.set(pollingLookup.isOpen()); + } + return started.get(); + } + } + + @Override + public boolean close() + { + synchronized (started) { + if (started.getAndSet(false)) { + LOGGER.info("closing polling lookup [%s]", id); + pollingLookup.close(); + } + return !started.get(); + } + } + + @Override + public boolean replaces( + @Nullable LookupExtractorFactory lookupExtractorFactory + ) + { + if (lookupExtractorFactory == null) { + return true; + } + + return !this.equals(lookupExtractorFactory); + } + + @Nullable + @Override + public LookupIntrospectHandler getIntrospectHandler() + { + //not supported yet + return null; + } + + @Override + public PollingLookup get() + { + return pollingLookup; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof PollingLookupFactory)) { + return false; + } + + PollingLookupFactory that = (PollingLookupFactory) o; + + if (pollPeriod != null ? !pollPeriod.equals(that.pollPeriod) : that.pollPeriod != null) { + return false; + } + if (dataFetcher != null ? !dataFetcher.equals(that.dataFetcher) : that.dataFetcher != null) { + return false; + } + return cacheFactory != null + ? (that.cacheFactory != null + && cacheFactory.getClass() == (that.cacheFactory).getClass()) + : that.cacheFactory == null; + + } +} diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/LoadingCache.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/LoadingCache.java new file mode 100644 index 000000000000..2abde26b6f0d --- /dev/null +++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/LoadingCache.java @@ -0,0 +1,132 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup.cache.loading; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.cache.CacheLoader; +import com.google.common.util.concurrent.ExecutionError; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; + +/** + * A semi-persistent mapping from keys to values. Cache entries are added using + * {@link #get(Object, Callable)} and stored in the cache until either evicted or manually invalidated. + *

+ *

Implementations of this interface are expected to be thread-safe, and can be safely accessed + * by multiple concurrent threads. + * + * This interface borrows ideas (and in some cases methods and javadoc) from Guava and JCache cache interface. + * Thanks Guava and JSR ! + * We elected to make this as close as possible to JSR API like that users can build bridges between all the existing implementations of JSR. + */ + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "guava", value = OnHeapLoadingCache.class), + @JsonSubTypes.Type(name = "mapDb", value = OffHeapLoadingCache.class) +}) +public interface LoadingCache extends Closeable +{ + /** + * @param key must not be null + * Returns the value associated with {@code key} in this cache, or {@code null} if there is no + * cached value for {@code key}. + * a cache miss should be counted if the key is missing. + */ + @Nullable + V getIfPresent(K key); + + /** + * Returns a map of the values associated with {@code keys} in this cache. The returned map will + * only contain entries which are already present in the cache. + */ + + Map getAllPresent(Iterable keys); + + /** + * Returns the value associated with {@code key} in this cache, obtaining that value from + * {@code valueLoader} if necessary. No observable state associated with this cache is modified + * until loading completes. + * + *

Warning: as with {@link CacheLoader#load}, {@code valueLoader} must not return + * {@code null}; it may either return a non-null value or throw an exception. + * + * @throws ExecutionException if a checked exception was thrown while loading the value + * @throws UncheckedExecutionException if an unchecked exception was thrown while loading the + * value + * @throws ExecutionError if an error was thrown while loading the value + * + */ + V get(K key, Callable valueLoader) throws ExecutionException; + + /** + * Copies all of the mappings from the specified map to the cache. This method is used for bulk put. + * + * @throws IllegalStateException if the cache is {@link #isClosed()} + */ + + void putAll(Map m); + + /** + * Discards any cached value for key {@code key}. Eviction can be lazy or eager. + * + * @throws IllegalStateException if the cache is {@link #isClosed()} + */ + void invalidate(K key); + + /** + * Discards any cached values for keys {@code keys}. Eviction can be lazy or eager. + * + * @throws IllegalStateException if the cache is {@link #isClosed()} + */ + void invalidateAll(Iterable keys); + + /** + * Clears the contents of the cache. + * + * @throws IllegalStateException if the cache is {@link #isClosed()} + */ + void invalidateAll(); + + /** + * @return Stats of the cache. + * + * @throws IllegalStateException if the cache is {@link #isClosed()} + */ + LookupCacheStats getStats(); + + /** + * @return true if the Cache is closed + */ + boolean isClosed(); + + /** + * Clean the used resources of the cache. Still not sure about cache lifecycle but as an initial design + * the namespace deletion event should call this method to clean up resources. + */ + + void close(); +} diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/LookupCacheStats.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/LookupCacheStats.java new file mode 100644 index 000000000000..cf9335a76112 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/LookupCacheStats.java @@ -0,0 +1,189 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup.cache.loading; + + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * Statistics about the performance of a {@link LoadingCache}. Instances of this class are immutable. + * Cache statistics are incremented according to the following rules: + *

    + *
  • When a cache lookup encounters an existing cache entry {@code hitCount} is incremented. + *
  • When a cache lookup first encounters a missing cache entry {@code missCount} is incremented. + *
  • When an entry is evicted from the cache, {@code evictionCount} is incremented. + *
+ */ +public class LookupCacheStats +{ + private final long hitCount; + private final long missCount; + private final long evictionCount; + + /** + * Constructs a new {@code CacheStats} instance. + */ + public LookupCacheStats( + long hitCount, long missCount, + long evictionCount + ) + { + checkArgument(hitCount >= 0); + checkArgument(missCount >= 0); + checkArgument(evictionCount >= 0); + + this.hitCount = hitCount; + this.missCount = missCount; + this.evictionCount = evictionCount; + } + + /** + * Returns the number of times {@link LoadingCache} lookup methods have returned either a cached or + * uncached value. This is defined as {@code hitCount + missCount}. + */ + public long requestCount() + { + return hitCount + missCount; + } + + /** + * Returns the number of times {@link LoadingCache} lookup methods have returned a cached value. + */ + public long hitCount() + { + return hitCount; + } + + /** + * Returns the ratio of cache requests which were hits. This is defined as + * {@code hitCount / requestCount}, or {@code 1.0} when {@code requestCount == 0}. + * Note that {@code hitRate + missRate =~ 1.0}. + */ + public double hitRate() + { + long requestCount = requestCount(); + return (requestCount == 0) ? 1.0 : (double) hitCount / requestCount; + } + + /** + * Returns the number of times {@link LoadingCache} lookup methods have returned an uncached (newly + * loaded) value, or null. Multiple concurrent calls to {@link LoadingCache} lookup methods on an absent + * value can result in multiple misses, all returning the results of a single cache load + * operation. + */ + public long missCount() + { + return missCount; + } + + /** + * Returns the ratio of cache requests which were misses. This is defined as + * {@code missCount / requestCount}, or {@code 0.0} when {@code requestCount == 0}. + * Note that {@code hitRate + missRate =~ 1.0}. Cache misses include all requests which + * weren't cache hits, including requests which resulted in either successful or failed loading + * attempts, and requests which waited for other threads to finish loading. It is thus the case + * that {@code missCount >= loadSuccessCount + loadExceptionCount}. Multiple + * concurrent misses for the same key will result in a single load operation. + */ + public double missRate() + { + long requestCount = requestCount(); + return (requestCount == 0) ? 0.0 : (double) missCount / requestCount; + } + + + /** + * Returns the number of times an entry has been evicted. This count does not include manual + * {@linkplain LoadingCache#invalidate invalidations}. + */ + public long evictionCount() + { + return evictionCount; + } + + /** + * Returns a new {@code CacheStats} representing the difference between this {@code CacheStats} + * and {@code other}. Negative values, which aren't supported by {@code CacheStats} will be + * rounded up to zero. + */ + public LookupCacheStats minus(LookupCacheStats other) + { + return new LookupCacheStats( + Math.max(0, hitCount - other.hitCount), + Math.max(0, missCount - other.missCount), + Math.max(0, evictionCount - other.evictionCount) + ); + } + + /** + * Returns a new {@code CacheStats} representing the sum of this {@code CacheStats} + * and {@code other}. + */ + public LookupCacheStats plus(LookupCacheStats other) + { + return new LookupCacheStats( + hitCount + other.hitCount, + missCount + other.missCount, + evictionCount + other.evictionCount + ); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof LookupCacheStats)) { + return false; + } + + LookupCacheStats that = (LookupCacheStats) o; + + if (hitCount != that.hitCount) { + return false; + } + if (missCount != that.missCount) { + return false; + } + + return evictionCount == that.evictionCount; + + } + + @Override + public int hashCode() + { + int result = (int) (hitCount ^ (hitCount >>> 32)); + result = 31 * result + (int) (missCount ^ (missCount >>> 32)); + result = 31 * result + (int) (evictionCount ^ (evictionCount >>> 32)); + return result; + } + + @Override + public String toString() + { + return "LookupCacheStats{" + + "hitCount=" + hitCount + + ", missCount=" + missCount + + ", evictionCount=" + evictionCount + + '}'; + } +} diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/OffHeapLoadingCache.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/OffHeapLoadingCache.java new file mode 100644 index 000000000000..9bf22c5ab3ca --- /dev/null +++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/OffHeapLoadingCache.java @@ -0,0 +1,238 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup.cache.loading; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import com.metamx.common.ISE; +import org.mapdb.Bind; +import org.mapdb.DB; +import org.mapdb.DBMaker; +import org.mapdb.HTreeMap; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class OffHeapLoadingCache implements LoadingCache +{ + private static final DB DB = DBMaker.newMemoryDirectDB().transactionDisable().closeOnJvmShutdown().make(); + + private final HTreeMap cache; + private final AtomicLong hitCount = new AtomicLong(0); + private final AtomicLong missCount = new AtomicLong(0); + private final AtomicLong evictionCount = new AtomicLong(0); + private final AtomicBoolean closed = new AtomicBoolean(true); + + /** + * Sets store size limit. Disk or memory space consumed be storage should not grow over this space. + * maximal size of store in GB, if store is larger entries will start expiring + */ + @JsonProperty + private final double maxStoreSize; + + /** + * Sets maximal number of entries in this map. + * Less used entries will be expired and removed to make collection smaller + */ + @JsonProperty + private final long maxEntriesSize; + + /** + * Specifies that each entry should be automatically removed from the map once a fixed duration has elapsed after the entry's creation, or the most recent replacement of its value. + */ + @JsonProperty + private final long expireAfterWrite; + + /** + * Specifies that each entry should be automatically removed from the map once a fixed duration has elapsed after the entry's creation, the most recent replacement of its value, or its last access. Access time is reset by all map read and write operations + */ + @JsonProperty + private final long expireAfterAccess; + + private final String name = UUID.randomUUID().toString(); + + @JsonCreator + public OffHeapLoadingCache( + @JsonProperty("maxStoreSize") double maxStoreSize, + @JsonProperty("maxEntriesSize") long maxEntriesSize, + @JsonProperty("expireAfterWrite") long expireAfterWrite, + @JsonProperty("expireAfterAccess") long expireAfterAccess + ) + { + this.maxStoreSize = maxStoreSize; + this.maxEntriesSize = maxEntriesSize; + this.expireAfterWrite = expireAfterWrite; + this.expireAfterAccess = expireAfterAccess; + this.cache = DB.createHashMap(name) + .expireStoreSize(this.maxStoreSize) + .expireAfterWrite(this.expireAfterWrite, TimeUnit.MILLISECONDS) + .expireAfterAccess(this.expireAfterAccess, TimeUnit.MILLISECONDS) + .expireMaxSize(this.maxEntriesSize) + .make(); + cache.modificationListenerAdd(new Bind.MapListener() + { + @Override + public void update(K key, V oldVal, V newVal) + { + if (oldVal != null && newVal == null) { + // eviction or remove call + evictionCount.getAndIncrement(); + } + } + }); + this.closed.set(false); + } + + @Override + public V getIfPresent(K key) + { + V value = cache.get(key); + if (value == null) { + missCount.getAndIncrement(); + } else { + hitCount.getAndIncrement(); + } + return value; + } + + @Override + public Map getAllPresent(final Iterable keys) + { + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (K key : keys + ) { + V value = getIfPresent(key); + if (value != null) { + builder.put(key, value); + } + } + return builder.build(); + } + + @Override + public V get(K key, final Callable valueLoader) throws ExecutionException + { + synchronized (key) { + V value = cache.get(key); + if (value != null) { + return value; + } + try { + value = valueLoader.call(); + cache.put(key, value); + return value; + } + catch (Exception e) { + throw new ISE(e, "got an exception while loading key [%s]", key); + } + } + } + + + @Override + public void putAll(Map m) + { + cache.putAll(m); + } + + @Override + public void invalidate(K key) + { + cache.remove(key); + } + + @Override + public void invalidateAll(Iterable keys) + { + for (K key : keys) { + invalidate(key); + } + } + + @Override + public void invalidateAll() + { + cache.clear(); + } + + @Override + public LookupCacheStats getStats() + { + return new LookupCacheStats(hitCount.get(), missCount.get(), evictionCount.get()); + } + + @Override + public boolean isClosed() + { + return closed.get(); + } + + @Override + public void close() + { + if (!closed.getAndSet(true)) { + DB.delete(String.valueOf(name)); + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof OffHeapLoadingCache)) { + return false; + } + + OffHeapLoadingCache that = (OffHeapLoadingCache) o; + + if (Double.compare(that.maxStoreSize, maxStoreSize) != 0) { + return false; + } + if (maxEntriesSize != that.maxEntriesSize) { + return false; + } + if (expireAfterWrite != that.expireAfterWrite) { + return false; + } + return expireAfterAccess == that.expireAfterAccess; + + } + + @Override + public int hashCode() + { + int result; + long temp; + temp = Double.doubleToLongBits(maxStoreSize); + result = (int) (temp ^ (temp >>> 32)); + result = 31 * result + (int) (maxEntriesSize ^ (maxEntriesSize >>> 32)); + result = 31 * result + (int) (expireAfterWrite ^ (expireAfterWrite >>> 32)); + result = 31 * result + (int) (expireAfterAccess ^ (expireAfterAccess >>> 32)); + return result; + } +} diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/OnHeapLoadingCache.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/OnHeapLoadingCache.java new file mode 100644 index 000000000000..9fd8fb3aab81 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/loading/OnHeapLoadingCache.java @@ -0,0 +1,218 @@ +/* + * + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * / + */ + +package io.druid.server.lookup.cache.loading; + + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.metamx.common.logger.Logger; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + + +public class OnHeapLoadingCache implements LoadingCache +{ + private final static Logger log = new Logger(OnHeapLoadingCache.class); + private static final int DEFAULT_INITIAL_CAPACITY = 16; + //See com.google.common.cache.CacheBuilder#DEFAULT_CONCURRENCY_LEVEL + private static final int DEFAULT_CONCURRENCY_LEVEL = 4; + + private final Cache cache; + private final AtomicBoolean isClosed = new AtomicBoolean(false); + @JsonProperty + private final int concurrencyLevel; + @JsonProperty + private final int initialCapacity; + @JsonProperty + private final Long maximumSize; + @JsonProperty + private final Long expireAfterAccess; + @JsonProperty + private final Long expireAfterWrite; + + + /** + * @param concurrencyLevel default to {@code DEFAULT_CONCURRENCY_LEVEL} + * @param initialCapacity default to {@code DEFAULT_INITIAL_CAPACITY} + * @param maximumSize Max number of entries that the cache can hold, When set to zero, elements will be evicted immediately after being loaded into the + * cache. + * When set to null, cache maximum size is infinity + * @param expireAfterAccess Specifies that each entry should be automatically removed from the cache once a fixed duration + * has elapsed after the entry's creation, the most recent replacement of its value, or its last + * access. Access time is reset by all cache read and write operations. + * No read-time-based eviction when set to null. + * @param expireAfterWrite Specifies that each entry should be automatically removed from the cache once a fixed duration + * has elapsed after the entry's creation, or the most recent replacement of its value. + * No write-time-based eviction when set to null. + */ + @JsonCreator + public OnHeapLoadingCache( + @JsonProperty("concurrencyLevel") int concurrencyLevel, + @JsonProperty("initialCapacity") int initialCapacity, + @JsonProperty("maximumSize") Long maximumSize, + @JsonProperty("expireAfterAccess") Long expireAfterAccess, + @JsonProperty("expireAfterWrite") Long expireAfterWrite + ) + { + this.concurrencyLevel = concurrencyLevel <= 0 ? DEFAULT_CONCURRENCY_LEVEL : concurrencyLevel; + this.initialCapacity = initialCapacity <= 0 ? DEFAULT_INITIAL_CAPACITY : initialCapacity; + this.maximumSize = maximumSize; + this.expireAfterAccess = expireAfterAccess; + this.expireAfterWrite = expireAfterWrite; + CacheBuilder builder = CacheBuilder.newBuilder() + .concurrencyLevel(this.concurrencyLevel) + .initialCapacity(this.initialCapacity) + .recordStats(); + if (this.expireAfterAccess != null) { + builder.expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS); + } + if (this.expireAfterWrite != null) { + builder.expireAfterWrite(this.expireAfterWrite, TimeUnit.MILLISECONDS); + } + if (this.maximumSize != null) { + builder.maximumSize(this.maximumSize); + } + + this.cache = builder.build(); + + if (isClosed.getAndSet(false)) { + log.info("Guava Based OnHeapCache started with spec [%s]", cache.toString()); + } + } + + + @Override + public V getIfPresent(K key) + { + return cache.getIfPresent(key); + } + + @Override + public void putAll(Map m) + { + cache.putAll(m); + } + + + @Override + public Map getAllPresent(Iterable keys) + { + return cache.getAllPresent(keys); + } + + @Override + public V get(K key, Callable valueLoader) throws ExecutionException + { + return cache.get(key, valueLoader); + } + + @Override + public void invalidate(K key) + { + cache.invalidate(key); + } + + @Override + public void invalidateAll(Iterable keys) + { + cache.invalidateAll(keys); + } + + @Override + public void invalidateAll() + { + cache.invalidateAll(); + cache.cleanUp(); + } + + @Override + public LookupCacheStats getStats() + { + return new LookupCacheStats( + cache.stats().hitCount(), + cache.stats().missCount(), + cache.stats().evictionCount() + ); + } + + @Override + public boolean isClosed() + { + return isClosed.get(); + } + + @Override + public void close() + { + if (!isClosed.getAndSet(true)) { + cache.cleanUp(); + } + } + + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof OnHeapLoadingCache)) { + return false; + } + + OnHeapLoadingCache that = (OnHeapLoadingCache) o; + + if (concurrencyLevel != that.concurrencyLevel) { + return false; + } + if (initialCapacity != that.initialCapacity) { + return false; + } + if (maximumSize != null ? !maximumSize.equals(that.maximumSize) : that.maximumSize != null) { + return false; + } + if (expireAfterAccess != null + ? !expireAfterAccess.equals(that.expireAfterAccess) + : that.expireAfterAccess != null) { + return false; + } + return expireAfterWrite != null ? expireAfterWrite.equals(that.expireAfterWrite) : that.expireAfterWrite == null; + + } + + @Override + public int hashCode() + { + int result = concurrencyLevel; + result = 31 * result + initialCapacity; + result = 31 * result + (maximumSize != null ? maximumSize.hashCode() : 0); + result = 31 * result + (expireAfterAccess != null ? expireAfterAccess.hashCode() : 0); + result = 31 * result + (expireAfterWrite != null ? expireAfterWrite.hashCode() : 0); + return result; + } +} diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/OffHeapPollingCache.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/OffHeapPollingCache.java new file mode 100644 index 000000000000..52025e633dd0 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/OffHeapPollingCache.java @@ -0,0 +1,124 @@ +/* + * + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * / + */ + +package io.druid.server.lookup.cache.polling; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.mapdb.DB; +import org.mapdb.DBMaker; +import org.mapdb.HTreeMap; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; + + +public class OffHeapPollingCache implements PollingCache +{ + private static final DB DB = DBMaker.newMemoryDirectDB().transactionDisable().closeOnJvmShutdown().make(); + + private final HTreeMap mapCache; + private final HTreeMap> reverseCache; + private final AtomicBoolean started = new AtomicBoolean(false); + private final String cacheName; + private final String reverseCacheName; + + public OffHeapPollingCache(final Iterable> entries) + { + synchronized (started) { + this.cacheName = String.format("cache-%s", UUID.randomUUID()); + this.reverseCacheName = String.format("reverseCache-%s", UUID.randomUUID()); + mapCache = DB.createHashMap(cacheName).make(); + reverseCache = DB.createHashMap(reverseCacheName).make(); + ImmutableSet.Builder setOfValuesBuilder = ImmutableSet.builder(); + for (Map.Entry entry : entries) { + mapCache.put(entry.getKey(), entry.getValue()); + setOfValuesBuilder.add(entry.getValue()); + } + + final Set setOfValues = setOfValuesBuilder.build(); + reverseCache.putAll(Maps.asMap( + setOfValues, new Function>() + { + @Override + public List apply(final V input) + { + return Lists.newArrayList(Maps.filterKeys(mapCache, new Predicate() + { + @Override + public boolean apply(K key) + { + V retVal = mapCache.get(key); + if (retVal == null) { + return false; + } + return retVal.equals(input); + } + }).keySet()); + } + })); + started.getAndSet(true); + } + } + + @Override + public V get(K key) + { + return mapCache.get(key); + } + + @Override + public List getKeys(V value) + { + final List listOfKey = reverseCache.get(value); + if (listOfKey == null) { + return Collections.emptyList(); + } + return listOfKey; + } + + @Override + public void close() + { + synchronized (started) { + if (started.getAndSet(false)) { + DB.delete(cacheName); + DB.delete(reverseCacheName); + } + } + } + + public static class OffHeapPollingCacheProvider implements PollingCacheFactory + { + @Override + public PollingCache makeOf(Iterable> entries) + { + return new OffHeapPollingCache<>(entries); + } + } +} diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/OnHeapPollingCache.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/OnHeapPollingCache.java new file mode 100644 index 000000000000..c40534599a60 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/OnHeapPollingCache.java @@ -0,0 +1,111 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup.cache.polling; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class OnHeapPollingCache implements PollingCache +{ + private final ImmutableMap immutableMap; + private final ImmutableMap> immutableReverseMap; + + + public OnHeapPollingCache(Iterable> entries) + { + + if (entries == null) { + immutableMap = ImmutableMap.of(); + immutableReverseMap = ImmutableMap.of(); + } else { + ImmutableSet.Builder setOfValuesBuilder = ImmutableSet.builder(); + ImmutableMap.Builder mapBuilder = ImmutableMap.builder(); + for (Map.Entry entry: entries + ) { + setOfValuesBuilder.add(entry.getValue()); + mapBuilder.put(entry.getKey(), entry.getValue()); + } + final Set setOfValues = setOfValuesBuilder.build(); + immutableMap = mapBuilder.build(); + immutableReverseMap = ImmutableMap.copyOf(Maps.asMap( + setOfValues, new Function>() + { + @Override + public List apply(final V input) + { + return Lists.newArrayList(Maps.filterKeys(immutableMap, new Predicate() + { + @Override + public boolean apply(K key) + { + V retVal = immutableMap.get(key); + if (retVal == null) { + return false; + } + return retVal.equals(input); + } + }).keySet()); + } + })); + } + + } + + @Override + public V get(K key) + { + return immutableMap.get(key); + } + + @Override + public List getKeys(final V value) + { + final List listOfKeys = immutableReverseMap.get(value); + if (listOfKeys == null) { + return Collections.emptyList(); + } + return listOfKeys; + } + + @Override + public void close() + { + //noop + } + + + public static class OnHeapPollingCacheProvider implements PollingCacheFactory + { + @Override + public PollingCache makeOf(Iterable> entries) + { + return new OnHeapPollingCache(entries); + } + } +} diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/PollingCache.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/PollingCache.java new file mode 100644 index 000000000000..f9ac3414fea2 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/PollingCache.java @@ -0,0 +1,45 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup.cache.polling; + +import java.util.List; + +public interface PollingCache +{ + /** + * @param key Given key to lookup its value from the cache. + * + * @return Returns the value associated to the {@code key} or {@code null} if no value exist. + */ + V get(K key); + + /** + * @param value Given value to reverse lookup its keys. + * + * @return Returns a {@link List} of keys associated to the given {@code value} otherwise {@link java.util.Collections.EmptyList} + */ + List getKeys(V value); + + /** + * close and clean the resources used by the cache + */ + void close(); + +} diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/PollingCacheFactory.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/PollingCacheFactory.java new file mode 100644 index 000000000000..18789351c100 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/cache/polling/PollingCacheFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup.cache.polling; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import java.util.Map; + + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = OnHeapPollingCache.OnHeapPollingCacheProvider.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "onHeapPolling", value = OnHeapPollingCache.OnHeapPollingCacheProvider.class), + @JsonSubTypes.Type(name = "offHeapPolling", value = OffHeapPollingCache.OffHeapPollingCacheProvider.class) +}) +public interface PollingCacheFactory +{ + /** + * @param entries of keys and values used to populate the cache + * + * @return Returns a new {@link PollingCache} containing all the entries of {@code map} + */ + + PollingCache makeOf(Iterable> entries); +} diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/JdbcDataFetcher.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/JdbcDataFetcher.java new file mode 100644 index 000000000000..0c6b7b75eda9 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/JdbcDataFetcher.java @@ -0,0 +1,230 @@ +/* + * + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * / + */ + +package io.druid.server.lookup.jdbc; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.metamx.common.logger.Logger; +import io.druid.metadata.MetadataStorageConnectorConfig; +import io.druid.server.lookup.DataFetcher; +import org.skife.jdbi.v2.DBI; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.TransactionCallback; +import org.skife.jdbi.v2.TransactionStatus; +import org.skife.jdbi.v2.tweak.HandleCallback; +import org.skife.jdbi.v2.util.StringMapper; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; + +public class JdbcDataFetcher implements DataFetcher +{ + private static final Logger LOGGER = new Logger(JdbcDataFetcher.class); + private static final int DEFAULT_STREAMING_FETCH_SIZE = 1000; + + @JsonProperty + private final MetadataStorageConnectorConfig connectorConfig; + @JsonProperty + private final String table; + @JsonProperty + private final String keyColumn; + @JsonProperty + private final String valueColumn; + @JsonProperty + private final int streamingFetchSize; + + private final String fetchAllQuery; + private final String fetchQuery; + private final String reverseFetchQuery; + private final DBI dbi; + + public JdbcDataFetcher( + @JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig, + @JsonProperty("table") String table, + @JsonProperty("keyColumn") String keyColumn, + @JsonProperty("valueColumn") String valueColumn, + @JsonProperty("streamingFetchSize") Integer streamingFetchSize + ) + { + this.connectorConfig = Preconditions.checkNotNull(connectorConfig, "connectorConfig"); + this.streamingFetchSize = streamingFetchSize == null ? DEFAULT_STREAMING_FETCH_SIZE : streamingFetchSize; + Preconditions.checkNotNull(connectorConfig.getConnectURI(), "connectorConfig.connectURI"); + this.table = Preconditions.checkNotNull(table, "table"); + this.keyColumn = Preconditions.checkNotNull(keyColumn, "keyColumn"); + this.valueColumn = Preconditions.checkNotNull(valueColumn, "valueColumn"); + + this.fetchAllQuery = String.format( + "SELECT %s, %s FROM %s", + this.keyColumn, + this.valueColumn, + this.table + ); + this.fetchQuery = String.format( + "SELECT %s FROM %s WHERE %s = :val", + this.valueColumn, + this.table, + this.keyColumn + ); + this.reverseFetchQuery = String.format( + "SELECT %s FROM %s WHERE %s = :val", + this.keyColumn, + this.table, + this.valueColumn + ); + dbi = new DBI( + connectorConfig.getConnectURI(), + connectorConfig.getUser(), + connectorConfig.getPassword() + ); + dbi.registerMapper(new KeyValueResultSetMapper(keyColumn, valueColumn)); + } + + @Override + public Iterable> fetchAll() + { + return inReadOnlyTransaction(new TransactionCallback>>() + { + @Override + public List> inTransaction( + Handle handle, + TransactionStatus status + ) throws Exception + { + return handle.createQuery(fetchAllQuery) + .setFetchSize(streamingFetchSize) + .map(new KeyValueResultSetMapper(keyColumn, valueColumn)) + .list(); + } + + } + ); + } + + @Override + public String fetch(final String key) + { + List pairs = inReadOnlyTransaction( + new TransactionCallback>() + { + @Override + public List inTransaction(Handle handle, TransactionStatus status) throws Exception + { + return handle.createQuery(fetchQuery) + .bind("val", key) + .map(StringMapper.FIRST) + .list(); + } + } + ); + if (pairs.isEmpty()) { + return null; + } + return Strings.nullToEmpty(pairs.get(0)); + } + + @Override + public Iterable> fetch(final Iterable keys) + { + QueryKeys queryKeys = dbi.onDemand(QueryKeys.class); + return queryKeys.findNamesForIds(Lists.newArrayList(keys), table, keyColumn, valueColumn); + } + + @Override + public List reverseFetchKeys(final String value) + { + List results = inReadOnlyTransaction(new TransactionCallback>() + { + @Override + public List inTransaction(Handle handle, TransactionStatus status) throws Exception + { + return handle.createQuery(reverseFetchQuery) + .bind("val", value) + .map(StringMapper.FIRST) + .list(); + } + }); + return results; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof JdbcDataFetcher)) { + return false; + } + + JdbcDataFetcher that = (JdbcDataFetcher) o; + + if (!connectorConfig.equals(that.connectorConfig)) { + return false; + } + if (!table.equals(that.table)) { + return false; + } + if (!keyColumn.equals(that.keyColumn)) { + return false; + } + return valueColumn.equals(that.valueColumn); + + } + + private DBI getDbi() + { + return dbi; + } + + private T inReadOnlyTransaction(final TransactionCallback callback) + { + return getDbi().withHandle( + new HandleCallback() + { + @Override + public T withHandle(Handle handle) throws Exception + { + final Connection connection = handle.getConnection(); + final boolean readOnly = connection.isReadOnly(); + connection.setReadOnly(true); + try { + return handle.inTransaction(callback); + } + finally { + try { + connection.setReadOnly(readOnly); + } + catch (SQLException e) { + // at least try to log it so we don't swallow exceptions + LOGGER.error(e, "Unable to reset connection read-only state"); + } + } + } + } + ); + } + +} diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/KeyValueResultSetMapper.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/KeyValueResultSetMapper.java new file mode 100644 index 000000000000..14d27939ad29 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/KeyValueResultSetMapper.java @@ -0,0 +1,47 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup.jdbc; + +import org.skife.jdbi.v2.StatementContext; +import org.skife.jdbi.v2.tweak.ResultSetMapper; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.AbstractMap; +import java.util.Map; + + +public class KeyValueResultSetMapper implements ResultSetMapper> +{ + private final String keyColumn; + private final String valueColumn; + + public KeyValueResultSetMapper(String keyColumn, String valueColumn) + { + this.keyColumn = keyColumn; + this.valueColumn = valueColumn; + } + + @Override + public Map.Entry map(int index, ResultSet r, StatementContext ctx) throws SQLException + { + return new AbstractMap.SimpleEntry<>(r.getString(keyColumn), r.getString(valueColumn)); + } +} diff --git a/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/QueryKeys.java b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/QueryKeys.java new file mode 100644 index 000000000000..dce03904dd3b --- /dev/null +++ b/extensions-core/lookups-cached-single/src/main/java/io/druid/server/lookup/jdbc/QueryKeys.java @@ -0,0 +1,47 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup.jdbc; + + +import com.google.common.collect.ImmutableSet; +import io.dropwizard.jdbi.ImmutableSetContainerFactory; +import org.skife.jdbi.v2.sqlobject.SqlQuery; +import org.skife.jdbi.v2.sqlobject.customizers.Define; +import org.skife.jdbi.v2.sqlobject.customizers.RegisterContainerMapper; +import org.skife.jdbi.v2.sqlobject.stringtemplate.UseStringTemplate3StatementLocator; +import org.skife.jdbi.v2.unstable.BindIn; + +import java.util.List; +import java.util.Map; + + +@UseStringTemplate3StatementLocator() +@RegisterContainerMapper(ImmutableSetContainerFactory.class) +public interface QueryKeys +{ + @SqlQuery("SELECT , FROM WHERE IN ()") + ImmutableSet> findNamesForIds( + @BindIn("keys") List keys, + @Define("table") String table, + @Define("keyColumn") String keyColumn, + @Define("valueColumn") String valueColumn + ); +} + diff --git a/extensions-core/lookups-cached-single/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-core/lookups-cached-single/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 000000000000..09592acd62f8 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1,22 @@ +# +# /* +# * Licensed to Metamarkets Group Inc. (Metamarkets) under one +# * or more contributor license agreements. See the NOTICE file +# * distributed with this work for additional information +# * regarding copyright ownership. Metamarkets licenses this file +# * to you under the Apache License, Version 2.0 (the +# * "License"); you may not use this file except in compliance +# * with the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, +# * software distributed under the License is distributed on an +# * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# * KIND, either express or implied. See the License for the +# * specific language governing permissions and limitations +# * under the License. +# */ +# + +io.druid.server.lookup.LookupExtractionModule diff --git a/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/CacheRefKeeperTest.java b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/CacheRefKeeperTest.java new file mode 100644 index 000000000000..94f2927c8f1a --- /dev/null +++ b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/CacheRefKeeperTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup; + +import io.druid.server.lookup.cache.polling.PollingCache; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +public class CacheRefKeeperTest +{ + + @Test + public void testGet() + { + PollingCache mockPollingCache = EasyMock.createStrictMock(PollingCache.class); + PollingLookup.CacheRefKeeper cacheRefKeeper = new PollingLookup.CacheRefKeeper(mockPollingCache); + Assert.assertEquals(mockPollingCache, cacheRefKeeper.getAndIncrementRef()); + } + + @Test + public void testDoneWithIt() + { + PollingCache mockPollingCache = EasyMock.createStrictMock(PollingCache.class); + PollingLookup.CacheRefKeeper cacheRefKeeper = new PollingLookup.CacheRefKeeper(mockPollingCache); + mockPollingCache.close(); + EasyMock.expectLastCall().atLeastOnce(); + EasyMock.replay(mockPollingCache); + cacheRefKeeper.doneWithIt(); + EasyMock.verify(mockPollingCache); + } + + @Test + public void testGetAfterDoneWithIt() + { + PollingCache mockPollingCache = EasyMock.createStrictMock(PollingCache.class); + PollingLookup.CacheRefKeeper cacheRefKeeper = new PollingLookup.CacheRefKeeper(mockPollingCache); + mockPollingCache.close(); + EasyMock.expectLastCall().atLeastOnce(); + EasyMock.replay(mockPollingCache); + cacheRefKeeper.doneWithIt(); + EasyMock.verify(mockPollingCache); + Assert.assertEquals(null, cacheRefKeeper.getAndIncrementRef()); + Assert.assertEquals(null, cacheRefKeeper.getAndIncrementRef()); + } + +} diff --git a/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/LoadingLookupFactoryTest.java b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/LoadingLookupFactoryTest.java new file mode 100644 index 000000000000..06e17b47f5f7 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/LoadingLookupFactoryTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup; + + +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.query.lookup.LookupExtractorFactory; +import io.druid.segment.TestHelper; +import io.druid.server.lookup.cache.loading.LoadingCache; +import io.druid.server.lookup.cache.loading.OffHeapLoadingCache; +import io.druid.server.lookup.cache.loading.OnHeapLoadingCache; +import org.codehaus.jackson.annotate.JsonCreator; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +public class LoadingLookupFactoryTest +{ + DataFetcher dataFetcher = EasyMock.createMock(DataFetcher.class); + LoadingCache lookupCache = EasyMock.createStrictMock(LoadingCache.class); + LoadingCache reverseLookupCache = EasyMock.createStrictMock(LoadingCache.class); + LoadingLookup loadingLookup = EasyMock.createMock(LoadingLookup.class); + LoadingLookupFactory loadingLookupFactory = new LoadingLookupFactory( + dataFetcher, + lookupCache, + reverseLookupCache, + loadingLookup + ); + + @Test + public void testStartStop() + { + EasyMock.expect(loadingLookup.isOpen()).andReturn(true).once(); + loadingLookup.close(); + EasyMock.expectLastCall().once(); + EasyMock.replay(loadingLookup); + Assert.assertTrue(loadingLookupFactory.start()); + Assert.assertTrue(loadingLookupFactory.close()); + EasyMock.verify(loadingLookup); + + } + + @Test + public void testReplacesWithNull() + { + Assert.assertTrue(loadingLookupFactory.replaces(null)); + } + + @Test + public void testReplacesWithSame() + { + Assert.assertFalse(loadingLookupFactory.replaces(loadingLookupFactory)); + } + + @Test + public void testReplacesWithDifferent() + { + Assert.assertTrue(loadingLookupFactory.replaces(new LoadingLookupFactory( + EasyMock.createMock(DataFetcher.class), + lookupCache, + reverseLookupCache + ))); + Assert.assertTrue(loadingLookupFactory.replaces(new LoadingLookupFactory( + dataFetcher, + EasyMock.createMock(LoadingCache.class), + reverseLookupCache + ))); + Assert.assertTrue(loadingLookupFactory.replaces(new LoadingLookupFactory( + dataFetcher, + lookupCache, + EasyMock.createMock(LoadingCache.class) + ))); + } + + + @Test + public void testGet() + { + Assert.assertEquals(loadingLookup, loadingLookupFactory.get()); + } + + @Test + public void testSerDeser() throws IOException + { + ObjectMapper mapper = TestHelper.getObjectMapper(); + LoadingLookupFactory loadingLookupFactory = new LoadingLookupFactory( + new MockDataFetcher(), + new OnHeapLoadingCache( + 0, + 100, + 100L, + 0L, + 0L + ), + new OffHeapLoadingCache>( + 100, + 100L, + 0L, + 0L + ) + ); + + mapper.registerSubtypes(MockDataFetcher.class); + mapper.registerSubtypes(LoadingLookupFactory.class); + Assert.assertEquals( + loadingLookupFactory, + mapper.reader(LookupExtractorFactory.class) + .readValue(mapper.writeValueAsString(loadingLookupFactory)) + ); + } + + + @JsonTypeName("mock") + private static class MockDataFetcher implements DataFetcher + { + @JsonCreator + public MockDataFetcher() + { + } + + @Override + public Iterable fetchAll() + { + return Collections.emptyMap().entrySet(); + } + + @Override + public Object fetch(Object key) + { + return null; + } + + @Override + public Iterable fetch(Iterable keys) + { + return null; + } + + @Override + public List reverseFetchKeys(Object value) + { + return null; + } + + @Override + public boolean equals(Object obj) + { + return obj instanceof MockDataFetcher; + } + } + +} diff --git a/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/LoadingLookupTest.java b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/LoadingLookupTest.java new file mode 100644 index 000000000000..9e648dd82dc0 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/LoadingLookupTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import io.druid.server.lookup.cache.loading.LoadingCache; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; + +public class LoadingLookupTest +{ + DataFetcher dataFetcher = EasyMock.createMock(DataFetcher.class); + LoadingCache lookupCache = EasyMock.createStrictMock(LoadingCache.class); + LoadingCache reverseLookupCache = EasyMock.createStrictMock(LoadingCache.class); + LoadingLookup loadingLookup = new LoadingLookup(dataFetcher, lookupCache, reverseLookupCache); + + @Test + public void testApplyEmptyOrNull() + { + Assert.assertEquals(null, loadingLookup.apply(null)); + Assert.assertEquals(null, loadingLookup.apply("")); + } + + @Test + public void testUnapplyNull() + { + Assert.assertEquals(Collections.EMPTY_LIST, loadingLookup.unapply(null)); + } + + @Test + public void testApply() throws ExecutionException + { + EasyMock.expect(lookupCache.get(EasyMock.eq("key"), EasyMock.anyObject(Callable.class))).andReturn("value").once(); + EasyMock.replay(lookupCache); + Assert.assertEquals(ImmutableMap.of("key","value"), loadingLookup.applyAll(ImmutableSet.of("key"))); + EasyMock.verify(lookupCache); + } + + @Test + public void testUnapplyAll() throws ExecutionException + { + EasyMock.expect(reverseLookupCache.get(EasyMock.eq("value"), EasyMock.anyObject(Callable.class))) + .andReturn(Lists.newArrayList("key")) + .once(); + EasyMock.replay(reverseLookupCache); + Assert.assertEquals(ImmutableMap.of("value",Lists.newArrayList("key")), loadingLookup.unapplyAll(ImmutableSet.of("value"))); + EasyMock.verify(reverseLookupCache); + } + + @Test + public void testClose() + { + lookupCache.close(); + reverseLookupCache.close(); + EasyMock.replay(lookupCache, reverseLookupCache); + loadingLookup.close(); + EasyMock.verify(lookupCache, reverseLookupCache); + } + + @Test + public void testApplyWithExecutionError() throws ExecutionException + { + EasyMock.expect(lookupCache.get(EasyMock.eq("key"), EasyMock.anyObject(Callable.class))) + .andThrow(new ExecutionException(null)) + .once(); + EasyMock.replay(lookupCache); + Assert.assertEquals(null, loadingLookup.apply("key")); + EasyMock.verify(lookupCache); + } + + @Test + public void testUnApplyWithExecutionError() throws ExecutionException + { + EasyMock.expect(reverseLookupCache.get(EasyMock.eq("value"), EasyMock.anyObject(Callable.class))) + .andThrow(new ExecutionException(null)) + .once(); + EasyMock.replay(reverseLookupCache); + Assert.assertEquals(Collections.EMPTY_LIST, loadingLookup.unapply("value")); + EasyMock.verify(reverseLookupCache); + } + + @Test + public void testGetCacheKey() + { + Assert.assertFalse(Arrays.equals(loadingLookup.getCacheKey(), loadingLookup.getCacheKey())); + } +} diff --git a/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/PollingLookupFactoryTest.java b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/PollingLookupFactoryTest.java new file mode 100644 index 000000000000..e59c3cabff85 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/PollingLookupFactoryTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup; + +import org.easymock.EasyMock; +import org.joda.time.Period; +import org.junit.Assert; +import org.junit.Test; + +public class PollingLookupFactoryTest +{ + + PollingLookup pollingLookup = EasyMock.createMock(PollingLookup.class); + PollingLookupFactory pollingLookupFactory = new PollingLookupFactory(Period.ZERO, null, null, pollingLookup); + + @Test + public void testStart() + { + EasyMock.expect(pollingLookup.isOpen()).andReturn(true).once(); + EasyMock.replay(pollingLookup); + Assert.assertTrue(pollingLookupFactory.start()); + EasyMock.verify(pollingLookup); + } + + @Test + public void testClose() + { + Assert.assertTrue(pollingLookupFactory.close()); + } + + @Test + public void testReplacesWithNull() + { + Assert.assertTrue(pollingLookupFactory.replaces(null)); + } + + @Test + public void testReplaces() + { + Assert.assertTrue(pollingLookupFactory.replaces(new PollingLookupFactory( + Period.millis(1), + null, + null, + pollingLookup + ))); + } + + @Test + public void testGet(){ + Assert.assertEquals(pollingLookup, pollingLookupFactory.get()); + } + +} diff --git a/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/PollingLookupSerDeserTest.java b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/PollingLookupSerDeserTest.java new file mode 100644 index 000000000000..df994502ee49 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/PollingLookupSerDeserTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup; + +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.lookup.LookupExtractorFactory; +import io.druid.server.lookup.cache.polling.OffHeapPollingCache; +import io.druid.server.lookup.cache.polling.OnHeapPollingCache; +import io.druid.server.lookup.cache.polling.PollingCacheFactory; +import org.codehaus.jackson.annotate.JsonCreator; +import org.joda.time.Period; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + + +@RunWith(Parameterized.class) +public class PollingLookupSerDeserTest +{ + @Parameterized.Parameters + public static Collection inputData() + { + return Arrays.asList(new Object[][]{ + {new OffHeapPollingCache.OffHeapPollingCacheProvider()}, {new OnHeapPollingCache.OnHeapPollingCacheProvider<>()} + }); + } + + public PollingLookupSerDeserTest(PollingCacheFactory cacheFactory) {this.cacheFactory = cacheFactory;} + private final PollingCacheFactory cacheFactory ; + private DataFetcher dataFetcher = new MockDataFetcher(); + @Test + public void testSerDeser() throws IOException + { + ObjectMapper mapper = new DefaultObjectMapper(); + PollingLookupFactory pollingLookupFactory = new PollingLookupFactory(Period.ZERO, dataFetcher, cacheFactory); + mapper.registerSubtypes(MockDataFetcher.class); + mapper.registerSubtypes(PollingLookupFactory.class); + Assert.assertEquals(pollingLookupFactory, mapper.reader(LookupExtractorFactory.class).readValue(mapper.writeValueAsString(pollingLookupFactory))); + } + + @JsonTypeName("mock") + private static class MockDataFetcher implements DataFetcher + { + @JsonCreator + public MockDataFetcher() + { + } + + @Override + public Iterable> fetchAll() + { + return Collections.emptyMap().entrySet(); + } + + @Override + public Object fetch(Object key) + { + return null; + } + + @Override + public Iterable fetch(Iterable keys) + { + return null; + } + + @Override + public List reverseFetchKeys(Object value) + { + return null; + } + + @Override + public boolean equals(Object obj) + { + return obj instanceof MockDataFetcher; + } + } + +} diff --git a/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/PollingLookupTest.java b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/PollingLookupTest.java new file mode 100644 index 000000000000..f73562a7cc7b --- /dev/null +++ b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/PollingLookupTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup; + +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Function; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.metamx.common.ISE; +import io.druid.query.lookup.LookupExtractor; +import io.druid.server.lookup.cache.polling.OffHeapPollingCache; +import io.druid.server.lookup.cache.polling.OnHeapPollingCache; +import io.druid.server.lookup.cache.polling.PollingCacheFactory; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +@RunWith(Parameterized.class) +public class PollingLookupTest +{ + private static final Map firstLookupMap = ImmutableMap.of( + "foo", "bar", + "bad", "bar", + "how about that", "foo", + "empty string", "" + ); + + private static final Map secondLookupMap = ImmutableMap.of( + "new-foo", "new-bar", + "new-bad", "new-bar" + ); + + private static final long POLL_PERIOD = 100L; + + @JsonTypeName("mock") + private static class MockDataFetcher implements DataFetcher + { + private int callNumber = 0; + @Override + public Iterable fetchAll() + { + if (callNumber == 0) {callNumber +=1; return firstLookupMap.entrySet();} + return secondLookupMap.entrySet(); + } + + @Nullable + @Override + public Object fetch(Object key) + { + return null; + } + + @Override + public Iterable fetch(Iterable keys) + { + return null; + } + + @Override + public List reverseFetchKeys(Object value) + { + return null; + } + + @Override + public boolean equals(Object obj) + { + return obj instanceof MockDataFetcher; + } + } + + @Parameterized.Parameters + public static Collection inputData() + { + return Arrays.asList(new Object[][]{ + {new OffHeapPollingCache.OffHeapPollingCacheProvider()}, + {new OnHeapPollingCache.OnHeapPollingCacheProvider<>()} + }); + } + + public PollingLookupTest(PollingCacheFactory pollingCacheFactory) {this.pollingCacheFactory = pollingCacheFactory;} + + private final PollingCacheFactory pollingCacheFactory; + private final DataFetcher dataFetcher = new MockDataFetcher(); + private PollingLookup pollingLookup; + + + @Before + public void setUp() throws InterruptedException + { + pollingLookup = new PollingLookup(POLL_PERIOD, dataFetcher, pollingCacheFactory); + } + + @After + public void tearDown() + { + if (pollingLookup != null) { + pollingLookup.close(); + } + pollingLookup = null; + } + + @Test(expected = ISE.class) + public void testClose() + { + pollingLookup.close(); + pollingLookup.apply("key"); + } + + @Test + public void testApply() throws InterruptedException + { + assertMapLookup(firstLookupMap, pollingLookup); + } + + @Test(timeout = POLL_PERIOD * 3) + public void testApplyAfterDataChange() throws InterruptedException + { + assertMapLookup(firstLookupMap, pollingLookup); + Thread.sleep(POLL_PERIOD * 2); + assertMapLookup(secondLookupMap, pollingLookup); + } + + @Test + public void testUnapply() + { + Assert.assertEquals( + "reverse lookup should match", + Sets.newHashSet("foo", "bad"), + Sets.newHashSet(pollingLookup.unapply("bar")) + ); + Assert.assertEquals( + "reverse lookup should match", + Sets.newHashSet("how about that"), + Sets.newHashSet(pollingLookup.unapply("foo")) + ); + Assert.assertEquals( + "reverse lookup should match", + Sets.newHashSet("empty string"), + Sets.newHashSet(pollingLookup.unapply("")) + ); + Assert.assertEquals( + "reverse lookup of none existing value should be empty list", + Collections.EMPTY_LIST, + pollingLookup.unapply("does't exist") + ); + } + + @Test + public void testBulkApply() + { + Map map = pollingLookup.applyAll(firstLookupMap.keySet()); + Assert.assertEquals(firstLookupMap, Maps.transformValues(map, new Function() + { + @Override + public String apply(String input) + { + //make sure to rewrite null strings as empty. + return Strings.nullToEmpty(input); + } + })); + } + + @Test + public void testGetCacheKey() + { + PollingLookup pollingLookup2 = new PollingLookup(1L, dataFetcher, pollingCacheFactory); + Assert.assertFalse(Arrays.equals(pollingLookup2.getCacheKey(), pollingLookup.getCacheKey())); + } + + private void assertMapLookup(Map map, LookupExtractor lookup) + { + for (Map.Entry entry : map.entrySet()) { + String key = entry.getKey(); + String val = entry.getValue(); + Assert.assertEquals("non-null check", Strings.emptyToNull(val), lookup.apply(key)); + } + } +} diff --git a/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/cache/loading/LoadingCacheTest.java b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/cache/loading/LoadingCacheTest.java new file mode 100644 index 000000000000..c9c1f8f7bbbf --- /dev/null +++ b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/cache/loading/LoadingCacheTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup.cache.loading; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import io.druid.jackson.DefaultObjectMapper; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; + +@RunWith(Parameterized.class) +public class LoadingCacheTest +{ + private static final ImmutableMap IMMUTABLE_MAP = ImmutableMap.of("key", "value"); + + @Parameterized.Parameters + public static Collection inputData() + { + return Arrays.asList(new Object[][]{ + {new OnHeapLoadingCache<>(4, 1000, null, null, null)}, { + new OffHeapLoadingCache( + 0, + 0L, + 0L, + 0L + ) + } + }); + } + + private final LoadingCache loadingCache; + + public LoadingCacheTest(LoadingCache loadingCache) {this.loadingCache = loadingCache;} + + @Before + public void setUp() throws InterruptedException + { + Assert.assertFalse(loadingCache.isClosed()); + loadingCache.putAll(IMMUTABLE_MAP); + } + + @After + public void tearDown() + { + loadingCache.invalidateAll(); + } + + @Test + public void testGetIfPresent() + { + Assert.assertNull(loadingCache.getIfPresent("not there")); + Assert.assertEquals(IMMUTABLE_MAP.get("key"), loadingCache.getIfPresent("key")); + } + + @Test + public void testGetAllPresent() + { + Assert.assertEquals(IMMUTABLE_MAP, loadingCache.getAllPresent(IMMUTABLE_MAP.keySet())); + } + + @Test + public void testPut() throws InterruptedException, ExecutionException + { + loadingCache.get("key2", new Callable() + { + @Override + public Object call() throws Exception + { + return "value2"; + } + }); + Assert.assertEquals("value2", loadingCache.getIfPresent("key2")); + } + + @Test + public void testInvalidate() throws ExecutionException + { + loadingCache.get("key2", new Callable() + { + @Override + public Object call() throws Exception + { + return "value2"; + } + }); + Assert.assertEquals("value2", loadingCache.getIfPresent("key2")); + loadingCache.invalidate("key2"); + Assert.assertEquals(null, loadingCache.getIfPresent("key2")); + } + + @Test + public void testInvalidateAll() throws ExecutionException + { + loadingCache.get("key2", new Callable() + { + @Override + public Object call() throws Exception + { + return "value2"; + } + }); + Assert.assertEquals("value2", loadingCache.getIfPresent("key2")); + loadingCache.invalidateAll(Lists.newArrayList("key2")); + Assert.assertEquals(null, loadingCache.getIfPresent("key2")); + } + + @Test + public void testInvalidateAll1() throws ExecutionException + { + loadingCache.invalidateAll(); + loadingCache.get("key2", new Callable() + { + @Override + public Object call() throws Exception + { + return "value2"; + } + }); + Assert.assertEquals(loadingCache.getAllPresent(IMMUTABLE_MAP.keySet()), Collections.EMPTY_MAP); + } + + @Test + public void testGetStats() + { + Assert.assertTrue(loadingCache.getStats() != null && loadingCache.getStats() instanceof LookupCacheStats); + } + + @Test + public void testIsClosed() + { + Assert.assertFalse(loadingCache.isClosed()); + } + + @Test + public void testSerDeser() throws IOException + { + ObjectMapper mapper = new DefaultObjectMapper(); + Assert.assertEquals(loadingCache, mapper.reader(LoadingCache.class).readValue(mapper.writeValueAsString(loadingCache))); + Assert.assertTrue(loadingCache.hashCode() == mapper.reader(LoadingCache.class).readValue(mapper.writeValueAsString(loadingCache)).hashCode()); + } + +} diff --git a/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/cache/loading/OffHeapLoadingCacheTest.java b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/cache/loading/OffHeapLoadingCacheTest.java new file mode 100644 index 000000000000..e6690691a158 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/cache/loading/OffHeapLoadingCacheTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup.cache.loading; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class OffHeapLoadingCacheTest +{ + @Test + public void testClose() throws IOException + { + LoadingCache loadingCache = new OffHeapLoadingCache<>(1000L, 1000L, 0L, 0L); + loadingCache.close(); + Assert.assertTrue(loadingCache.isClosed()); + } + +} diff --git a/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/cache/loading/OnHeapLoadingCacheTest.java b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/cache/loading/OnHeapLoadingCacheTest.java new file mode 100644 index 000000000000..d5d03af7ea88 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/cache/loading/OnHeapLoadingCacheTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup.cache.loading; + +import org.junit.Assert; +import org.junit.Test; + +public class OnHeapLoadingCacheTest +{ + @Test + public void testIsClosed() + { + OnHeapLoadingCache onHeapLookupLoadingCache = new OnHeapLoadingCache(4, 15, 100L, 10L, 10L); + onHeapLookupLoadingCache.close(); + Assert.assertTrue(onHeapLookupLoadingCache.isClosed()); + } +} diff --git a/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/jdbc/JdbcDataFetcherTest.java b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/jdbc/JdbcDataFetcherTest.java new file mode 100644 index 000000000000..f0d009274212 --- /dev/null +++ b/extensions-core/lookups-cached-single/src/test/java/io/druid/server/lookup/jdbc/JdbcDataFetcherTest.java @@ -0,0 +1,183 @@ +/* + * + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * / + */ + +package io.druid.server.lookup.jdbc; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.metadata.MetadataStorageConnectorConfig; +import io.druid.metadata.TestDerbyConnector; +import io.druid.server.lookup.DataFetcher; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.skife.jdbi.v2.Handle; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +public class JdbcDataFetcherTest +{ + @Rule + public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); + Handle handle; + + private JdbcDataFetcher jdbcDataFetcher; + private final String tableName = "tableName"; + private final String keyColumn = "keyColumn"; + private final String valueColumn = "valueColumn"; + + + + private static final Map lookupMap = ImmutableMap.of( + "foo", "bar", + "bad", "bar", + "how about that", "foo", + "empty string", "" + ); + + @Before + public void setUp() throws InterruptedException + { + jdbcDataFetcher = new JdbcDataFetcher(derbyConnectorRule.getMetadataConnectorConfig(), "tableName", "keyColumn", "valueColumn", + 100); + + handle = derbyConnectorRule.getConnector().getDBI().open(); + Assert.assertEquals( + 0, + handle.createStatement( + String.format( + "CREATE TABLE %s (%s VARCHAR(64), %s VARCHAR(64))", + tableName, + keyColumn, + valueColumn + ) + ).setQueryTimeout(1).execute() + ); + handle.createStatement(String.format("TRUNCATE TABLE %s", tableName)).setQueryTimeout(1).execute(); + + for (Map.Entry entry : lookupMap.entrySet()) { + insertValues(entry.getKey(), entry.getValue(), handle); + } + handle.commit(); + } + + @After + public void tearDown() + { + handle.createStatement("DROP TABLE " + tableName).setQueryTimeout(1).execute(); + handle.close(); + } + + @Test + public void testFetch() throws InterruptedException + { + Assert.assertEquals("null check", null, jdbcDataFetcher.fetch("baz")); + assertMapLookup(lookupMap, jdbcDataFetcher); + } + + @Test + public void testFetchAll() + { + ImmutableMap.Builder mapBuilder = ImmutableMap.builder(); + for (Map.Entry entry: jdbcDataFetcher.fetchAll() + ) { + mapBuilder.put(entry.getKey(), entry.getValue()); + } + Assert.assertEquals("maps should match", lookupMap, mapBuilder.build()); + } + + @Test + public void testFetchKeys() + { + ImmutableMap.Builder mapBuilder = ImmutableMap.builder(); + for (Map.Entry entry: jdbcDataFetcher.fetch(lookupMap.keySet()) + ) { + mapBuilder.put(entry.getKey(), entry.getValue()); + } + + Assert.assertEquals(lookupMap, mapBuilder.build()); + } + + @Test + public void testReverseFetch() throws InterruptedException + { + Assert.assertEquals( + "reverse lookup should match", + Sets.newHashSet("foo", "bad"), + Sets.newHashSet(jdbcDataFetcher.reverseFetchKeys("bar")) + ); + Assert.assertEquals( + "reverse lookup should match", + Sets.newHashSet("how about that"), + Sets.newHashSet(jdbcDataFetcher.reverseFetchKeys("foo")) + ); + Assert.assertEquals( + "reverse lookup should match", + Sets.newHashSet("empty string"), + Sets.newHashSet(jdbcDataFetcher.reverseFetchKeys("")) + ); + Assert.assertEquals( + "reverse lookup of none existing value should be empty list", + Collections.EMPTY_LIST, + jdbcDataFetcher.reverseFetchKeys("does't exist") + ); + } + + @Test + public void testSerDesr() throws IOException + { + JdbcDataFetcher jdbcDataFetcher = new JdbcDataFetcher(new MetadataStorageConnectorConfig(), "table", "keyColumn", "ValueColumn", + 100); + DefaultObjectMapper mapper = new DefaultObjectMapper(); + String jdbcDataFetcherSer = mapper.writeValueAsString(jdbcDataFetcher); + Assert.assertEquals(jdbcDataFetcher, mapper.reader(DataFetcher.class).readValue(jdbcDataFetcherSer)); + } + + private void assertMapLookup(Map map, DataFetcher dataFetcher) + { + for (Map.Entry entry : map.entrySet()) { + String key = entry.getKey(); + String val = entry.getValue(); + Assert.assertEquals("non-null check", val, dataFetcher.fetch(key)); + } + } + + private void insertValues(final String key, final String val, Handle handle) + { + final String query; + handle.createStatement( + String.format("DELETE FROM %s WHERE %s='%s'", tableName, keyColumn, key) + ).setQueryTimeout(1).execute(); + query = String.format( + "INSERT INTO %s (%s, %s) VALUES ('%s', '%s')", + tableName, + keyColumn, valueColumn, + key, val + ); + Assert.assertEquals(1, handle.createStatement(query).setQueryTimeout(1).execute()); + handle.commit(); + } +} diff --git a/pom.xml b/pom.xml index 077b3fa30de2..def17d8d2752 100644 --- a/pom.xml +++ b/pom.xml @@ -96,6 +96,7 @@ extensions-core/mysql-metadata-storage extensions-core/postgresql-metadata-storage extensions-core/lookups-cached-global + extensions-core/lookups-cached-single extensions-core/s3-extensions extensions-contrib/azure-extensions