diff --git a/distribution/pom.xml b/distribution/pom.xml
index 9d08b006fb59..310018f4a625 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -85,7 +85,7 @@
-cio.druid.extensions:mysql-metadata-storage-c
- io.druid.extensions:druid-namespace-lookup
+ io.druid.extensions:druid-lookups-cached-global-cio.druid.extensions:postgresql-metadata-storage-c
diff --git a/docs/content/development/extensions-core/kafka-extraction-namespace.md b/docs/content/development/extensions-core/kafka-extraction-namespace.md
index a9b4fe0547f3..f100e6de9886 100644
--- a/docs/content/development/extensions-core/kafka-extraction-namespace.md
+++ b/docs/content/development/extensions-core/kafka-extraction-namespace.md
@@ -8,7 +8,7 @@ layout: doc_page
Lookups are an experimental feature.
-Make sure to [include](../../operations/including-extensions.html) `druid-namespace-lookup` and `druid-kafka-extraction-namespace` as an extension.
+Make sure to [include](../../operations/including-extensions.html) `druid-lookups-cached-global` and `druid-kafka-extraction-namespace` as an extension.
If you need updates to populate as promptly as possible, it is possible to plug into a kafka topic whose key is the old value and message is the desired new value (both in UTF-8) as a LookupExtractorFactory.
diff --git a/docs/content/development/extensions-core/namespaced-lookup.md b/docs/content/development/extensions-core/lookups-cached-global.md
similarity index 74%
rename from docs/content/development/extensions-core/namespaced-lookup.md
rename to docs/content/development/extensions-core/lookups-cached-global.md
index e563621addbe..9f9eb7bb1580 100644
--- a/docs/content/development/extensions-core/namespaced-lookup.md
+++ b/docs/content/development/extensions-core/lookups-cached-global.md
@@ -8,19 +8,25 @@ layout: doc_page
Lookups are an experimental feature.
-Make sure to [include](../../operations/including-extensions.html) `druid-namespace-lookup` as an extension.
+Make sure to [include](../../operations/including-extensions.html) `druid-lookups-cached-global` as an extension.
## Configuration
+
+Static configuration is no longer supported. Only cluster wide configuration is supported
+
+
+Cached namespace lookups are appropriate for lookups which are not possible to pass at query time due to their size,
+or are not desired to be passed at query time because the data is to reside in and be handled by the Druid servers,
+and are small enough to reasonably populate on a node. This usually means tens to tens of thousands of entries per lookup.
-Namespaced lookups are appropriate for lookups which are not possible to pass at query time due to their size,
-or are not desired to be passed at query time because the data is to reside in and be handled by the Druid servers.
-Namespaced lookups can be specified as part of the runtime properties file. The property is a list of the namespaces
-described as per the sections on this page. For example:
+Cached namespace lookups all draw from the same cache pool, allowing each node to have a fixed cache pool that can be used by namespace lookups.
+
+Cached namespace lookups can be specified as part of the [cluster wide config for lookups](../../querying/lookups.html) as a type of `cachedNamespace`
```json
- druid.query.extraction.namespace.lookups=
- [
- {
+ {
+ "type": "cachedNamespace",
+ "extractionNamespace": {
"type": "uri",
"namespace": "some_uri_lookup",
"uri": "file:/tmp/prefix/",
@@ -33,7 +39,14 @@ described as per the sections on this page. For example:
},
"pollPeriod": "PT5M"
},
- {
+ "firstCacheTimeout": 0
+ }
+ ```
+
+ ```json
+{
+ "type": "cachedNamespace",
+ "extractionNamespace": {
"type": "jdbc",
"namespace": "some_jdbc_lookup",
"connectorConfig": {
@@ -46,12 +59,21 @@ described as per the sections on this page. For example:
"keyColumn": "mykeyColumn",
"valueColumn": "MyValueColumn",
"tsColumn": "timeColumn"
- }
- ]
+ },
+ "firstCacheTimeout": 120000,
+ "injective":true
+}
```
+The parameters are as follows
+|Property|Description|Required|Default|
+|--------|-----------|--------|-------|
+|`extractionNamespace`|Specifies how to populate the local cache. See below|Yes|-|
+|`firstCacheTimeout`|How long to wait (in ms) for the first run of the cache to populate. 0 indicates to not wait|No|`60000` (1 minute)|
+|`injective`|If the underlying map is injective (keys and values are unique) then optimizations can occur internally by setting this to `true`|No|`false`|
+
Proper functionality of Namespaced lookups requires the following extension to be loaded on the broker, peon, and historical nodes:
-`druid-namespace-lookup`
+`druid-lookups-cached-global`
## Cache Settings
@@ -60,11 +82,15 @@ setting namespaces (broker, peon, historical)
|Property|Description|Default|
|--------|-----------|-------|
-|`druid.query.extraction.namespace.cache.type`|Specifies the type of caching to be used by the namespaces. May be one of [`offHeap`, `onHeap`]. `offHeap` uses a temporary file for off-heap storage of the namespace (memory mapped files). `onHeap` stores all cache on the heap in standard java map types.|`onHeap`|
+|`druid.lookup.namespace.cache.type`|Specifies the type of caching to be used by the namespaces. May be one of [`offHeap`, `onHeap`]. `offHeap` uses a temporary file for off-heap storage of the namespace (memory mapped files). `onHeap` stores all cache on the heap in standard java map types.|`onHeap`|
The cache is populated in different ways depending on the settings below. In general, most namespaces employ
a `pollPeriod` at the end of which time they poll the remote resource of interest for updates.
+`onHeap` uses `ConcurrentMap`s in the java heap, and thus affects garbage collection and heap sizing.
+`offHeap` uses a 10MB on-heap buffer and MapDB using memory-mapped files in the java temporary directory.
+So if total `cachedNamespace` lookup size is in excess of 10MB, the extra will be kept in memory as page cache, and paged in and out by general OS tunings.
+
# Supported Lookups
For additional lookups, please see our [extensions list](../extensions.html).
@@ -76,27 +102,25 @@ The remapping values for each namespaced lookup can be specified by a json objec
```json
{
"type":"uri",
- "namespace":"some_lookup",
"uri": "s3://bucket/some/key/prefix/renames-0003.gz",
"namespaceParseSpec":{
"format":"csv",
"columns":["key","value"]
},
- "pollPeriod":"PT5M",
+ "pollPeriod":"PT5M"
}
```
```json
{
"type":"uri",
- "namespace":"some_lookup",
"uriPrefix": "s3://bucket/some/key/prefix/",
"fileRegex":"renames-[0-9]*\\.gz",
"namespaceParseSpec":{
"format":"csv",
"columns":["key","value"]
},
- "pollPeriod":"PT5M",
+ "pollPeriod":"PT5M"
}
```
|Property|Description|Required|Default|
@@ -250,3 +274,7 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol
"pollPeriod":600000
}
```
+
+# Introspection
+
+Cached namespace lookups have introspection points at `/keys` and `/values` which return a complete set of the keys and values (respectively) in the lookup. Introspection to `/` returns the entire map. Introspection to `/version` returns the version indicator for the lookup.
diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md
index aaeac3aa7db1..0b61fc3902f0 100644
--- a/docs/content/development/extensions.md
+++ b/docs/content/development/extensions.md
@@ -27,7 +27,7 @@ Core extensions are maintained by Druid committers.
|druid-histogram|Approximate histograms and quantiles aggregator.|[link](../development/extensions-core/approximate-histograms.html)|
|druid-kafka-eight|Kafka ingest firehose (high level consumer).|[link](../development/extensions-core/kafka-eight-firehose.html)|
|druid-kafka-extraction-namespace|Kafka-based namespaced lookup. Requires namespace lookup extension.|[link](../development/extensions-core/kafka-extraction-namespace.html)|
-|druid-namespace-lookup|Required module for [lookups](../querying/lookups.html).|[link](../development/extensions-core/namespaced-lookup.html)|
+|druid-lookups-cached-global|Required module for [lookups](../querying/lookups.html).|[link](../development/extensions-core/lookups-cached-global.html)|
|druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/s3.html)|
|mysql-metadata-storage|MySQL metadata store.|[link](../development/extensions-core/mysql.html)|
|postgresql-metadata-storage|PostgreSQL metadata store.|[link](../development/extensions-core/postgresql.html)|
diff --git a/examples/conf/druid/_common/common.runtime.properties b/examples/conf/druid/_common/common.runtime.properties
index 19b005257369..641ef03c6151 100644
--- a/examples/conf/druid/_common/common.runtime.properties
+++ b/examples/conf/druid/_common/common.runtime.properties
@@ -23,7 +23,7 @@
# This is not the full list of Druid extensions, but common ones that people often use. You may need to change this list
# based on your particular setup.
-druid.extensions.loadList=["druid-kafka-eight", "druid-s3-extensions", "druid-histogram", "druid-datasketches", "druid-namespace-lookup", "mysql-metadata-storage"]
+druid.extensions.loadList=["druid-kafka-eight", "druid-s3-extensions", "druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "mysql-metadata-storage"]
# If you have a different version of Hadoop, place your Hadoop client jar files in your hadoop-dependencies directory
# and uncomment the line below to point to your directory.
diff --git a/extensions-core/kafka-extraction-namespace/pom.xml b/extensions-core/kafka-extraction-namespace/pom.xml
index 58bda817cc3e..81e7b74e2962 100644
--- a/extensions-core/kafka-extraction-namespace/pom.xml
+++ b/extensions-core/kafka-extraction-namespace/pom.xml
@@ -48,7 +48,7 @@
io.druid.extensions
- druid-namespace-lookup
+ druid-lookups-cached-global${project.parent.version}
diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaLookupExtractorFactory.java b/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaLookupExtractorFactory.java
index 286a8d0d538b..834efea0c97d 100644
--- a/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaLookupExtractorFactory.java
+++ b/extensions-core/kafka-extraction-namespace/src/main/java/io/druid/query/lookup/KafkaLookupExtractorFactory.java
@@ -37,7 +37,18 @@
import com.metamx.common.logger.Logger;
import io.druid.concurrent.Execs;
import io.druid.query.extraction.MapLookupExtractor;
-import io.druid.server.namespace.cache.NamespaceExtractionCacheManager;
+import io.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.KafkaStream;
+import kafka.consumer.Whitelist;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import kafka.serializer.Decoder;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.Min;
+import javax.ws.rs.GET;
+import javax.ws.rs.core.Response;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
@@ -52,16 +63,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
-import javax.annotation.Nullable;
-import javax.validation.constraints.Min;
-import javax.ws.rs.GET;
-import javax.ws.rs.core.Response;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.KafkaStream;
-import kafka.consumer.Whitelist;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import kafka.serializer.Decoder;
@JsonTypeName("kafka")
public class KafkaLookupExtractorFactory implements LookupExtractorFactory
@@ -95,7 +96,7 @@ public String fromBytes(byte[] bytes)
private final long connectTimeout;
@JsonProperty
- private final boolean isOneToOne;
+ private final boolean injective;
@JsonCreator
public KafkaLookupExtractorFactory(
@@ -103,7 +104,7 @@ public KafkaLookupExtractorFactory(
@JsonProperty("kafkaTopic") final String kafkaTopic,
@JsonProperty("kafkaProperties") final Map kafkaProperties,
@JsonProperty("connectTimeout") @Min(0) long connectTimeout,
- @JsonProperty("isOneToOne") boolean isOneToOne
+ @JsonProperty("injective") boolean injective
)
{
this.kafkaTopic = Preconditions.checkNotNull(kafkaTopic, "kafkaTopic required");
@@ -114,7 +115,7 @@ public KafkaLookupExtractorFactory(
));
this.cacheManager = cacheManager;
this.connectTimeout = connectTimeout;
- this.isOneToOne = isOneToOne;
+ this.injective = injective;
}
public KafkaLookupExtractorFactory(
@@ -141,9 +142,9 @@ public long getConnectTimeout()
return connectTimeout;
}
- public boolean isOneToOne()
+ public boolean isInjective()
{
- return isOneToOne;
+ return injective;
}
@Override
@@ -335,7 +336,7 @@ public boolean replaces(@Nullable LookupExtractorFactory other)
return !(getKafkaTopic().equals(that.getKafkaTopic())
&& getKafkaProperties().equals(that.getKafkaProperties())
&& getConnectTimeout() == that.getConnectTimeout()
- && isOneToOne() == that.isOneToOne()
+ && isInjective() == that.isInjective()
);
}
@@ -351,7 +352,7 @@ public LookupExtractor get()
{
final Map map = Preconditions.checkNotNull(mapRef.get(), "Not started");
final long startCount = doubleEventCount.get();
- return new MapLookupExtractor(map, isOneToOne())
+ return new MapLookupExtractor(map, isInjective())
{
@Override
public byte[] getCacheKey()
diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/KafkaLookupExtractorFactoryTest.java b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
index a8a40d0deafc..d62d412fb041 100644
--- a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
+++ b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/KafkaLookupExtractorFactoryTest.java
@@ -28,7 +28,7 @@
import com.google.common.collect.ImmutableMap;
import com.metamx.common.StringUtils;
import io.druid.jackson.DefaultObjectMapper;
-import io.druid.server.namespace.cache.NamespaceExtractionCacheManager;
+import io.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
@@ -72,7 +72,7 @@ public Object findInjectableValue(
Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
)
{
- if ("io.druid.server.namespace.cache.NamespaceExtractionCacheManager".equals(valueId)) {
+ if ("io.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager".equals(valueId)) {
return cacheManager;
} else {
return null;
@@ -507,6 +507,31 @@ public void testFailsGetNotStarted()
).get();
}
+ @Test
+ public void testSerDe() throws Exception
+ {
+ final NamespaceExtractionCacheManager cacheManager = EasyMock.createStrictMock(NamespaceExtractionCacheManager.class);
+ final String kafkaTopic = "some_topic";
+ final Map kafkaProperties = ImmutableMap.of("some_key", "some_value");
+ final long connectTimeout = 999;
+ final boolean injective = true;
+ final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
+ cacheManager,
+ kafkaTopic,
+ kafkaProperties,
+ connectTimeout,
+ injective
+ );
+ final KafkaLookupExtractorFactory otherFactory = mapper.readValue(
+ mapper.writeValueAsString(factory),
+ KafkaLookupExtractorFactory.class
+ );
+ Assert.assertEquals(kafkaTopic, otherFactory.getKafkaTopic());
+ Assert.assertEquals(kafkaProperties, otherFactory.getKafkaProperties());
+ Assert.assertEquals(connectTimeout, otherFactory.getConnectTimeout());
+ Assert.assertEquals(injective, otherFactory.isInjective());
+ }
+
@Test
public void testDefaultDecoder()
{
diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
index bcd1aebe9bb9..66c3e2307407 100644
--- a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
+++ b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java
@@ -32,7 +32,7 @@
import com.metamx.common.logger.Logger;
import io.druid.guice.GuiceInjectors;
import io.druid.initialization.Initialization;
-import io.druid.server.namespace.NamespacedExtractionModule;
+import io.druid.server.lookup.namespace.NamespaceExtractionModule;
import kafka.admin.AdminUtils;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
@@ -249,7 +249,7 @@ public void configure(Binder binder)
}
},
// These injections fail under IntelliJ but are required for maven
- new NamespacedExtractionModule(),
+ new NamespaceExtractionModule(),
new KafkaExtractionNamespaceModule()
)
);
diff --git a/extensions-core/namespace-lookup/pom.xml b/extensions-core/lookups-cached-global/pom.xml
similarity index 92%
rename from extensions-core/namespace-lookup/pom.xml
rename to extensions-core/lookups-cached-global/pom.xml
index 8b82d3220841..4225f7cd22be 100644
--- a/extensions-core/namespace-lookup/pom.xml
+++ b/extensions-core/lookups-cached-global/pom.xml
@@ -22,8 +22,8 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
4.0.0io.druid.extensions
- druid-namespace-lookup
- druid-namespace-lookup
+ druid-lookups-cached-global
+ druid-lookups-cached-globalExtension to rename Druid dimension values using namespaces
@@ -77,5 +77,10 @@
3.0.1test
+
+ org.easymock
+ easymock
+ test
+
diff --git a/extensions-core/namespace-lookup/src/main/java/io/druid/data/input/MapPopulator.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/data/input/MapPopulator.java
similarity index 99%
rename from extensions-core/namespace-lookup/src/main/java/io/druid/data/input/MapPopulator.java
rename to extensions-core/lookups-cached-global/src/main/java/io/druid/data/input/MapPopulator.java
index 509d9016b016..971370c9f7bb 100644
--- a/extensions-core/namespace-lookup/src/main/java/io/druid/data/input/MapPopulator.java
+++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/data/input/MapPopulator.java
@@ -30,7 +30,7 @@
/**
* Simple class that takes a `ByteSource` and uses a `Parser` to populate a `Map`
* The `ByteSource` must be UTF-8 encoded
- *
+ *
* If this is handy for other use cases pleaes move this class into a common module
*/
public class MapPopulator
diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/NamespaceLookupExtractorFactory.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/NamespaceLookupExtractorFactory.java
new file mode 100644
index 000000000000..9dd55f1c1523
--- /dev/null
+++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/NamespaceLookupExtractorFactory.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.lookup;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.metamx.common.ISE;
+import com.metamx.common.StringUtils;
+import com.metamx.common.logger.Logger;
+import io.druid.common.utils.ServletResourceUtils;
+import io.druid.query.extraction.MapLookupExtractor;
+import io.druid.query.lookup.namespace.ExtractionNamespace;
+import io.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+@JsonTypeName("cachedNamespace")
+public class NamespaceLookupExtractorFactory implements LookupExtractorFactory
+{
+ private static final Logger LOG = new Logger(NamespaceLookupExtractorFactory.class);
+
+ private static final long DEFAULT_SCHEDULE_TIMEOUT = 60_000;
+ private static final byte[] CLASS_CACHE_KEY;
+
+ static {
+ final byte[] keyUtf8 = StringUtils.toUtf8(NamespaceLookupExtractorFactory.class.getCanonicalName());
+ CLASS_CACHE_KEY = ByteBuffer.allocate(keyUtf8.length + 1).put(keyUtf8).put((byte) 0xFF).array();
+ }
+
+ private volatile boolean started = false;
+ private final ReadWriteLock startStopSync = new ReentrantReadWriteLock();
+ private final NamespaceExtractionCacheManager manager;
+ private final LookupIntrospectHandler lookupIntrospectHandler;
+ private final ExtractionNamespace extractionNamespace;
+ private final long firstCacheTimeout;
+ private final boolean injective;
+
+ private final String extractorID;
+
+ @JsonCreator
+ public NamespaceLookupExtractorFactory(
+ @JsonProperty("extractionNamespace") ExtractionNamespace extractionNamespace,
+ @JsonProperty("firstCacheTimeout") Long firstCacheTimeout,
+ @JsonProperty("injective") boolean injective,
+ @JacksonInject final NamespaceExtractionCacheManager manager
+ )
+ {
+ this.extractionNamespace = Preconditions.checkNotNull(
+ extractionNamespace,
+ "extractionNamespace should be specified"
+ );
+ this.firstCacheTimeout = firstCacheTimeout == null ? DEFAULT_SCHEDULE_TIMEOUT : firstCacheTimeout;
+ Preconditions.checkArgument(this.firstCacheTimeout >= 0);
+ this.injective = injective;
+ this.manager = manager;
+ this.extractorID = buildID();
+ this.lookupIntrospectHandler = new LookupIntrospectHandler()
+ {
+ @GET
+ @Path("/keys")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getKeys()
+ {
+ try {
+ return Response.ok(getLatest().keySet()).build();
+ }
+ catch (ISE e) {
+ return Response.status(Response.Status.NOT_FOUND).entity(ServletResourceUtils.sanitizeException(e)).build();
+ }
+ }
+
+ @GET
+ @Path("/values")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getValues()
+ {
+ try {
+ return Response.ok(getLatest().values()).build();
+ }
+ catch (ISE e) {
+ return Response.status(Response.Status.NOT_FOUND).entity(ServletResourceUtils.sanitizeException(e)).build();
+ }
+ }
+
+ @GET
+ @Path("/version")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getVersion()
+ {
+ final String version = manager.getVersion(extractorID);
+ if (null == version) {
+ // Handle race between delete and this method being called
+ return Response.status(Response.Status.NOT_FOUND).build();
+ } else {
+ return Response.ok(ImmutableMap.of("version", version)).build();
+ }
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getMap()
+ {
+ try {
+ return Response.ok(getLatest()).build();
+ }
+ catch (ISE e) {
+ return Response.status(Response.Status.NOT_FOUND).entity(ServletResourceUtils.sanitizeException(e)).build();
+ }
+ }
+
+ private Map getLatest()
+ {
+ return ((MapLookupExtractor) get()).getMap();
+ }
+ };
+ }
+
+ @VisibleForTesting
+ public NamespaceLookupExtractorFactory(
+ ExtractionNamespace extractionNamespace,
+ NamespaceExtractionCacheManager manager
+ )
+ {
+ this(extractionNamespace, null, false, manager);
+ }
+
+ @Override
+ public boolean start()
+ {
+ final Lock writeLock = startStopSync.writeLock();
+ writeLock.lock();
+ try {
+ if (started) {
+ LOG.warn("Already started! [%s]", extractorID);
+ return true;
+ }
+ if (!manager.scheduleAndWait(extractorID, extractionNamespace, firstCacheTimeout)) {
+ LOG.error("Failed to schedule lookup [%s]", extractorID);
+ return false;
+ }
+ LOG.debug("NamespaceLookupExtractorFactory[%s] started", extractorID);
+ started = true;
+ return true;
+ }
+ finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean close()
+ {
+ final Lock writeLock = startStopSync.writeLock();
+ writeLock.lock();
+ try {
+ if (!started) {
+ LOG.warn("Not started! [%s]", extractorID);
+ return true;
+ }
+ started = false;
+ return manager.checkedDelete(extractorID);
+ }
+ finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean replaces(@Nullable LookupExtractorFactory other)
+ {
+ if (other != null && other instanceof NamespaceLookupExtractorFactory) {
+ NamespaceLookupExtractorFactory that = (NamespaceLookupExtractorFactory) other;
+ if (isInjective() != ((NamespaceLookupExtractorFactory) other).isInjective()) {
+ return true;
+ }
+ if (getFirstCacheTimeout() != ((NamespaceLookupExtractorFactory) other).getFirstCacheTimeout()) {
+ return true;
+ }
+ return !extractionNamespace.equals(that.extractionNamespace);
+ }
+ return true;
+ }
+
+ @Override
+ public LookupIntrospectHandler getIntrospectHandler()
+ {
+ return lookupIntrospectHandler;
+ }
+
+ @JsonProperty
+ public ExtractionNamespace getExtractionNamespace()
+ {
+ return extractionNamespace;
+ }
+
+ @JsonProperty
+ public long getFirstCacheTimeout()
+ {
+ return firstCacheTimeout;
+ }
+
+ @JsonProperty
+ public boolean isInjective()
+ {
+ return injective;
+ }
+
+ private String buildID()
+ {
+ return UUID.randomUUID().toString();
+ }
+
+ // Grab the latest snapshot from the cache manager
+ @Override
+ public LookupExtractor get()
+ {
+ final Lock readLock = startStopSync.readLock();
+ readLock.lock();
+ try {
+ if (!started) {
+ throw new ISE("Factory [%s] not started", extractorID);
+ }
+ String preVersion = null, postVersion = null;
+ Map map = null;
+ // Make sure we absolutely know what version of map we grabbed (for caching purposes)
+ do {
+ preVersion = manager.getVersion(extractorID);
+ if (preVersion == null) {
+ throw new ISE("Namespace vanished for [%s]", extractorID);
+ }
+ map = manager.getCacheMap(extractorID);
+ postVersion = manager.getVersion(extractorID);
+ if (postVersion == null) {
+ // We lost some horrible race... make sure we clean up
+ manager.delete(extractorID);
+ throw new ISE("Lookup [%s] is deleting", extractorID);
+ }
+ } while (!preVersion.equals(postVersion));
+ final byte[] v = StringUtils.toUtf8(postVersion);
+ final byte[] id = StringUtils.toUtf8(extractorID);
+ return new MapLookupExtractor(map, isInjective())
+ {
+ @Override
+ public byte[] getCacheKey()
+ {
+ return ByteBuffer
+ .allocate(CLASS_CACHE_KEY.length + id.length + 1 + v.length + 1 + 1)
+ .put(CLASS_CACHE_KEY)
+ .put(id).put((byte) 0xFF)
+ .put(v).put((byte) 0xFF)
+ .put(isOneToOne() ? (byte) 1 : (byte) 0)
+ .array();
+ }
+ };
+ }
+ finally {
+ readLock.unlock();
+ }
+ }
+}
diff --git a/extensions-core/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/ExtractionNamespace.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/ExtractionNamespace.java
similarity index 80%
rename from extensions-core/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/ExtractionNamespace.java
rename to extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/ExtractionNamespace.java
index 83d72a485903..8eb26137295e 100644
--- a/extensions-core/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/ExtractionNamespace.java
+++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/ExtractionNamespace.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package io.druid.query.extraction.namespace;
+package io.druid.query.lookup.namespace;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
@@ -25,7 +25,8 @@
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "jdbc", value = JDBCExtractionNamespace.class),
- @JsonSubTypes.Type(name = "uri", value = URIExtractionNamespace.class)
+ @JsonSubTypes.Type(name = "uri", value = URIExtractionNamespace.class),
+ @JsonSubTypes.Type(name = StaticMapExtractionNamespace.TYPE_NAME, value = StaticMapExtractionNamespace.class)
})
/**
* The ExtractionNamespace is a simple object for extracting namespaceLookup values from a source of data.
@@ -34,11 +35,5 @@
*/
public interface ExtractionNamespace
{
- /**
- * This is expected to return the namespace name. As an additional requirement, the implementation MUST supply a
- * "namespace" field in the json representing the object which is equal to the return of this function
- * @return The name of the namespace
- */
- String getNamespace();
long getPollMs();
}
diff --git a/extensions-core/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/ExtractionNamespaceFunctionFactory.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/ExtractionNamespaceCacheFactory.java
similarity index 62%
rename from extensions-core/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/ExtractionNamespaceFunctionFactory.java
rename to extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/ExtractionNamespaceCacheFactory.java
index a45ae0b0cc5c..c23aace46eba 100644
--- a/extensions-core/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/ExtractionNamespaceFunctionFactory.java
+++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/ExtractionNamespaceCacheFactory.java
@@ -17,40 +17,16 @@
* under the License.
*/
-package io.druid.query.extraction.namespace;
+package io.druid.query.lookup.namespace;
-import com.google.common.base.Function;
-
-import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
/**
*
*/
-public interface ExtractionNamespaceFunctionFactory
+public interface ExtractionNamespaceCacheFactory
{
-
- /**
- * Create a function for the given namespace which will do the manipulation requested in the extractionNamespace.
- * A simple implementation would simply use the cache supplied by the `NamespaceExtractionCacheManager`.
- * More advanced implementations may need more than just what can be cached by `NamespaceExtractionCacheManager`.
- *
- * @param extractionNamespace The ExtractionNamespace for which a manipulating function is needed.
- *
- * @return A function which will perform an extraction in accordance with the desires of the ExtractionNamespace
- */
- Function buildFn(T extractionNamespace, Map cache);
-
-
- /**
- * @param extractionNamespace The ExtractionNamespace for which a manipulating reverse function is needed.
- * @param cache view of the cache containing the function mapping.
- *
- * @return A function that will perform reverse lookup.
- */
- Function> buildReverseFn(T extractionNamespace, final Map cache);
-
/**
* This function is called once if `ExtractionNamespace.getUpdateMs() == 0`, or every update if
* `ExtractionNamespace.getUpdateMs() > 0`
@@ -60,6 +36,7 @@ public interface ExtractionNamespaceFunctionFactory getCachePopulator(T extractionNamespace, String lastVersion, Map swap);
+ Callable getCachePopulator(String id, T extractionNamespace, String lastVersion, Map swap);
}
diff --git a/extensions-core/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/JDBCExtractionNamespace.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/JDBCExtractionNamespace.java
similarity index 87%
rename from extensions-core/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/JDBCExtractionNamespace.java
rename to extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/JDBCExtractionNamespace.java
index b03bf987d49a..9fb9c3c5de8c 100644
--- a/extensions-core/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/JDBCExtractionNamespace.java
+++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/JDBCExtractionNamespace.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package io.druid.query.extraction.namespace;
+package io.druid.query.lookup.namespace;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -47,14 +47,10 @@ public class JDBCExtractionNamespace implements ExtractionNamespace
@JsonProperty
private final String tsColumn;
@JsonProperty
- private final String namespace;
- @JsonProperty
private final Period pollPeriod;
@JsonCreator
public JDBCExtractionNamespace(
- @NotNull @JsonProperty(value = "namespace", required = true)
- final String namespace,
@NotNull @JsonProperty(value = "connectorConfig", required = true)
final MetadataStorageConnectorConfig connectorConfig,
@NotNull @JsonProperty(value = "table", required = true)
@@ -75,16 +71,9 @@ public JDBCExtractionNamespace(
this.keyColumn = Preconditions.checkNotNull(keyColumn, "keyColumn");
this.valueColumn = Preconditions.checkNotNull(valueColumn, "valueColumn");
this.tsColumn = tsColumn;
- this.namespace = Preconditions.checkNotNull(namespace, "namespace");
this.pollPeriod = pollPeriod == null ? new Period(0L) : pollPeriod;
}
- @Override
- public String getNamespace()
- {
- return namespace;
- }
-
public MetadataStorageConnectorConfig getConnectorConfig()
{
return connectorConfig;
@@ -120,8 +109,7 @@ public long getPollMs()
public String toString()
{
return String.format(
- "JDBCExtractionNamespace = { namespace = %s, connectorConfig = { %s }, table = %s, keyColumn = %s, valueColumn = %s, tsColumn = %s, pollPeriod = %s}",
- namespace,
+ "JDBCExtractionNamespace = { connectorConfig = { %s }, table = %s, keyColumn = %s, valueColumn = %s, tsColumn = %s, pollPeriod = %s}",
connectorConfig.toString(),
table,
keyColumn,
@@ -158,9 +146,6 @@ public boolean equals(Object o)
if (tsColumn != null ? !tsColumn.equals(that.tsColumn) : that.tsColumn != null) {
return false;
}
- if (!namespace.equals(that.namespace)) {
- return false;
- }
return pollPeriod.equals(that.pollPeriod);
}
@@ -173,7 +158,6 @@ public int hashCode()
result = 31 * result + keyColumn.hashCode();
result = 31 * result + valueColumn.hashCode();
result = 31 * result + (tsColumn != null ? tsColumn.hashCode() : 0);
- result = 31 * result + namespace.hashCode();
result = 31 * result + pollPeriod.hashCode();
return result;
}
diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/StaticMapExtractionNamespace.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/StaticMapExtractionNamespace.java
new file mode 100644
index 000000000000..ae05d5c931c3
--- /dev/null
+++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/StaticMapExtractionNamespace.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.lookup.namespace;
+
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+import java.util.Map;
+
+/**
+ * This class is intended to be used in general cluster testing, and not as a serious lookup.
+ * Any desire to use a static map in a lookup in *general* should use `io.druid.query.extraction.MapLookupExtractor`
+ * Any desire to test the *caching mechanisms in this extension* can use this class.
+ */
+@JsonTypeName(StaticMapExtractionNamespace.TYPE_NAME)
+public class StaticMapExtractionNamespace implements ExtractionNamespace
+{
+ static final String TYPE_NAME = "staticMap";
+ private final Map map;
+
+ @JsonCreator
+ public StaticMapExtractionNamespace(
+ @JsonProperty("map") Map map
+ )
+ {
+ this.map = Preconditions.checkNotNull(map, "`map` required");
+ }
+
+ @JsonProperty
+ public Map getMap()
+ {
+ return map;
+ }
+
+ @Override
+ public long getPollMs()
+ {
+ // Load once and forget it
+ return 0;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ StaticMapExtractionNamespace that = (StaticMapExtractionNamespace) o;
+
+ return getMap().equals(that.getMap());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return getMap().hashCode();
+ }
+}
diff --git a/extensions-core/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/URIExtractionNamespace.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java
similarity index 89%
rename from extensions-core/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/URIExtractionNamespace.java
rename to extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java
index b8654b3a2cae..705c3ea614fe 100644
--- a/extensions-core/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/URIExtractionNamespace.java
+++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/query/lookup/namespace/URIExtractionNamespace.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package io.druid.query.extraction.namespace;
+package io.druid.query.lookup.namespace;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -59,8 +59,6 @@
@JsonTypeName("uri")
public class URIExtractionNamespace implements ExtractionNamespace
{
- @JsonProperty
- private final String namespace;
@JsonProperty
private final URI uri;
@JsonProperty
@@ -74,8 +72,6 @@ public class URIExtractionNamespace implements ExtractionNamespace
@JsonCreator
public URIExtractionNamespace(
- @NotNull @JsonProperty(value = "namespace", required = true)
- String namespace,
@JsonProperty(value = "uri", required = false)
URI uri,
@JsonProperty(value = "uriPrefix", required = false)
@@ -91,7 +87,6 @@ public URIExtractionNamespace(
String versionRegex
)
{
- this.namespace = Preconditions.checkNotNull(namespace, "namespace");
this.uri = uri;
this.uriPrefix = uriPrefix;
if ((uri != null) == (uriPrefix != null)) {
@@ -118,12 +113,6 @@ public URIExtractionNamespace(
}
}
- @Override
- public String getNamespace()
- {
- return namespace;
- }
-
public String getFileRegex()
{
return fileRegex;
@@ -154,8 +143,7 @@ public long getPollMs()
public String toString()
{
return "URIExtractionNamespace{" +
- "namespace='" + namespace + '\'' +
- ", uri=" + uri +
+ "uri=" + uri +
", uriPrefix=" + uriPrefix +
", namespaceParseSpec=" + namespaceParseSpec +
", fileRegex='" + fileRegex + '\'' +
@@ -175,9 +163,6 @@ public boolean equals(Object o)
URIExtractionNamespace that = (URIExtractionNamespace) o;
- if (!getNamespace().equals(that.getNamespace())) {
- return false;
- }
if (getUri() != null ? !getUri().equals(that.getUri()) : that.getUri() != null) {
return false;
}
@@ -197,8 +182,7 @@ public boolean equals(Object o)
@Override
public int hashCode()
{
- int result = getNamespace().hashCode();
- result = 31 * result + (getUri() != null ? getUri().hashCode() : 0);
+ int result = getUri() != null ? getUri().hashCode() : 0;
result = 31 * result + (getUriPrefix() != null ? getUriPrefix().hashCode() : 0);
result = 31 * result + getNamespaceParseSpec().hashCode();
result = 31 * result + (getFileRegex() != null ? getFileRegex().hashCode() : 0);
@@ -338,6 +322,28 @@ public Parser getParser()
return parser;
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ CSVFlatDataParser that = (CSVFlatDataParser) o;
+
+ if (!getColumns().equals(that.getColumns())) {
+ return false;
+ }
+ if (!getKeyColumn().equals(that.getKeyColumn())) {
+ return false;
+ }
+
+ return getValueColumn().equals(that.getValueColumn());
+ }
+
@Override
public String toString()
{
@@ -439,6 +445,31 @@ public Parser getParser()
return parser;
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ TSVFlatDataParser that = (TSVFlatDataParser) o;
+
+ if (!getColumns().equals(that.getColumns())) {
+ return false;
+ }
+ if ((getDelimiter() == null) ? that.getDelimiter() == null : getDelimiter().equals(that.getDelimiter())) {
+ return false;
+ }
+ if (!getKeyColumn().equals(that.getKeyColumn())) {
+ return false;
+ }
+
+ return getValueColumn().equals(that.getValueColumn());
+ }
+
@Override
public String toString()
{
@@ -496,6 +527,25 @@ public Parser getParser()
return this.parser;
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ JSONFlatDataParser that = (JSONFlatDataParser) o;
+
+ if (!getKeyFieldName().equals(that.getKeyFieldName())) {
+ return false;
+ }
+
+ return getValueFieldName().equals(that.getValueFieldName());
+ }
+
@Override
public String toString()
{
@@ -555,6 +605,19 @@ public Parser getParser()
return parser;
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ return true;
+ }
+
@Override
public String toString()
{
diff --git a/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/JDBCExtractionNamespaceFunctionFactory.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/JDBCExtractionNamespaceCacheFactory.java
similarity index 71%
rename from extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/JDBCExtractionNamespaceFunctionFactory.java
rename to extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/JDBCExtractionNamespaceCacheFactory.java
index ae2b50dc18e9..7a7ad0abba82 100644
--- a/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/JDBCExtractionNamespaceFunctionFactory.java
+++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/JDBCExtractionNamespaceCacheFactory.java
@@ -17,18 +17,13 @@
* under the License.
*/
-package io.druid.server.namespace;
+package io.druid.server.lookup.namespace;
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.metamx.common.Pair;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
-import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
-import io.druid.query.extraction.namespace.JDBCExtractionNamespace;
+import io.druid.query.lookup.namespace.ExtractionNamespaceCacheFactory;
+import io.druid.query.lookup.namespace.JDBCExtractionNamespace;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.StatementContext;
@@ -36,7 +31,6 @@
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.skife.jdbi.v2.util.TimestampMapper;
-import javax.annotation.Nullable;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
@@ -49,60 +43,22 @@
/**
*
*/
-public class JDBCExtractionNamespaceFunctionFactory
- implements ExtractionNamespaceFunctionFactory
+public class JDBCExtractionNamespaceCacheFactory
+ implements ExtractionNamespaceCacheFactory
{
- private static final Logger LOG = new Logger(JDBCExtractionNamespaceFunctionFactory.class);
+ private static final Logger LOG = new Logger(JDBCExtractionNamespaceCacheFactory.class);
private final ConcurrentMap dbiCache = new ConcurrentHashMap<>();
- @Override
- public Function buildFn(JDBCExtractionNamespace extractionNamespace, final Map cache)
- {
- return new Function()
- {
- @Nullable
- @Override
- public String apply(String input)
- {
- if (Strings.isNullOrEmpty(input)) {
- return null;
- }
- return Strings.emptyToNull(cache.get(input));
- }
- };
- }
-
- @Override
- public Function> buildReverseFn(
- JDBCExtractionNamespace extractionNamespace, final Map cache
- )
- {
- return new Function>()
- {
- @Nullable
- @Override
- public List apply(@Nullable final String value)
- {
- return Lists.newArrayList(Maps.filterKeys(cache, new Predicate()
- {
- @Override public boolean apply(@Nullable String key)
- {
- return cache.get(key).equals(Strings.nullToEmpty(value));
- }
- }).keySet());
- }
- };
- }
-
@Override
public Callable getCachePopulator(
+ final String id,
final JDBCExtractionNamespace namespace,
final String lastVersion,
final Map cache
)
{
final long lastCheck = lastVersion == null ? JodaUtils.MIN_INSTANT : Long.parseLong(lastVersion);
- final Long lastDBUpdate = lastUpdates(namespace);
+ final Long lastDBUpdate = lastUpdates(id, namespace);
if (lastDBUpdate != null && lastDBUpdate <= lastCheck) {
return new Callable()
{
@@ -118,12 +74,12 @@ public String call() throws Exception
@Override
public String call()
{
- final DBI dbi = ensureDBI(namespace);
+ final DBI dbi = ensureDBI(id, namespace);
final String table = namespace.getTable();
final String valueColumn = namespace.getValueColumn();
final String keyColumn = namespace.getKeyColumn();
- LOG.debug("Updating [%s]", namespace.getNamespace());
+ LOG.debug("Updating [%s]", id);
final List> pairs = dbi.withHandle(
new HandleCallback>>()
{
@@ -161,15 +117,15 @@ public Pair map(
for (Pair pair : pairs) {
cache.put(pair.lhs, pair.rhs);
}
- LOG.info("Finished loading %d values for namespace[%s]", cache.size(), namespace.getNamespace());
+ LOG.info("Finished loading %d values for namespace[%s]", cache.size(), id);
return String.format("%d", System.currentTimeMillis());
}
};
}
- private DBI ensureDBI(JDBCExtractionNamespace namespace)
+ private DBI ensureDBI(String id, JDBCExtractionNamespace namespace)
{
- final String key = namespace.getNamespace();
+ final String key = id;
DBI dbi = null;
if (dbiCache.containsKey(key)) {
dbi = dbiCache.get(key);
@@ -186,9 +142,9 @@ private DBI ensureDBI(JDBCExtractionNamespace namespace)
return dbi;
}
- private Long lastUpdates(JDBCExtractionNamespace namespace)
+ private Long lastUpdates(String id, JDBCExtractionNamespace namespace)
{
- final DBI dbi = ensureDBI(namespace);
+ final DBI dbi = ensureDBI(id, namespace);
final String table = namespace.getTable();
final String tsColumn = namespace.getTsColumn();
if (tsColumn == null) {
diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/NamespaceExtractionModule.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/NamespaceExtractionModule.java
new file mode 100644
index 000000000000..84dc64149984
--- /dev/null
+++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/NamespaceExtractionModule.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.lookup.namespace;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
+import com.google.inject.multibindings.MapBinder;
+import io.druid.guice.LazySingleton;
+import io.druid.guice.PolyBind;
+import io.druid.initialization.DruidModule;
+import io.druid.query.lookup.NamespaceLookupExtractorFactory;
+import io.druid.query.lookup.namespace.ExtractionNamespace;
+import io.druid.query.lookup.namespace.ExtractionNamespaceCacheFactory;
+import io.druid.query.lookup.namespace.JDBCExtractionNamespace;
+import io.druid.query.lookup.namespace.StaticMapExtractionNamespace;
+import io.druid.query.lookup.namespace.URIExtractionNamespace;
+import io.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
+import io.druid.server.lookup.namespace.cache.OffHeapNamespaceExtractionCacheManager;
+import io.druid.server.lookup.namespace.cache.OnHeapNamespaceExtractionCacheManager;
+
+import java.util.List;
+
+/**
+ *
+ */
+public class NamespaceExtractionModule implements DruidModule
+{
+ public static final String TYPE_PREFIX = "druid.lookup.namespace.cache.type";
+
+ @Override
+ public List extends Module> getJacksonModules()
+ {
+ return ImmutableList.of(
+ new SimpleModule("DruidNamespacedCachedExtractionModule")
+ .registerSubtypes(
+ NamespaceLookupExtractorFactory.class
+ )
+ );
+ }
+
+ public static MapBinder, ExtractionNamespaceCacheFactory>> getNamespaceFactoryMapBinder(
+ final Binder binder
+ )
+ {
+ return MapBinder.newMapBinder(
+ binder,
+ new TypeLiteral>()
+ {
+ },
+ new TypeLiteral>()
+ {
+ }
+ );
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ PolyBind.createChoiceWithDefault(
+ binder,
+ TYPE_PREFIX,
+ Key.get(NamespaceExtractionCacheManager.class),
+ Key.get(OnHeapNamespaceExtractionCacheManager.class),
+ "onHeap"
+ ).in(LazySingleton.class);
+
+ PolyBind
+ .optionBinder(binder, Key.get(NamespaceExtractionCacheManager.class))
+ .addBinding("offHeap")
+ .to(OffHeapNamespaceExtractionCacheManager.class)
+ .in(LazySingleton.class);
+
+ getNamespaceFactoryMapBinder(binder)
+ .addBinding(JDBCExtractionNamespace.class)
+ .to(JDBCExtractionNamespaceCacheFactory.class)
+ .in(LazySingleton.class);
+ getNamespaceFactoryMapBinder(binder)
+ .addBinding(URIExtractionNamespace.class)
+ .to(URIExtractionNamespaceCacheFactory.class)
+ .in(LazySingleton.class);
+ getNamespaceFactoryMapBinder(binder)
+ .addBinding(StaticMapExtractionNamespace.class)
+ .to(StaticMapExtractionNamespaceCacheFactory.class)
+ .in(LazySingleton.class);
+ }
+}
diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/StaticMapExtractionNamespaceCacheFactory.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/StaticMapExtractionNamespaceCacheFactory.java
new file mode 100644
index 000000000000..4b13e4d44aa8
--- /dev/null
+++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/StaticMapExtractionNamespaceCacheFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.lookup.namespace;
+
+import io.druid.query.lookup.namespace.ExtractionNamespaceCacheFactory;
+import io.druid.query.lookup.namespace.StaticMapExtractionNamespace;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+public class StaticMapExtractionNamespaceCacheFactory
+ implements ExtractionNamespaceCacheFactory
+{
+ private final String version = UUID.randomUUID().toString();
+
+ @Override
+ public Callable getCachePopulator(
+ final String id,
+ final StaticMapExtractionNamespace extractionNamespace,
+ final String lastVersion,
+ final Map swap
+ )
+ {
+ return new Callable()
+ {
+ @Override
+ public String call() throws Exception
+ {
+ if (version.equals(lastVersion)) {
+ return null;
+ } else {
+ swap.putAll(extractionNamespace.getMap());
+ return version;
+ }
+ }
+ };
+ }
+
+ String getVersion()
+ {
+ return version;
+ }
+}
diff --git a/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactory.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/URIExtractionNamespaceCacheFactory.java
similarity index 78%
rename from extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactory.java
rename to extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/URIExtractionNamespaceCacheFactory.java
index 9b058544b278..11ba66002d47 100644
--- a/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactory.java
+++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/URIExtractionNamespaceCacheFactory.java
@@ -17,14 +17,9 @@
* under the License.
*/
-package io.druid.server.namespace;
+package io.druid.server.lookup.namespace;
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.base.Strings;
import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import com.metamx.common.CompressionUtils;
@@ -34,80 +29,40 @@
import io.druid.common.utils.JodaUtils;
import io.druid.data.SearchableVersionedDataFinder;
import io.druid.data.input.MapPopulator;
-import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
-import io.druid.query.extraction.namespace.URIExtractionNamespace;
+import io.druid.query.lookup.namespace.ExtractionNamespaceCacheFactory;
+import io.druid.query.lookup.namespace.URIExtractionNamespace;
import io.druid.segment.loading.URIDataPuller;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.regex.Pattern;
-import javax.annotation.Nullable;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.ISODateTimeFormat;
/**
*
*/
-public class URIExtractionNamespaceFunctionFactory implements ExtractionNamespaceFunctionFactory
+public class URIExtractionNamespaceCacheFactory implements ExtractionNamespaceCacheFactory
{
private static final int DEFAULT_NUM_RETRIES = 3;
- private static final Logger log = new Logger(URIExtractionNamespaceFunctionFactory.class);
+ private static final Logger log = new Logger(URIExtractionNamespaceCacheFactory.class);
private final Map pullers;
@Inject
- public URIExtractionNamespaceFunctionFactory(
+ public URIExtractionNamespaceCacheFactory(
Map pullers
)
{
this.pullers = pullers;
}
- @Override
- public Function buildFn(URIExtractionNamespace extractionNamespace, final Map cache)
- {
- return new Function()
- {
- @Nullable
- @Override
- public String apply(String input)
- {
- if (Strings.isNullOrEmpty(input)) {
- return null;
- }
- return Strings.emptyToNull(cache.get(input));
- }
- };
- }
-
- @Override
- public Function> buildReverseFn(
- URIExtractionNamespace extractionNamespace, final Map cache
- )
- {
- return new Function>()
- {
- @Nullable
- @Override
- public List apply(@Nullable final String value)
- {
- return Lists.newArrayList(Maps.filterKeys(cache, new Predicate()
- {
- @Override
- public boolean apply(@Nullable String key)
- {
- return cache.get(key).equals(Strings.nullToEmpty(value));
- }
- }).keySet());
- }
- };
- }
-
@Override
public Callable getCachePopulator(
+ final String id,
final URIExtractionNamespace extractionNamespace,
final String lastVersion,
final Map cache
@@ -184,7 +139,7 @@ public String call() throws Exception
log.debug(
"URI [%s] for namespace [%s] was las modified [%s] but was last cached [%s]. Skipping ",
uri.toString(),
- extractionNamespace.getNamespace(),
+ id,
fmt.print(lastModified),
fmt.print(lastCached)
);
@@ -223,7 +178,7 @@ public InputStream openStream() throws IOException
log.info(
"Finished loading %d lines for namespace [%s]",
lineCount,
- extractionNamespace.getNamespace()
+ id
);
return version;
}
diff --git a/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManager.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManager.java
similarity index 66%
rename from extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManager.java
rename to extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManager.java
index d07f384658fb..d9badf8a1c74 100644
--- a/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManager.java
+++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/NamespaceExtractionCacheManager.java
@@ -17,14 +17,9 @@
* under the License.
*/
-package io.druid.server.namespace.cache;
+package io.druid.server.lookup.namespace.cache;
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.google.common.base.Function;
import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -38,14 +33,11 @@
import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
-import io.druid.query.extraction.namespace.ExtractionNamespace;
-import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
+import io.druid.query.lookup.namespace.ExtractionNamespace;
+import io.druid.query.lookup.namespace.ExtractionNamespaceCacheFactory;
-import javax.annotation.Nullable;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
@@ -56,16 +48,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
/**
*
*/
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = OnHeapNamespaceExtractionCacheManager.class)
-@JsonSubTypes(value = {
- @JsonSubTypes.Type(name = "offHeap", value = OffHeapNamespaceExtractionCacheManager.class),
- @JsonSubTypes.Type(name = "onHeap", value = OnHeapNamespaceExtractionCacheManager.class)
-})
public abstract class NamespaceExtractionCacheManager
{
protected static class NamespaceImplData
@@ -85,27 +71,21 @@ public NamespaceImplData(
final ExtractionNamespace namespace;
final String name;
final AtomicBoolean enabled = new AtomicBoolean(false);
- final AtomicReference> fn = new AtomicReference<>(null);
- final AtomicReference>> reverseFn = new AtomicReference<>(null);
+ final CountDownLatch firstRun = new CountDownLatch(1);
}
private static final Logger log = new Logger(NamespaceExtractionCacheManager.class);
private final ListeningScheduledExecutorService listeningScheduledExecutorService;
- protected final ConcurrentMap> fnCache;
- protected final ConcurrentMap>> reverseFnCache;
protected final ConcurrentMap implData = new ConcurrentHashMap<>();
protected final AtomicLong tasksStarted = new AtomicLong(0);
- protected final AtomicLong dataSize = new AtomicLong(0);
protected final ServiceEmitter serviceEmitter;
private final ConcurrentHashMap lastVersion = new ConcurrentHashMap<>();
- private final Map, ExtractionNamespaceFunctionFactory>> namespaceFunctionFactoryMap;
+ private final Map, ExtractionNamespaceCacheFactory>> namespaceFunctionFactoryMap;
public NamespaceExtractionCacheManager(
Lifecycle lifecycle,
- final ConcurrentMap> fnCache,
- final ConcurrentMap>> reverseFnCache,
final ServiceEmitter serviceEmitter,
- final Map, ExtractionNamespaceFunctionFactory>> namespaceFunctionFactoryMap
+ final Map, ExtractionNamespaceCacheFactory>> namespaceFunctionFactoryMap
)
{
this.listeningScheduledExecutorService = MoreExecutors.listeningDecorator(
@@ -120,8 +100,6 @@ public NamespaceExtractionCacheManager(
);
ExecutorServices.manageLifecycle(lifecycle, listeningScheduledExecutorService);
this.serviceEmitter = serviceEmitter;
- this.fnCache = fnCache;
- this.reverseFnCache = reverseFnCache;
this.namespaceFunctionFactoryMap = namespaceFunctionFactoryMap;
listeningScheduledExecutorService.scheduleAtFixedRate(
new Runnable()
@@ -133,16 +111,16 @@ public void run()
{
try {
final long tasks = tasksStarted.get();
- serviceEmitter.emit(ServiceMetricEvent.builder().build("namespace/size", dataSize.get()));
- serviceEmitter.emit(ServiceMetricEvent.builder().build("namespace/count", fnCache.size()));
serviceEmitter.emit(
ServiceMetricEvent.builder()
.build("namespace/deltaTasksStarted", tasks - priorTasksStarted)
);
priorTasksStarted = tasks;
- }catch(Exception e){
+ monitor(serviceEmitter);
+ }
+ catch (Exception e) {
log.error(e, "Error emitting namespace stats");
- if(Thread.currentThread().isInterrupted()){
+ if (Thread.currentThread().isInterrupted()) {
throw Throwables.propagate(e);
}
}
@@ -153,6 +131,16 @@ public void run()
);
}
+ /**
+ * Optional monitoring for overriding classes. `super.monitor` does *NOT* need to be called by overriding methods
+ *
+ * @param serviceEmitter The emitter to emit to
+ */
+ protected void monitor(ServiceEmitter serviceEmitter)
+ {
+ // Noop by default
+ }
+
protected boolean waitForServiceToEnd(long time, TimeUnit unit) throws InterruptedException
{
return listeningScheduledExecutorService.awaitTermination(time, unit);
@@ -160,8 +148,9 @@ protected boolean waitForServiceToEnd(long time, TimeUnit unit) throws Interrupt
protected Runnable getPostRunnable(
+ final String id,
final T namespace,
- final ExtractionNamespaceFunctionFactory factory,
+ final ExtractionNamespaceCacheFactory factory,
final String cacheId
)
{
@@ -170,93 +159,101 @@ protected Runnable getPostRunnable(
@Override
public void run()
{
- final String nsName = namespace.getNamespace();
- final NamespaceImplData namespaceDatum = implData.get(nsName);
+ final NamespaceImplData namespaceDatum = implData.get(id);
if (namespaceDatum == null) {
// was removed
return;
}
synchronized (namespaceDatum.enabled) {
- if (!namespaceDatum.enabled.get()) {
- // skip because it was disabled
- return;
- }
- swapAndClearCache(nsName, cacheId);
- final Function fn = factory.buildFn(namespace, getCacheMap(nsName));
- final Function> reverseFn = factory.buildReverseFn(namespace, getCacheMap(nsName));
- final Function priorFn = fnCache.put(nsName, fn);
- final Function> priorReverseFn = reverseFnCache.put(nsName, reverseFn);
- if (priorFn != null && priorFn != namespaceDatum.fn.get()) {
- log.warn("Replaced prior function for namespace [%s]", nsName);
+ try {
+ if (!namespaceDatum.enabled.get()) {
+ // skip because it was disabled
+ return;
+ }
+ swapAndClearCache(id, cacheId);
}
- if (priorReverseFn != null && priorReverseFn != namespaceDatum.reverseFn.get()) {
- log.warn("Replaced prior reverse function for namespace [%s]", nsName);
+ finally {
+ namespaceDatum.firstRun.countDown();
}
- namespaceDatum.fn.set(fn);
- namespaceDatum.reverseFn.set(reverseFn);
}
}
};
}
- public void scheduleOrUpdate(
- final Collection namespaces
+ // return value means actually delete or not
+ public boolean checkedDelete(
+ String namespaceName
)
{
- Set differentNamespaces = Sets.difference(
- implData.keySet(), Sets.newHashSet(
- Iterables.transform(
- namespaces,
- new Function()
- {
- @Nullable
- @Override
- public String apply(
- ExtractionNamespace input
- )
- {
- return input.getNamespace();
- }
- }
- )
- )
- );
+ final NamespaceImplData implDatum = implData.get(namespaceName);
+ if (implDatum == null) {
+ // Delete but we don't have it?
+ log.wtf("Asked to delete something I just lost [%s]", namespaceName);
+ return false;
+ }
+ return delete(namespaceName);
+ }
+
+ // return value means actually schedule or not
+ public boolean scheduleOrUpdate(
+ final String id,
+ ExtractionNamespace namespace
+ )
+ {
+ final NamespaceImplData implDatum = implData.get(id);
+ if (implDatum == null) {
+ // New, probably
+ schedule(id, namespace);
+ return true;
+ }
+ if (!implDatum.enabled.get()) {
+ // Race condition. Someone else disabled it first, go ahead and reschedule
+ schedule(id, namespace);
+ return true;
+ }
+
+ // Live one. Check if it needs updated
+ if (implDatum.namespace.equals(namespace)) {
+ // skip if no update
+ return false;
+ }
if (log.isDebugEnabled()) {
- log.debug("Deleting %d namespaces: %s", differentNamespaces.size(), differentNamespaces);
+ log.debug("Namespace [%s] needs updated to [%s]", implDatum.namespace, namespace);
}
- for (String namespaceName : differentNamespaces) {
- final NamespaceImplData implDatum = implData.get(namespaceName);
- if (implDatum == null) {
- // Delete but we don't have it?
- log.wtf("Asked to delete something I just lost [%s]", namespaceName);
- continue;
- }
- delete(namespaceName);
+ removeNamespaceLocalMetadata(implDatum);
+ schedule(id, namespace);
+ return true;
+ }
+
+ public boolean scheduleAndWait(
+ final String id,
+ ExtractionNamespace namespace,
+ long waitForFirstRun
+ )
+ {
+ if (scheduleOrUpdate(id, namespace)) {
+ log.debug("Scheduled new namespace [%s]: %s", id, namespace);
+ } else {
+ log.debug("Namespace [%s] already running: %s", id, namespace);
}
- for (final ExtractionNamespace namespace : namespaces) {
- final NamespaceImplData implDatum = implData.get(namespace.getNamespace());
- if (implDatum == null) {
- // New, probably
- schedule(namespace);
- continue;
- }
- if (!implDatum.enabled.get()) {
- // Race condition. Someone else disabled it first, go ahead and reschedule
- schedule(namespace);
- continue;
- }
- // Live one. Check if it needs updated
- if (implDatum.namespace.equals(namespace)) {
- // skip if no update
- continue;
- }
- if (log.isDebugEnabled()) {
- log.debug("Namespace [%s] needs updated to [%s]", implDatum.namespace, namespace);
- }
- removeNamespaceLocalMetadata(implDatum);
- schedule(namespace);
+ final NamespaceImplData namespaceImplData = implData.get(id);
+ if (namespaceImplData == null) {
+ log.warn("NamespaceLookupExtractorFactory[%s] - deleted during start", id);
+ return false;
}
+
+ boolean success = false;
+ try {
+ success = namespaceImplData.firstRun.await(waitForFirstRun, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e) {
+ log.error(e, "NamespaceLookupExtractorFactory[%s] - interrupted during start", id);
+ }
+ if (!success) {
+ delete(id);
+ }
+ return success;
}
private void cancelFuture(final NamespaceImplData implDatum)
@@ -315,28 +312,28 @@ private boolean removeNamespaceLocalMetadata(final NamespaceImplData implDatum)
}
// Optimistic scheduling of updates to a namespace.
- public ListenableFuture> schedule(final T namespace)
+ public ListenableFuture> schedule(final String id, final T namespace)
{
- final ExtractionNamespaceFunctionFactory factory = (ExtractionNamespaceFunctionFactory)
+ final ExtractionNamespaceCacheFactory factory = (ExtractionNamespaceCacheFactory)
namespaceFunctionFactoryMap.get(namespace.getClass());
if (factory == null) {
throw new ISE("Cannot find factory for namespace [%s]", namespace);
}
final String cacheId = UUID.randomUUID().toString();
- return schedule(namespace, factory, getPostRunnable(namespace, factory, cacheId), cacheId);
+ return schedule(id, namespace, factory, getPostRunnable(id, namespace, factory, cacheId), cacheId);
}
// For testing purposes this is protected
protected ListenableFuture> schedule(
+ final String id,
final T namespace,
- final ExtractionNamespaceFunctionFactory factory,
+ final ExtractionNamespaceCacheFactory factory,
final Runnable postRunnable,
final String cacheId
)
{
- final String namespaceName = namespace.getNamespace();
- log.debug("Trying to update namespace [%s]", namespaceName);
- final NamespaceImplData implDatum = implData.get(namespaceName);
+ log.debug("Trying to update namespace [%s]", id);
+ final NamespaceImplData implDatum = implData.get(id);
if (implDatum != null) {
synchronized (implDatum.enabled) {
if (implDatum.enabled.get()) {
@@ -357,8 +354,8 @@ public void run()
startLatch.await(); // wait for "election" to leadership or cancellation
if (!Thread.currentThread().isInterrupted()) {
final Map cache = getCacheMap(cacheId);
- final String preVersion = lastVersion.get(namespaceName);
- final Callable runnable = factory.getCachePopulator(namespace, preVersion, cache);
+ final String preVersion = lastVersion.get(id);
+ final Callable runnable = factory.getCachePopulator(id, namespace, preVersion, cache);
tasksStarted.incrementAndGet();
final String newVersion = runnable.call();
@@ -366,20 +363,20 @@ public void run()
throw new CancellationException(String.format("Version `%s` already exists", preVersion));
}
if (newVersion != null) {
- lastVersion.put(namespaceName, newVersion);
+ lastVersion.put(id, newVersion);
}
postRunnable.run();
- log.debug("Namespace [%s] successfully updated", namespaceName);
+ log.debug("Namespace [%s] successfully updated", id);
}
}
catch (Throwable t) {
delete(cacheId);
if (t instanceof CancellationException) {
- log.debug(t, "Namespace [%s] cancelled", namespaceName);
+ log.debug(t, "Namespace [%s] cancelled", id);
} else {
log.error(t, "Failed update namespace [%s]", namespace);
}
- if(Thread.currentThread().isInterrupted()) {
+ if (Thread.currentThread().isInterrupted()) {
throw Throwables.propagate(t);
}
}
@@ -394,18 +391,18 @@ public void run()
future = listeningScheduledExecutorService.schedule(command, 0, TimeUnit.MILLISECONDS);
}
- final NamespaceImplData me = new NamespaceImplData(future, namespace, namespaceName);
- final NamespaceImplData other = implData.putIfAbsent(namespaceName, me);
+ final NamespaceImplData me = new NamespaceImplData(future, namespace, id);
+ final NamespaceImplData other = implData.putIfAbsent(id, me);
if (other != null) {
if (!future.isDone() && !future.cancel(true)) {
- log.warn("Unable to cancel future for namespace[%s] on race loss", namespaceName);
+ log.warn("Unable to cancel future for namespace[%s] on race loss", id);
}
throw new IAE("Namespace [%s] already exists! Leaving prior running", namespace);
} else {
if (!me.enabled.compareAndSet(false, true)) {
log.wtf("How did someone enable this before ME?");
}
- log.debug("I own namespace [%s]", namespaceName);
+ log.debug("I own namespace [%s]", id);
return future;
}
}
@@ -452,7 +449,6 @@ public boolean delete(final String ns)
if (deleted) {
log.info("Deleting namespace [%s]", ns);
lastVersion.remove(implDatum.name);
- fnCache.remove(implDatum.name);
return true;
} else {
log.debug("Did not delete namespace [%s]", ns);
@@ -469,8 +465,8 @@ public String getVersion(String namespace)
}
}
- public Collection getKnownNamespaces()
+ public Collection getKnownIDs()
{
- return fnCache.keySet();
+ return implData.keySet();
}
}
diff --git a/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OffHeapNamespaceExtractionCacheManager.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/OffHeapNamespaceExtractionCacheManager.java
similarity index 73%
rename from extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OffHeapNamespaceExtractionCacheManager.java
rename to extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/OffHeapNamespaceExtractionCacheManager.java
index bbe4d826232b..f33f25949f9c 100644
--- a/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OffHeapNamespaceExtractionCacheManager.java
+++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/OffHeapNamespaceExtractionCacheManager.java
@@ -17,25 +17,23 @@
* under the License.
*/
-package io.druid.server.namespace.cache;
+package io.druid.server.lookup.namespace.cache;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Striped;
import com.google.inject.Inject;
-import com.google.inject.name.Named;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter;
-import io.druid.query.extraction.namespace.ExtractionNamespace;
-import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
+import com.metamx.emitter.service.ServiceMetricEvent;
+import io.druid.query.lookup.namespace.ExtractionNamespace;
+import io.druid.query.lookup.namespace.ExtractionNamespaceCacheFactory;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import java.io.File;
import java.io.IOException;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -56,15 +54,11 @@ public class OffHeapNamespaceExtractionCacheManager extends NamespaceExtractionC
@Inject
public OffHeapNamespaceExtractionCacheManager(
Lifecycle lifecycle,
- @Named("namespaceExtractionFunctionCache")
- ConcurrentMap> fnCache,
- @Named("namespaceReverseExtractionFunctionCache")
- ConcurrentMap>> reverseFnCache,
ServiceEmitter emitter,
- final Map, ExtractionNamespaceFunctionFactory>> namespaceFunctionFactoryMap
+ final Map, ExtractionNamespaceCacheFactory>> namespaceFunctionFactoryMap
)
{
- super(lifecycle, fnCache, reverseFnCache, emitter, namespaceFunctionFactoryMap);
+ super(lifecycle, emitter, namespaceFunctionFactoryMap);
try {
tmpFile = File.createTempFile("druidMapDB", getClass().getCanonicalName());
log.info("Using file [%s] for mapDB off heap namespace cache", tmpFile.getAbsolutePath());
@@ -83,27 +77,32 @@ public OffHeapNamespaceExtractionCacheManager(
.commitFileSyncDisable()
.cacheSize(10_000_000)
.make();
- lifecycle.addHandler(
- new Lifecycle.Handler()
- {
- @Override
- public void start() throws Exception
+ try {
+ lifecycle.addMaybeStartHandler(
+ new Lifecycle.Handler()
{
- // NOOP
- }
+ @Override
+ public void start() throws Exception
+ {
+ // NOOP
+ }
- @Override
- public void stop()
- {
- if (!mmapDB.isClosed()) {
- mmapDB.close();
- if (!tmpFile.delete()) {
- log.warn("Unable to delete file at [%s]", tmpFile.getAbsolutePath());
+ @Override
+ public synchronized void stop()
+ {
+ if (!mmapDB.isClosed()) {
+ mmapDB.close();
+ if (!tmpFile.delete()) {
+ log.warn("Unable to delete file at [%s]", tmpFile.getAbsolutePath());
+ }
}
}
}
- }
- );
+ );
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
}
@Override
@@ -121,10 +120,8 @@ protected boolean swapAndClearCache(String namespaceKey, String cacheKey)
if (priorCache != null) {
// TODO: resolve what happens here if query is actively going on
mmapDB.delete(priorCache);
- dataSize.set(tmpFile.length());
return true;
} else {
- dataSize.set(tmpFile.length());
return false;
}
}
@@ -136,18 +133,19 @@ protected boolean swapAndClearCache(String namespaceKey, String cacheKey)
@Override
public boolean delete(final String namespaceKey)
{
-
final Lock lock = nsLocks.get(namespaceKey);
lock.lock();
try {
- super.delete(namespaceKey);
- final String mmapDBkey = currentNamespaceCache.get(namespaceKey);
- if (mmapDBkey != null) {
- final long pre = tmpFile.length();
- mmapDB.delete(mmapDBkey);
- dataSize.set(tmpFile.length());
- log.debug("MapDB file size: pre %d post %d", pre, dataSize.get());
- return true;
+ if (super.delete(namespaceKey)) {
+ final String mmapDBkey = currentNamespaceCache.get(namespaceKey);
+ if (mmapDBkey != null) {
+ final long pre = tmpFile.length();
+ mmapDB.delete(mmapDBkey);
+ log.debug("MapDB file size: pre %d post %d", pre, tmpFile.length());
+ return true;
+ } else {
+ return false;
+ }
} else {
return false;
}
@@ -184,4 +182,10 @@ public ConcurrentMap getCacheMap(String namespaceOrCacheKey)
lock.unlock();
}
}
+
+ @Override
+ protected void monitor(ServiceEmitter serviceEmitter)
+ {
+ serviceEmitter.emit(ServiceMetricEvent.builder().build("namespace/cache/diskSize", tmpFile.length()));
+ }
}
diff --git a/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OnHeapNamespaceExtractionCacheManager.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManager.java
similarity index 63%
rename from extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OnHeapNamespaceExtractionCacheManager.java
rename to extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManager.java
index 2d55f6489835..ef78312bb82b 100644
--- a/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OnHeapNamespaceExtractionCacheManager.java
+++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/cache/OnHeapNamespaceExtractionCacheManager.java
@@ -17,19 +17,19 @@
* under the License.
*/
-package io.druid.server.namespace.cache;
+package io.druid.server.lookup.namespace.cache;
-import com.google.common.base.Function;
+import com.google.common.primitives.Chars;
import com.google.common.util.concurrent.Striped;
import com.google.inject.Inject;
-import com.google.inject.name.Named;
import com.metamx.common.IAE;
import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter;
-import io.druid.query.extraction.namespace.ExtractionNamespace;
-import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
+import com.metamx.emitter.service.ServiceMetricEvent;
+import io.druid.query.lookup.namespace.ExtractionNamespace;
+import io.druid.query.lookup.namespace.ExtractionNamespaceCacheFactory;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -40,21 +40,18 @@
*/
public class OnHeapNamespaceExtractionCacheManager extends NamespaceExtractionCacheManager
{
+ private static final Logger LOG = new Logger(OnHeapNamespaceExtractionCacheManager.class);
private final ConcurrentMap> mapMap = new ConcurrentHashMap<>();
private final Striped nsLocks = Striped.lock(32);
@Inject
public OnHeapNamespaceExtractionCacheManager(
final Lifecycle lifecycle,
- @Named("namespaceExtractionFunctionCache")
- final ConcurrentMap> fnCache,
- @Named("namespaceReverseExtractionFunctionCache")
- final ConcurrentMap>> reverseFnCache,
final ServiceEmitter emitter,
- final Map, ExtractionNamespaceFunctionFactory>> namespaceFunctionFactoryMap
+ final Map, ExtractionNamespaceCacheFactory>> namespaceFunctionFactoryMap
)
{
- super(lifecycle, fnCache, reverseFnCache, emitter, namespaceFunctionFactoryMap);
+ super(lifecycle, emitter, namespaceFunctionFactoryMap);
}
@Override
@@ -67,11 +64,9 @@ protected boolean swapAndClearCache(String namespaceKey, String cacheKey)
if (cacheMap == null) {
throw new IAE("Extraction Cache [%s] does not exist", cacheKey);
}
- dataSize.addAndGet(cacheMap.size());
ConcurrentMap prior = mapMap.put(namespaceKey, cacheMap);
mapMap.remove(cacheKey);
if (prior != null) {
- dataSize.addAndGet(-prior.size());
// Old map will get GC'd when it is not used anymore
return true;
} else {
@@ -100,11 +95,36 @@ public boolean delete(final String namespaceKey)
final Lock lock = nsLocks.get(namespaceKey);
lock.lock();
try {
- super.delete(namespaceKey);
- return mapMap.remove(namespaceKey) != null;
+ return super.delete(namespaceKey) && mapMap.remove(namespaceKey) != null;
}
finally {
lock.unlock();
}
}
+
+ @Override
+ protected void monitor(ServiceEmitter serviceEmitter)
+ {
+ long numEntries = 0;
+ long size = 0;
+ for (Map.Entry> entry : mapMap.entrySet()) {
+ final ConcurrentMap map = entry.getValue();
+ if (map == null) {
+ LOG.debug("missing cache key for reporting [%s]", entry.getKey());
+ continue;
+ }
+ numEntries += map.size();
+ for (Map.Entry sEntry : map.entrySet()) {
+ final String key = sEntry.getKey();
+ final String value = sEntry.getValue();
+ if (key == null || value == null) {
+ LOG.debug("Missing entries for cache key [%s]", entry.getKey());
+ continue;
+ }
+ size += key.length() + value.length();
+ }
+ }
+ serviceEmitter.emit(ServiceMetricEvent.builder().build("namespace/cache/numEntries", numEntries));
+ serviceEmitter.emit(ServiceMetricEvent.builder().build("namespace/cache/heapSizeInBytes", size * Chars.BYTES));
+ }
}
diff --git a/extensions-core/namespace-lookup/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-core/lookups-cached-global/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
similarity index 93%
rename from extensions-core/namespace-lookup/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
rename to extensions-core/lookups-cached-global/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
index 6466ed7e1535..0903ef4dee6d 100644
--- a/extensions-core/namespace-lookup/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
+++ b/extensions-core/lookups-cached-global/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
@@ -17,4 +17,4 @@
# under the License.
#
-io.druid.server.namespace.NamespacedExtractionModule
+io.druid.server.lookup.namespace.NamespaceExtractionModule
diff --git a/extensions-core/lookups-cached-global/src/test/java/io/druid/query/lookup/NamespaceLookupExtractorFactoryTest.java b/extensions-core/lookups-cached-global/src/test/java/io/druid/query/lookup/NamespaceLookupExtractorFactoryTest.java
new file mode 100644
index 000000000000..0bb9984ee91b
--- /dev/null
+++ b/extensions-core/lookups-cached-global/src/test/java/io/druid/query/lookup/NamespaceLookupExtractorFactoryTest.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.lookup;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.BeanProperty;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Binder;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import com.metamx.common.ISE;
+import io.druid.guice.GuiceInjectors;
+import io.druid.guice.JsonConfigProvider;
+import io.druid.guice.annotations.Json;
+import io.druid.guice.annotations.Self;
+import io.druid.initialization.Initialization;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.lookup.namespace.ExtractionNamespace;
+import io.druid.query.lookup.namespace.URIExtractionNamespace;
+import io.druid.server.DruidNode;
+import io.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
+import org.easymock.EasyMock;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import javax.ws.rs.core.Response;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class NamespaceLookupExtractorFactoryTest
+{
+ private final ObjectMapper mapper = new DefaultObjectMapper();
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ private final NamespaceExtractionCacheManager cacheManager = EasyMock.createStrictMock(NamespaceExtractionCacheManager.class);
+
+ @Before
+ public void setUp()
+ {
+ mapper.setInjectableValues(
+ new InjectableValues()
+ {
+ @Override
+ public Object findInjectableValue(
+ Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
+ )
+ {
+ if ("io.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager".equals(valueId)) {
+ return cacheManager;
+ }
+ return null;
+ }
+ }
+ );
+ }
+
+ @Test
+ public void testSimpleSerde() throws Exception
+ {
+ final URIExtractionNamespace uriExtractionNamespace = new URIExtractionNamespace(
+ temporaryFolder.newFolder().toURI(),
+ null, null,
+ new URIExtractionNamespace.ObjectMapperFlatDataParser(mapper),
+
+ Period.millis(0),
+ null
+ );
+ final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory = new NamespaceLookupExtractorFactory(
+ uriExtractionNamespace,
+ cacheManager
+ );
+ Assert.assertEquals(
+ uriExtractionNamespace,
+ mapper.readValue(
+ mapper.writeValueAsString(namespaceLookupExtractorFactory),
+ NamespaceLookupExtractorFactory.class
+ ).getExtractionNamespace()
+ );
+ }
+
+ @Test
+ public void testMissingSpec()
+ {
+ expectedException.expectMessage("extractionNamespace should be specified");
+ new NamespaceLookupExtractorFactory(null, null);
+ }
+
+ @Test
+ public void testSimpleStartStop()
+ {
+ final ExtractionNamespace extractionNamespace = new ExtractionNamespace()
+ {
+ @Override
+ public long getPollMs()
+ {
+ return 0;
+ }
+ };
+ EasyMock.expect(cacheManager.scheduleAndWait(
+ EasyMock.anyString(),
+ EasyMock.eq(extractionNamespace),
+ EasyMock.eq(60000L)
+ )).andReturn(true).once();
+ EasyMock.expect(
+ cacheManager.checkedDelete(EasyMock.anyString())
+ ).andReturn(true).once();
+ EasyMock.replay(cacheManager);
+
+ final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory = new NamespaceLookupExtractorFactory(
+ extractionNamespace,
+ cacheManager
+ );
+ Assert.assertTrue(namespaceLookupExtractorFactory.start());
+ Assert.assertTrue(namespaceLookupExtractorFactory.close());
+ EasyMock.verify(cacheManager);
+ }
+
+ @Test
+ public void testSimpleStartStopStop()
+ {
+ final ExtractionNamespace extractionNamespace = new ExtractionNamespace()
+ {
+ @Override
+ public long getPollMs()
+ {
+ return 0;
+ }
+ };
+ EasyMock.expect(cacheManager.scheduleAndWait(
+ EasyMock.anyString(),
+ EasyMock.eq(extractionNamespace),
+ EasyMock.eq(60000L)
+ )).andReturn(true).once();
+ EasyMock.expect(
+ cacheManager.checkedDelete(EasyMock.anyString())
+ ).andReturn(true).once();
+ EasyMock.replay(cacheManager);
+
+ final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory = new NamespaceLookupExtractorFactory(
+ extractionNamespace,
+ cacheManager
+ );
+ Assert.assertTrue(namespaceLookupExtractorFactory.start());
+ Assert.assertTrue(namespaceLookupExtractorFactory.close());
+ Assert.assertTrue(namespaceLookupExtractorFactory.close());
+ EasyMock.verify(cacheManager);
+ }
+
+ @Test
+ public void testSimpleStartStart()
+ {
+ final ExtractionNamespace extractionNamespace = new ExtractionNamespace()
+ {
+ @Override
+ public long getPollMs()
+ {
+ return 0;
+ }
+ };
+ EasyMock.expect(cacheManager.scheduleAndWait(
+ EasyMock.anyString(),
+ EasyMock.eq(extractionNamespace),
+ EasyMock.eq(60000L)
+ )).andReturn(true).once();
+ EasyMock.replay(cacheManager);
+
+ final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory = new NamespaceLookupExtractorFactory(
+ extractionNamespace,
+ cacheManager
+ );
+ Assert.assertTrue(namespaceLookupExtractorFactory.start());
+ Assert.assertTrue(namespaceLookupExtractorFactory.start());
+ EasyMock.verify(cacheManager);
+ }
+
+
+ @Test
+ public void testSimpleStartGetStop()
+ {
+ final ExtractionNamespace extractionNamespace = new ExtractionNamespace()
+ {
+ @Override
+ public long getPollMs()
+ {
+ return 0;
+ }
+ };
+ EasyMock.expect(cacheManager.scheduleAndWait(
+ EasyMock.anyString(),
+ EasyMock.eq(extractionNamespace),
+ EasyMock.eq(60000L)
+ )).andReturn(true).once();
+ EasyMock.expect(cacheManager.getVersion(EasyMock.anyString())).andReturn("0").once();
+ EasyMock.expect(cacheManager.getCacheMap(EasyMock.anyString()))
+ .andReturn(new ConcurrentHashMap())
+ .once();
+ EasyMock.expect(cacheManager.getVersion(EasyMock.anyString())).andReturn("0").once();
+ EasyMock.expect(
+ cacheManager.checkedDelete(EasyMock.anyString())
+ ).andReturn(true).once();
+ EasyMock.replay(cacheManager);
+
+ final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory = new NamespaceLookupExtractorFactory(
+ extractionNamespace,
+ cacheManager
+ );
+ Assert.assertTrue(namespaceLookupExtractorFactory.start());
+ final LookupExtractor extractor = namespaceLookupExtractorFactory.get();
+ Assert.assertNull(extractor.apply("foo"));
+ Assert.assertTrue(namespaceLookupExtractorFactory.close());
+ EasyMock.verify(cacheManager);
+ }
+
+
+ @Test
+ public void testSimpleStartRacyGetDuringDelete()
+ {
+ final ExtractionNamespace extractionNamespace = new ExtractionNamespace()
+ {
+ @Override
+ public long getPollMs()
+ {
+ return 0;
+ }
+ };
+ EasyMock.expect(cacheManager.scheduleAndWait(
+ EasyMock.anyString(),
+ EasyMock.eq(extractionNamespace),
+ EasyMock.eq(60000L)
+ )).andReturn(true).once();
+ EasyMock.expect(cacheManager.getVersion(EasyMock.anyString())).andReturn("0").once();
+ EasyMock.expect(cacheManager.getCacheMap(EasyMock.anyString()))
+ .andReturn(new ConcurrentHashMap())
+ .once();
+ EasyMock.expect(cacheManager.getVersion(EasyMock.anyString())).andReturn(null).once();
+ EasyMock.expect(cacheManager.delete(EasyMock.anyString())).andReturn(true).once();
+ EasyMock.replay(cacheManager);
+
+ final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory = new NamespaceLookupExtractorFactory(
+ extractionNamespace,
+ cacheManager
+ );
+ Assert.assertTrue(namespaceLookupExtractorFactory.start());
+ try {
+ namespaceLookupExtractorFactory.get();
+ Assert.fail("Should have thrown ISE");
+ }
+ catch (ISE ise) {
+ // NOOP
+ }
+
+ EasyMock.verify(cacheManager);
+ }
+
+
+ @Test
+ public void testSimpleStartRacyGetDuringUpdate()
+ {
+ final ExtractionNamespace extractionNamespace = new ExtractionNamespace()
+ {
+ @Override
+ public long getPollMs()
+ {
+ return 0;
+ }
+ };
+ EasyMock.expect(cacheManager.scheduleAndWait(
+ EasyMock.anyString(),
+ EasyMock.eq(extractionNamespace),
+ EasyMock.eq(60000L)
+ )).andReturn(true).once();
+ EasyMock.expect(cacheManager.getVersion(EasyMock.anyString())).andReturn("0").once();
+ EasyMock.expect(cacheManager.getCacheMap(EasyMock.anyString()))
+ .andReturn(new ConcurrentHashMap(ImmutableMap.of("foo", "bar")))
+ .once();
+ EasyMock.expect(cacheManager.getVersion(EasyMock.anyString())).andReturn("1").once();
+
+ EasyMock.expect(cacheManager.getVersion(EasyMock.anyString())).andReturn("2").once();
+ EasyMock.expect(cacheManager.getCacheMap(EasyMock.anyString()))
+ .andReturn(new ConcurrentHashMap())
+ .once();
+ EasyMock.expect(cacheManager.getVersion(EasyMock.anyString())).andReturn("2").once();
+ EasyMock.expect(cacheManager.checkedDelete(EasyMock.anyString())).andReturn(true).once();
+ EasyMock.replay(cacheManager);
+
+ final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory = new NamespaceLookupExtractorFactory(
+ extractionNamespace,
+ cacheManager
+ );
+ Assert.assertTrue(namespaceLookupExtractorFactory.start());
+ final LookupExtractor extractor = namespaceLookupExtractorFactory.get();
+ Assert.assertNull(extractor.apply("foo"));
+ Assert.assertNotNull(extractor.getCacheKey());
+ Assert.assertTrue(namespaceLookupExtractorFactory.close());
+ EasyMock.verify(cacheManager);
+ }
+
+
+ @Test
+ public void testSimpleStartRacyGetAfterDelete()
+ {
+ final ExtractionNamespace extractionNamespace = new ExtractionNamespace()
+ {
+ @Override
+ public long getPollMs()
+ {
+ return 0;
+ }
+ };
+ EasyMock.expect(cacheManager.scheduleAndWait(
+ EasyMock.anyString(),
+ EasyMock.eq(extractionNamespace),
+ EasyMock.eq(60000L)
+ )).andReturn(true).once();
+ EasyMock.expect(cacheManager.getVersion(EasyMock.anyString())).andReturn(null).once();
+ EasyMock.replay(cacheManager);
+
+ final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory = new NamespaceLookupExtractorFactory(
+ extractionNamespace,
+ cacheManager
+ );
+ Assert.assertTrue(namespaceLookupExtractorFactory.start());
+ try {
+ namespaceLookupExtractorFactory.get();
+ Assert.fail("Should have thrown ISE");
+ }
+ catch (ISE ise) {
+ // NOOP
+ }
+
+ EasyMock.verify(cacheManager);
+ }
+
+
+ @Test
+ public void testSartFailsToSchedule()
+ {
+ final ExtractionNamespace extractionNamespace = new ExtractionNamespace()
+ {
+ @Override
+ public long getPollMs()
+ {
+ return 0;
+ }
+ };
+ EasyMock.expect(cacheManager.scheduleAndWait(
+ EasyMock.anyString(),
+ EasyMock.eq(extractionNamespace),
+ EasyMock.eq(60000L)
+ )).andReturn(false).once();
+ EasyMock.replay(cacheManager);
+
+ final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory = new NamespaceLookupExtractorFactory(
+ extractionNamespace,
+ cacheManager
+ );
+ Assert.assertFalse(namespaceLookupExtractorFactory.start());
+ // true because it never fully started
+ Assert.assertTrue(namespaceLookupExtractorFactory.close());
+ EasyMock.verify(cacheManager);
+ }
+
+ @Test
+ public void testReplaces()
+ {
+ final ExtractionNamespace en1 = EasyMock.createStrictMock(ExtractionNamespace.class), en2 = EasyMock.createStrictMock(
+ ExtractionNamespace.class);
+ EasyMock.replay(en1, en2);
+ final NamespaceLookupExtractorFactory f1 = new NamespaceLookupExtractorFactory(
+ en1,
+ cacheManager
+ ), f2 = new NamespaceLookupExtractorFactory(en2, cacheManager), f1b = new NamespaceLookupExtractorFactory(
+ en1,
+ cacheManager
+ );
+ Assert.assertTrue(f1.replaces(f2));
+ Assert.assertTrue(f2.replaces(f1));
+ Assert.assertFalse(f1.replaces(f1b));
+ Assert.assertFalse(f1b.replaces(f1));
+ Assert.assertFalse(f1.replaces(f1));
+ Assert.assertTrue(f1.replaces(EasyMock.createNiceMock(LookupExtractorFactory.class)));
+ EasyMock.verify(en1, en2);
+ }
+
+ @Test(expected = ISE.class)
+ public void testMustBeStarted()
+ {
+ final ExtractionNamespace extractionNamespace = new ExtractionNamespace()
+ {
+ @Override
+ public long getPollMs()
+ {
+ return 0;
+ }
+ };
+
+ final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory = new NamespaceLookupExtractorFactory(
+ extractionNamespace,
+ cacheManager
+ );
+
+ namespaceLookupExtractorFactory.get();
+ }
+
+ // Note this does NOT catch problems with returning factories as failed in error messages.
+ @Test
+ public void testSerDe() throws Exception
+ {
+ final Injector injector = Initialization.makeInjectorWithModules(
+ GuiceInjectors.makeStartupInjector(),
+ ImmutableList.of(
+ new Module()
+ {
+ @Override
+ public void configure(Binder binder)
+ {
+ JsonConfigProvider.bindInstance(
+ binder, Key.get(DruidNode.class, Self.class), new DruidNode("test-inject", null, null)
+ );
+ }
+ }
+ )
+ );
+ final ObjectMapper mapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class));
+ mapper.registerSubtypes(NamespaceLookupExtractorFactory.class);
+ final String str = "{ \"type\": \"cachedNamespace\", \"extractionNamespace\": { \"type\": \"uri\", \"uriPrefix\": \"s3://bucket/prefix/\", \"fileRegex\": \"foo.*\\\\.gz\", \"namespaceParseSpec\": { \"format\": \"customJson\", \"keyFieldName\": \"someKey\", \"valueFieldName\": \"someVal\" }, \"pollPeriod\": \"PT5M\" } } }";
+ final LookupExtractorFactory factory = mapper.readValue(str, LookupExtractorFactory.class);
+ Assert.assertTrue(factory instanceof NamespaceLookupExtractorFactory);
+ final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory = (NamespaceLookupExtractorFactory) factory;
+ Assert.assertNotNull(mapper.writeValueAsString(factory));
+ Assert.assertFalse(factory.replaces(mapper.readValue(
+ mapper.writeValueAsString(factory),
+ LookupExtractorFactory.class
+ )));
+ Assert.assertEquals(
+ URIExtractionNamespace.class,
+ namespaceLookupExtractorFactory.getExtractionNamespace().getClass()
+ );
+ Assert.assertFalse(namespaceLookupExtractorFactory.replaces(mapper.readValue(str, LookupExtractorFactory.class)));
+ final Map map = new HashMap<>(mapper.