From 608d568b9e3502b2a3fbd51d7031417d38a1d9bb Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Fri, 18 Mar 2016 15:04:37 -0700 Subject: [PATCH 1/5] Make URI Extraction Namespace take more sane arguments * Fixes https://github.com/druid-io/druid/issues/2669 --- .../data/SearchableVersionedDataFinder.java | 3 +- .../extensions-core/namespaced-lookup.md | 26 ++- .../hdfs/HdfsFileTimestampVersionFinder.java | 3 +- .../namespace/URIExtractionNamespace.java | 115 ++++++++---- ...URIExtractionNamespaceFunctionFactory.java | 27 ++- ...ffHeapNamespaceExtractionCacheManager.java | 1 + ...OnHeapNamespaceExtractionCacheManager.java | 9 +- .../namespace/URIExtractionNamespaceTest.java | 35 +++- .../NamespacedExtractorModuleTest.java | 4 + ...xtractionNamespaceFunctionFactoryTest.java | 175 ++++++++++++++++-- ...ceExtractionCacheManagerExecutorsTest.java | 6 + .../NamespaceExtractionCacheManagersTest.java | 58 ++++-- .../s3/S3TimestampVersionedDataFinder.java | 19 +- .../S3TimestampVersionedDataFinderTest.java | 36 +++- .../LocalFileTimestampVersionFinder.java | 3 +- 15 files changed, 420 insertions(+), 100 deletions(-) diff --git a/common/src/main/java/io/druid/data/SearchableVersionedDataFinder.java b/common/src/main/java/io/druid/data/SearchableVersionedDataFinder.java index f1105ea05a79..8755e9ae1dca 100644 --- a/common/src/main/java/io/druid/data/SearchableVersionedDataFinder.java +++ b/common/src/main/java/io/druid/data/SearchableVersionedDataFinder.java @@ -19,6 +19,7 @@ package io.druid.data; +import javax.annotation.Nullable; import java.util.regex.Pattern; /** @@ -41,7 +42,7 @@ public interface SearchableVersionedDataFinder * * @return A DataDescriptor which matches pattern, is a child of descriptorBase, and is of the most recent "version" at some point during the method execution.
 */ - DataDescriptor getLatestVersion(DataDescriptor descriptorBase, final Pattern pattern); + DataDescriptor getLatestVersion(DataDescriptor descriptorBase, @Nullable final Pattern pattern); /** * @return The class of the descriptor for the data diff --git a/docs/content/development/extensions-core/namespaced-lookup.md b/docs/content/development/extensions-core/namespaced-lookup.md index a178b0c342a0..ad073dc58539 100644 --- a/docs/content/development/extensions-core/namespaced-lookup.md +++ b/docs/content/development/extensions-core/namespaced-lookup.md @@ -75,32 +75,46 @@ For additional lookups, please see our [extensions list](../development/extensio ## URI namespace update -The remapping values for each namespaced lookup can be specified by json as per +The remapping values for each namespaced lookup can be specified by a json object as per the following examples: ```json { "type":"uri", "namespace":"some_lookup", - "uri": "s3://bucket/some/key/prefix/", + "uri": "s3://bucket/some/key/prefix/renames-0003.gz", "namespaceParseSpec":{ "format":"csv", "columns":["key","value"] }, - "pollPeriod":"PT5M", - "versionRegex": "renames-[0-9]*\\.gz" + "pollPeriod":"PT5M" } ``` +```json +{ + "type":"uri", + "namespace":"some_lookup", + "uriPrefix": "s3://bucket/some/key/prefix/", + "fileRegex":"renames-[0-9]*\\.gz", + "namespaceParseSpec":{ + "format":"csv", + "columns":["key","value"] + }, + "pollPeriod":"PT5M" +} +``` |Property|Description|Required|Default| |--------|-----------|--------|-------| |`namespace`|The namespace to define|Yes|| |`pollPeriod`|Period between polling for updates|No|0 (only once)| -|`versionRegex`|Regex to help find newer versions of the namespace data|Yes|| +|`uri`|URI for the file of interest|No|Use `uriPrefix`| +|`uriPrefix`|A URI which specifies a directory (or other searchable resource) in which to search for files|No|Use `uri`| +|`fileRegex`|Optional regex for matching the file name under `uriPrefix`. 
Only used if `uriPrefix` is used|No|`".*"`| |`namespaceParseSpec`|How to interpret the data at the URI|Yes|| -The `pollPeriod` value specifies the period in ISO 8601 format between checks for updates. If the source of the lookup is capable of providing a timestamp, the lookup will only be updated if it has changed since the prior tick of `pollPeriod`. A value of 0, an absent parameter, or `null` all mean populate once and do not attempt to update. Whenever an update occurs, the updating system will look for a file with the most recent timestamp and assume that one with the most recent data. +Exactly one of `uri` or `uriPrefix` must be specified. -The `versionRegex` value specifies a regex to use to determine if a filename in the parent path of the uri should be considered when trying to find the latest version. Omitting this setting or setting it equal to `null` will match to all files it can find (equivalent to using `".*"`). The search occurs in the most significant "directory" of the uri. +The `pollPeriod` value specifies the period in ISO 8601 format between checks for updates. If the source of the lookup is capable of providing a timestamp, the lookup will only be updated if it has changed since the prior tick of `pollPeriod`. A value of 0, an absent parameter, or `null` all mean populate once and do not attempt to update. Whenever an update occurs, the updating system will look for a file with the most recent timestamp and assume that it is the one with the most recent data. The `namespaceParseSpec` can be one of a number of values. Each of the examples below would rename foo to bar, baz to bat, and buck to truck. All parseSpec types assumes each input is delimited by a new line. See below for the types of parseSpec supported. 
diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsFileTimestampVersionFinder.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsFileTimestampVersionFinder.java index a708259121ed..7f872f5a0366 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsFileTimestampVersionFinder.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsFileTimestampVersionFinder.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import javax.annotation.Nullable; import java.io.IOException; import java.net.URI; import java.util.concurrent.Callable; @@ -81,7 +82,7 @@ public boolean accept(Path path) * @return The URI of the file with the most recent modified timestamp. */ @Override - public URI getLatestVersion(final URI uri, final Pattern pattern) + public URI getLatestVersion(final URI uri, final @Nullable Pattern pattern) { final Path path = new Path(uri); try { diff --git a/extensions-core/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/URIExtractionNamespace.java b/extensions-core/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/URIExtractionNamespace.java index b6d2bf58bcce..a55d3f524675 100644 --- a/extensions-core/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/URIExtractionNamespace.java +++ b/extensions-core/namespace-lookup/src/main/java/io/druid/query/extraction/namespace/URIExtractionNamespace.java @@ -64,40 +64,58 @@ public class URIExtractionNamespace implements ExtractionNamespace @JsonProperty private final URI uri; @JsonProperty + private final URI uriPrefix; + @JsonProperty private final FlatDataParser namespaceParseSpec; @JsonProperty - private final Period pollPeriod; + private final String fileRegex; @JsonProperty - private final String versionRegex; + private final Period pollPeriod; @JsonCreator public URIExtractionNamespace( @NotNull @JsonProperty(value = 
"namespace", required = true) - String namespace, - @NotNull @JsonProperty(value = "uri", required = true) - URI uri, + String namespace, + @JsonProperty(value = "uri", required = false) + URI uri, + @JsonProperty(value = "uriPrefix", required = false) + URI uriPrefix, + @JsonProperty(value = "fileRegex", required = false) + String fileRegex, @JsonProperty(value = "namespaceParseSpec", required = true) - FlatDataParser namespaceParseSpec, + FlatDataParser namespaceParseSpec, @Min(0) @Nullable @JsonProperty(value = "pollPeriod", required = false) - Period pollPeriod, + Period pollPeriod, + @Deprecated @JsonProperty(value = "versionRegex", required = false) - String versionRegex + String versionRegex ) { - if (versionRegex != null) { + this.namespace = Preconditions.checkNotNull(namespace, "namespace"); + this.uri = uri; + this.uriPrefix = uriPrefix; + if ((uri != null) == (uriPrefix != null)) { + throw new IAE("Either uri xor uriPrefix required"); + } + this.namespaceParseSpec = Preconditions.checkNotNull(namespaceParseSpec, "namespaceParseSpec"); + this.pollPeriod = pollPeriod == null ? Period.ZERO : pollPeriod; + this.fileRegex = fileRegex == null ? versionRegex : fileRegex; + if (fileRegex != null && versionRegex != null) { + throw new IAE("Cannot specify both versionRegex and fileRegex. 
versionRegex is deprecated"); + } + + if (uri != null && this.fileRegex != null) { + throw new IAE("Cannot define both uri and fileRegex"); + } + + if (this.fileRegex != null) { try { - Pattern.compile(versionRegex); + Pattern.compile(this.fileRegex); } catch (PatternSyntaxException ex) { - throw new IAE(ex, "Could not parse `versionRegex` [%s]", versionRegex); + throw new IAE(ex, "Could not parse `fileRegex` [%s]", this.fileRegex); } } - this.namespace = Preconditions.checkNotNull(namespace, "namespace"); - this.uri = Preconditions.checkNotNull(uri, "uri"); - this.namespaceParseSpec = Preconditions.checkNotNull(namespaceParseSpec, "namespaceParseSpec"); - this.pollPeriod = pollPeriod == null ? Period.ZERO : pollPeriod; - - this.versionRegex = versionRegex; } @Override @@ -106,9 +124,9 @@ public String getNamespace() return namespace; } - public String getVersionRegex() + public String getFileRegex() { - return versionRegex; + return fileRegex; } public FlatDataParser getNamespaceParseSpec() @@ -121,6 +139,11 @@ public URI getUri() return uri; } + public URI getUriPrefix() + { + return uriPrefix; + } + @Override public long getPollMs() { @@ -130,17 +153,16 @@ public long getPollMs() @Override public String toString() { - return String.format( - "URIExtractionNamespace = { namespace = %s, uri = %s, namespaceParseSpec = %s, pollPeriod = %s, versionRegex = %s }", - namespace, - uri.toString(), - namespaceParseSpec.toString(), - pollPeriod.toString(), - versionRegex - ); + return "URIExtractionNamespace{" + + "namespace='" + namespace + '\'' + + ", uri=" + uri + + ", uriPrefix=" + uriPrefix + + ", namespaceParseSpec=" + namespaceParseSpec + + ", fileRegex='" + fileRegex + '\'' + + ", pollPeriod=" + pollPeriod + + '}'; } - @Override public boolean equals(Object o) { @@ -151,22 +173,39 @@ public boolean equals(Object o) return false; } - URIExtractionNamespace namespace1 = (URIExtractionNamespace) o; - return toString().equals(namespace1.toString()); + 
URIExtractionNamespace that = (URIExtractionNamespace) o; + + if (!getNamespace().equals(that.getNamespace())) { + return false; + } + if (getUri() != null ? !getUri().equals(that.getUri()) : that.getUri() != null) { + return false; + } + if (getUriPrefix() != null ? !getUriPrefix().equals(that.getUriPrefix()) : that.getUriPrefix() != null) { + return false; + } + if (!getNamespaceParseSpec().equals(that.getNamespaceParseSpec())) { + return false; + } + if (getFileRegex() != null ? !getFileRegex().equals(that.getFileRegex()) : that.getFileRegex() != null) { + return false; + } + return pollPeriod.equals(that.pollPeriod); + } @Override public int hashCode() { - int result = namespace.hashCode(); - result = 31 * result + uri.hashCode(); - result = 31 * result + namespaceParseSpec.hashCode(); + int result = getNamespace().hashCode(); + result = 31 * result + (getUri() != null ? getUri().hashCode() : 0); + result = 31 * result + (getUriPrefix() != null ? getUriPrefix().hashCode() : 0); + result = 31 * result + getNamespaceParseSpec().hashCode(); + result = 31 * result + (getFileRegex() != null ? getFileRegex().hashCode() : 0); result = 31 * result + pollPeriod.hashCode(); - result = 31 * result + (versionRegex != null ? 
versionRegex.hashCode() : 0); return result; } - private static class DelegateParser implements Parser { private final Parser delegate; @@ -268,7 +307,11 @@ public CSVFlatDataParser( Arrays.toString(columns.toArray()) ); - this.parser = new DelegateParser(new CSVParser(Optional.absent(), columns), this.keyColumn, this.valueColumn); + this.parser = new DelegateParser( + new CSVParser(Optional.absent(), columns), + this.keyColumn, + this.valueColumn + ); } @JsonProperty diff --git a/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactory.java b/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactory.java index dba8b90fa8dc..3c1011190c9a 100644 --- a/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactory.java +++ b/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactory.java @@ -45,6 +45,8 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -120,7 +122,8 @@ public Callable getCachePopulator( @Override public String call() { - final URI originalUri = extractionNamespace.getUri(); + final boolean doSearch = extractionNamespace.getUriPrefix() != null; + final URI originalUri = doSearch ? extractionNamespace.getUriPrefix() : extractionNamespace.getUri(); final SearchableVersionedDataFinder pullerRaw = pullers.get(originalUri.getScheme()); if (pullerRaw == null) { throw new IAE( @@ -132,15 +135,29 @@ public String call() if (!(pullerRaw instanceof URIDataPuller)) { throw new IAE( "Cannot load data from location [%s]. 
Data pulling from [%s] not supported", - originalUri.toString(), + originalUri, originalUri.getScheme() ); } final URIDataPuller puller = (URIDataPuller) pullerRaw; - final String versionRegex = extractionNamespace.getVersionRegex(); + final Pattern versionRegex; + final URI uriBase; + if (doSearch) { + uriBase = extractionNamespace.getUriPrefix(); + + if (extractionNamespace.getFileRegex() != null) { + versionRegex = Pattern.compile(extractionNamespace.getFileRegex()); + } else { + versionRegex = null; + } + } else { + final Path filePath = Paths.get(extractionNamespace.getUri()); + versionRegex = Pattern.compile(Pattern.quote(filePath.getFileName().toString())); + uriBase = filePath.getParent().toUri(); + } final URI uri = pullerRaw.getLatestVersion( - originalUri, - versionRegex == null ? null : Pattern.compile(versionRegex) + uriBase, + versionRegex ); if (uri == null) { throw new RuntimeException( diff --git a/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OffHeapNamespaceExtractionCacheManager.java b/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OffHeapNamespaceExtractionCacheManager.java index a56ace9c9563..bbe4d826232b 100644 --- a/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OffHeapNamespaceExtractionCacheManager.java +++ b/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OffHeapNamespaceExtractionCacheManager.java @@ -119,6 +119,7 @@ protected boolean swapAndClearCache(String namespaceKey, String cacheKey) final String priorCache = currentNamespaceCache.put(namespaceKey, swapCacheKey); if (priorCache != null) { + // TODO: resolve what happens here if query is actively going on mmapDB.delete(priorCache); dataSize.set(tmpFile.length()); return true; diff --git a/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OnHeapNamespaceExtractionCacheManager.java 
b/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OnHeapNamespaceExtractionCacheManager.java index 9af6cb508415..a09560871a79 100644 --- a/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OnHeapNamespaceExtractionCacheManager.java +++ b/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OnHeapNamespaceExtractionCacheManager.java @@ -23,6 +23,7 @@ import com.google.common.util.concurrent.Striped; import com.google.inject.Inject; import com.google.inject.name.Named; +import com.metamx.common.IAE; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.emitter.service.ServiceEmitter; import io.druid.query.extraction.namespace.ExtractionNamespace; @@ -53,7 +54,7 @@ public OnHeapNamespaceExtractionCacheManager( final Map, ExtractionNamespaceFunctionFactory> namespaceFunctionFactoryMap ) { - super(lifecycle, fnCache, reverseFnCache,emitter, namespaceFunctionFactoryMap); + super(lifecycle, fnCache, reverseFnCache, emitter, namespaceFunctionFactoryMap); } @Override @@ -64,14 +65,14 @@ protected boolean swapAndClearCache(String namespaceKey, String cacheKey) try { ConcurrentMap cacheMap = mapMap.get(cacheKey); if (cacheMap == null) { - // Sometimes cache will not be populated (for example: if it doesn't contain new data) - return false; + throw new IAE("Namespace [%s] does not exist", cacheKey); } dataSize.addAndGet(cacheMap.size()); ConcurrentMap prior = mapMap.put(namespaceKey, cacheMap); mapMap.remove(cacheKey); if (prior != null) { dataSize.addAndGet(-prior.size()); + // Old map will get GC'd when it is not used anymore return true; } else { return false; @@ -87,7 +88,7 @@ public ConcurrentMap getCacheMap(String namespaceOrCacheKey) { ConcurrentMap map = mapMap.get(namespaceOrCacheKey); if (map == null) { - mapMap.putIfAbsent(namespaceOrCacheKey, new ConcurrentHashMap(32)); + mapMap.putIfAbsent(namespaceOrCacheKey, new ConcurrentHashMap()); map = 
mapMap.get(namespaceOrCacheKey); } return map; diff --git a/extensions-core/namespace-lookup/src/test/java/io/druid/query/extraction/namespace/URIExtractionNamespaceTest.java b/extensions-core/namespace-lookup/src/test/java/io/druid/query/extraction/namespace/URIExtractionNamespaceTest.java index 982926a1acc5..5673bcc3ea35 100644 --- a/extensions-core/namespace-lookup/src/test/java/io/druid/query/extraction/namespace/URIExtractionNamespaceTest.java +++ b/extensions-core/namespace-lookup/src/test/java/io/druid/query/extraction/namespace/URIExtractionNamespaceTest.java @@ -19,6 +19,7 @@ package io.druid.query.extraction.namespace; +import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair; import com.google.common.collect.ImmutableList; @@ -323,12 +324,31 @@ public void testSimpleToString() throws IOException } } + @Test + public void testMatchedJson() throws IOException + { + final ObjectMapper mapper = registerTypes(new DefaultObjectMapper()); + URIExtractionNamespace namespace = mapper.readValue( + "{\"type\":\"uri\", \"uriPrefix\":\"file:/foo\", \"namespaceParseSpec\":{\"format\":\"simpleJson\"}, \"pollPeriod\":\"PT5M\", \"versionRegex\":\"a.b.c\", \"namespace\":\"testNamespace\"}", + URIExtractionNamespace.class + ); + + Assert.assertEquals( + URIExtractionNamespace.ObjectMapperFlatDataParser.class.getCanonicalName(), + namespace.getNamespaceParseSpec().getClass().getCanonicalName() + ); + Assert.assertEquals("file:/foo", namespace.getUriPrefix().toString()); + Assert.assertEquals("testNamespace", namespace.getNamespace()); + Assert.assertEquals("a.b.c", namespace.getFileRegex()); + Assert.assertEquals(5L * 60_000L, namespace.getPollMs()); + } + @Test public void testExplicitJson() throws IOException { final ObjectMapper mapper = registerTypes(new DefaultObjectMapper()); URIExtractionNamespace namespace = mapper.readValue( - 
"{\"type\":\"uri\", \"uri\":\"file:/foo\", \"namespaceParseSpec\":{\"format\":\"simpleJson\"}, \"pollPeriod\":\"PT5M\", \"versionRegex\":\"a.b.c\", \"namespace\":\"testNamespace\"}", + "{\"type\":\"uri\", \"uri\":\"file:/foo/a.b.c\", \"namespaceParseSpec\":{\"format\":\"simpleJson\"}, \"pollPeriod\":\"PT5M\", \"namespace\":\"testNamespace\"}", URIExtractionNamespace.class ); @@ -336,12 +356,21 @@ public void testExplicitJson() throws IOException URIExtractionNamespace.ObjectMapperFlatDataParser.class.getCanonicalName(), namespace.getNamespaceParseSpec().getClass().getCanonicalName() ); - Assert.assertEquals("file:/foo", namespace.getUri().toString()); + Assert.assertEquals("file:/foo/a.b.c", namespace.getUri().toString()); Assert.assertEquals("testNamespace", namespace.getNamespace()); - Assert.assertEquals("a.b.c", namespace.getVersionRegex()); Assert.assertEquals(5L * 60_000L, namespace.getPollMs()); } + @Test(expected = JsonMappingException.class) + public void testExplicitJsonException() throws IOException + { + final ObjectMapper mapper = registerTypes(new DefaultObjectMapper()); + mapper.readValue( + "{\"type\":\"uri\", \"uri\":\"file:/foo\", \"namespaceParseSpec\":{\"format\":\"simpleJson\"}, \"pollPeriod\":\"PT5M\", \"versionRegex\":\"a.b.c\", \"namespace\":\"testNamespace\"}", + URIExtractionNamespace.class + ); + } + @Test public void testFlatDataNumeric() { diff --git a/extensions-core/namespace-lookup/src/test/java/io/druid/server/namespace/NamespacedExtractorModuleTest.java b/extensions-core/namespace-lookup/src/test/java/io/druid/server/namespace/NamespacedExtractorModuleTest.java index f38ffdf42583..3bc49241642c 100644 --- a/extensions-core/namespace-lookup/src/test/java/io/druid/server/namespace/NamespacedExtractorModuleTest.java +++ b/extensions-core/namespace-lookup/src/test/java/io/druid/server/namespace/NamespacedExtractorModuleTest.java @@ -110,6 +110,7 @@ public void testNewTask() throws Exception final URIExtractionNamespace namespace = new 
URIExtractionNamespace( "ns", tmpFile.toURI(), + null, null, new URIExtractionNamespace.ObjectMapperFlatDataParser( URIExtractionNamespaceTest.registerTypes(new DefaultObjectMapper()) ), @@ -132,6 +133,7 @@ public void testListNamespaces() throws Exception final URIExtractionNamespace namespace = new URIExtractionNamespace( "ns", tmpFile.toURI(), + null, null, new URIExtractionNamespace.ObjectMapperFlatDataParser(URIExtractionNamespaceTest.registerTypes(new DefaultObjectMapper())), new Period(0), null @@ -159,6 +161,7 @@ public void testDeleteNamespaces() throws Exception final URIExtractionNamespace namespace = new URIExtractionNamespace( "ns", tmpFile.toURI(), + null, null, new URIExtractionNamespace.ObjectMapperFlatDataParser( URIExtractionNamespaceTest.registerTypes(new DefaultObjectMapper()) ), @@ -181,6 +184,7 @@ public void testNewUpdate() throws Exception final URIExtractionNamespace namespace = new URIExtractionNamespace( "ns", tmpFile.toURI(), + null, null, new URIExtractionNamespace.ObjectMapperFlatDataParser( URIExtractionNamespaceTest.registerTypes(new DefaultObjectMapper()) ), diff --git a/extensions-core/namespace-lookup/src/test/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactoryTest.java b/extensions-core/namespace-lookup/src/test/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactoryTest.java index e6a6009231c3..8c3aca0b80e4 100644 --- a/extensions-core/namespace-lookup/src/test/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactoryTest.java +++ b/extensions-core/namespace-lookup/src/test/java/io/druid/server/namespace/URIExtractionNamespaceFunctionFactoryTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; +import com.metamx.common.IAE; import com.metamx.common.UOE; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.emitter.service.ServiceEmitter; @@ -40,12 +41,15 @@ import 
io.druid.server.namespace.cache.NamespaceExtractionCacheManagersTest; import io.druid.server.namespace.cache.OffHeapNamespaceExtractionCacheManager; import io.druid.server.namespace.cache.OnHeapNamespaceExtractionCacheManager; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; import org.joda.time.Period; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -60,13 +64,13 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.nio.file.Files; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -248,6 +252,8 @@ public URIExtractionNamespaceFunctionFactoryTest( @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Rule + public ExpectedException expectedException = ExpectedException.none(); private final String suffix; private final Function outStreamSupplier; @@ -288,11 +294,12 @@ public void setUp() throws Exception namespace = new URIExtractionNamespace( "ns", tmpFile.toURI(), + null, null, new URIExtractionNamespace.ObjectMapperFlatDataParser( URIExtractionNamespaceTest.registerTypes(new ObjectMapper()) ), new Period(0), - Pattern.quote(tmpFile.getName()) + null ); } @@ -308,7 +315,27 @@ public void simpleTest() throws IOException, ExecutionException, InterruptedExce Assert.assertNull(fnCache.get(namespace.getNamespace())); NamespaceExtractionCacheManagersTest.waitFor(manager.schedule(namespace)); Function fn = fnCache.get(namespace.getNamespace()); - Assert.assertNotNull(fn); 
+ Assert.assertNotNull(fn); + Assert.assertEquals("bar", fn.apply("foo")); + Assert.assertEquals(null, fn.apply("baz")); + } + + @Test + public void simpleTestRegex() throws IOException, ExecutionException, InterruptedException + { + final URIExtractionNamespace namespace = new URIExtractionNamespace( + this.namespace.getNamespace(), + null, + Paths.get(this.namespace.getUri()).getParent().toUri(), + Pattern.quote(Paths.get(this.namespace.getUri()).getFileName().toString()), + this.namespace.getNamespaceParseSpec(), + Period.millis((int) this.namespace.getPollMs()), + null + ); + Assert.assertNull(fnCache.get(namespace.getNamespace())); + NamespaceExtractionCacheManagersTest.waitFor(manager.schedule(namespace)); + Function fn = fnCache.get(namespace.getNamespace()); + Assert.assertNotNull(fn); Assert.assertEquals("bar", fn.apply("foo")); Assert.assertEquals(null, fn.apply("baz")); } @@ -336,11 +363,12 @@ public void simplePileONamespacesTest() throws InterruptedException URIExtractionNamespace namespace = new URIExtractionNamespace( String.format("%d-ns-%d", i << 10, i), tmpFile.toURI(), + null, null, new URIExtractionNamespace.ObjectMapperFlatDataParser( URIExtractionNamespaceTest.registerTypes(new ObjectMapper()) ), new Period(0), - Pattern.quote(tmpFile.getName()) + null ); namespaces.add(namespace); @@ -386,20 +414,137 @@ public void testMissing() throws Exception URIExtractionNamespace badNamespace = new URIExtractionNamespace( namespace.getNamespace(), namespace.getUri(), + null, null, namespace.getNamespaceParseSpec(), Period.millis((int) namespace.getPollMs()), - "\\QNEVER GONNA FIND ME" + UUID - .randomUUID().toString() + "\\E" + null ); + Assert.assertTrue(new File(namespace.getUri()).delete()); ConcurrentMap map = new ConcurrentHashMap<>(); - try { - factory.getCachePopulator(badNamespace, null, map).call(); - } - catch (RuntimeException e) { - Assert.assertNotNull(e.getCause()); - Assert.assertEquals(FileNotFoundException.class, 
e.getCause().getClass()); - return; - } - Assert.fail("Did not have exception"); + expectedException.expect(new BaseMatcher() + { + @Override + public void describeTo(Description description) + { + + } + + @Override + public boolean matches(Object o) + { + if (!(o instanceof Throwable)) { + return false; + } + final Throwable t = (Throwable) o; + return t.getCause() != null && t.getCause() instanceof FileNotFoundException; + } + }); + factory.getCachePopulator(badNamespace, null, map).call(); + } + + @Test + public void testMissingRegex() throws Exception + { + URIExtractionNamespace badNamespace = new URIExtractionNamespace( + namespace.getNamespace(), + null, + Paths.get(namespace.getUri()).getParent().toUri(), + Pattern.quote(Paths.get(namespace.getUri()).getFileName().toString()), + namespace.getNamespaceParseSpec(), + Period.millis((int) namespace.getPollMs()), + null + ); + Assert.assertTrue(new File(namespace.getUri()).delete()); + ConcurrentMap map = new ConcurrentHashMap<>(); + expectedException.expect(new BaseMatcher() + { + @Override + public void describeTo(Description description) + { + + } + + @Override + public boolean matches(Object o) + { + if (!(o instanceof Throwable)) { + return false; + } + final Throwable t = (Throwable) o; + return t.getCause() != null && t.getCause() instanceof FileNotFoundException; + } + }); + factory.getCachePopulator(badNamespace, null, map).call(); + } + + @Test(expected = IAE.class) + public void testExceptionalCreationDoubleURI() + { + new URIExtractionNamespace( + namespace.getNamespace(), + namespace.getUri(), + namespace.getUri(), + null, + namespace.getNamespaceParseSpec(), + Period.millis((int)namespace.getPollMs()), + null + ); + } + + @Test(expected = IAE.class) + public void testExceptionalCreationURIWithPattern() + { + new URIExtractionNamespace( + namespace.getNamespace(), + namespace.getUri(), + null, + "", + namespace.getNamespaceParseSpec(), + Period.millis((int)namespace.getPollMs()), + null + ); + } + + 
@Test(expected = IAE.class) + public void testExceptionalCreationURIWithLegacyPattern() + { + new URIExtractionNamespace( + namespace.getNamespace(), + namespace.getUri(), + null, + null, + namespace.getNamespaceParseSpec(), + Period.millis((int)namespace.getPollMs()), + "" + ); + } + + @Test(expected = IAE.class) + public void testLegacyMix() + { + new URIExtractionNamespace( + namespace.getNamespace(), + null, + namespace.getUri(), + "", + namespace.getNamespaceParseSpec(), + Period.millis((int)namespace.getPollMs()), + "" + ); + } + + + @Test(expected = IAE.class) + public void testBadPattern() + { + new URIExtractionNamespace( + namespace.getNamespace(), + null, + namespace.getUri(), + "[", + namespace.getNamespaceParseSpec(), + Period.millis((int)namespace.getPollMs()), + null + ); } } diff --git a/extensions-core/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java b/extensions-core/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java index bde0f303b3e3..0167277b1a14 100644 --- a/extensions-core/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java +++ b/extensions-core/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagerExecutorsTest.java @@ -177,6 +177,7 @@ public void testDoubleSubmission() URIExtractionNamespace namespace = new URIExtractionNamespace( "ns", tmpFile.toURI(), + null, null, new URIExtractionNamespace.ObjectMapperFlatDataParser( URIExtractionNamespaceTest.registerTypes(new ObjectMapper()) ), @@ -201,6 +202,7 @@ public void testSimpleSubmission() throws ExecutionException, InterruptedExcepti URIExtractionNamespace namespace = new URIExtractionNamespace( "ns", tmpFile.toURI(), + null, null, new URIExtractionNamespace.ObjectMapperFlatDataParser( URIExtractionNamespaceTest.registerTypes(new ObjectMapper()) ), @@ -221,6 +223,7 @@ 
public void testRepeatSubmission() throws ExecutionException, InterruptedExcepti final URIExtractionNamespace namespace = new URIExtractionNamespace( "ns", tmpFile.toURI(), + null, null, new URIExtractionNamespace.ObjectMapperFlatDataParser( URIExtractionNamespaceTest.registerTypes(new ObjectMapper()) ), @@ -353,6 +356,7 @@ public void testDelete(final String ns) final URIExtractionNamespace namespace = new URIExtractionNamespace( ns, tmpFile.toURI(), + null, null, new URIExtractionNamespace.ObjectMapperFlatDataParser( URIExtractionNamespaceTest.registerTypes(new ObjectMapper()) ), @@ -431,6 +435,7 @@ public void testShutdown() final URIExtractionNamespace namespace = new URIExtractionNamespace( "ns", tmpFile.toURI(), + null, null, new URIExtractionNamespace.ObjectMapperFlatDataParser( URIExtractionNamespaceTest.registerTypes(new ObjectMapper()) ), @@ -479,6 +484,7 @@ public void testRunCount() final URIExtractionNamespace namespace = new URIExtractionNamespace( "ns", tmpFile.toURI(), + null, null, new URIExtractionNamespace.ObjectMapperFlatDataParser( URIExtractionNamespaceTest.registerTypes(new ObjectMapper()) ), diff --git a/extensions-core/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagersTest.java b/extensions-core/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagersTest.java index d9f80a61c005..ca9c167da4da 100644 --- a/extensions-core/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagersTest.java +++ b/extensions-core/namespace-lookup/src/test/java/io/druid/server/namespace/cache/NamespaceExtractionCacheManagersTest.java @@ -24,9 +24,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import 
com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; import io.druid.query.extraction.namespace.ExtractionNamespace; @@ -45,7 +42,6 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -58,7 +54,7 @@ public class NamespaceExtractionCacheManagersTest private static final Logger log = new Logger(NamespaceExtractionCacheManagersTest.class); private static final Lifecycle lifecycle = new Lifecycle(); - @Parameterized.Parameters + @Parameterized.Parameters(name = "{0}") public static Collection getParameters() { ArrayList params = new ArrayList<>(); @@ -67,14 +63,14 @@ public static Collection getParameters() ConcurrentMap>> reverserFnMap = new ConcurrentHashMap>>(); params.add( new Object[]{ - new OffHeapNamespaceExtractionCacheManager( - lifecycle, - fnMap, - reverserFnMap, - new NoopServiceEmitter(), - ImmutableMap., ExtractionNamespaceFunctionFactory>of() - ), fnMap - } + new OffHeapNamespaceExtractionCacheManager( + lifecycle, + fnMap, + reverserFnMap, + new NoopServiceEmitter(), + ImmutableMap., ExtractionNamespaceFunctionFactory>of() + ), fnMap + } ); params.add( new Object[]{ @@ -137,6 +133,36 @@ public void testSimpleCacheCreate() } } + @Test + public void testSimpleCacheSwap() + { + for (String ns : nsList) { + ConcurrentMap map = extractionCacheManager.getCacheMap(ns + "old_cache"); + map.put("key", "val"); + extractionCacheManager.swapAndClearCache(ns, ns + "old_cache"); + Assert.assertEquals("val", map.get("key")); + Assert.assertEquals("val", extractionCacheManager.getCacheMap(ns).get("key")); + + ConcurrentMap map2 = extractionCacheManager.getCacheMap(ns + "cache"); + map2.put("key", "val2"); + Assert.assertTrue(extractionCacheManager.swapAndClearCache(ns, ns + "cache")); + 
Assert.assertEquals("val2", map2.get("key")); + Assert.assertEquals("val2", extractionCacheManager.getCacheMap(ns).get("key")); + } + } + + @Test(expected = IllegalArgumentException.class) + public void testMissingCacheThrowsIAE() + { + for (String ns : nsList) { + ConcurrentMap map = extractionCacheManager.getCacheMap(ns); + map.put("key", "val"); + Assert.assertEquals("val", map.get("key")); + Assert.assertEquals("val", extractionCacheManager.getCacheMap(ns).get("key")); + Assert.assertFalse(extractionCacheManager.swapAndClearCache(ns, "I don't exist")); + } + } + @Test public void testCacheList() { @@ -147,6 +173,12 @@ public void testCacheList() Assert.assertArrayEquals(nsList.toArray(), retvalList.toArray()); } + @Test + public void testNoDeleteNonexistant() + { + Assert.assertFalse(extractionCacheManager.delete("I don't exist")); + } + public static void waitFor(Future future) throws InterruptedException { while (!future.isDone()) { diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TimestampVersionedDataFinder.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TimestampVersionedDataFinder.java index 7fd93749b773..995e654e738f 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TimestampVersionedDataFinder.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TimestampVersionedDataFinder.java @@ -26,8 +26,8 @@ import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.model.S3Object; +import javax.annotation.Nullable; import java.net.URI; -import java.nio.file.Paths; import java.util.concurrent.Callable; import java.util.regex.Pattern; @@ -48,13 +48,14 @@ public S3TimestampVersionedDataFinder(RestS3Service s3Client) * delimited paths. If the uri path ends with '/', the path is assumed to be the parent. 
* * @param uri The URI of interest whose "parent" will be searched as a key prefix for the latest version - * @param pattern The pattern matcher to determine if a *key* is of interest. This will match against the entire key, - * not just the equivalent "filename" like some other implementations. A null value matches everything + * @param pattern The pattern matcher to determine if a *key* is of interest. This will match against the portion of the key that is beyond the URI path, + * not just the equivalent "filename" like some other implementations. A null value matches everything. + * If there is a "/" delimiter between the uri path and the file match, it is ignored. Patterns should **not** account for a leading "/" unless there's a double "/" for some reason * * @return A URI to the most recently modified object which matched the pattern. */ @Override - public URI getLatestVersion(final URI uri, final Pattern pattern) + public URI getLatestVersion(final URI uri, final @Nullable Pattern pattern) { try { return RetryUtils.retry( @@ -66,19 +67,13 @@ public URI call() throws Exception final S3Coords coords = new S3Coords(checkURI(uri)); long mostRecent = Long.MIN_VALUE; URI latest = null; - String parentPath = coords.path.endsWith("/") - ? 
coords.path - : Paths.get(coords.path).getParent().toString(); - if (!parentPath.endsWith("/")) { - parentPath = parentPath + "/"; - } - S3Object[] objects = s3Client.listObjects(coords.bucket, parentPath, "/"); + S3Object[] objects = s3Client.listObjects(coords.bucket, coords.path, "/"); if (objects == null) { return null; } for (S3Object storageObject : objects) { storageObject.closeDataInputStream(); - String keyString = storageObject.getKey().substring(parentPath.length()); + String keyString = storageObject.getKey().substring(coords.path.length()); if (keyString.startsWith("/")) { keyString = keyString.substring(1); } diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3TimestampVersionedDataFinderTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3TimestampVersionedDataFinderTest.java index 5826da72f63f..e4422a4079f2 100644 --- a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3TimestampVersionedDataFinderTest.java +++ b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3TimestampVersionedDataFinderTest.java @@ -60,7 +60,7 @@ public void testSimpleLatestVersion() throws S3ServiceException EasyMock.replay(s3Client); - URI latest = finder.getLatestVersion(URI.create(String.format("s3://%s/%s", bucket, object0.getKey())), pattern); + URI latest = finder.getLatestVersion(URI.create(String.format("s3://%s/%s", bucket, keyPrefix)), pattern); EasyMock.verify(s3Client); @@ -96,7 +96,7 @@ public void testMissing() throws S3ServiceException EasyMock.replay(s3Client); - URI latest = finder.getLatestVersion(URI.create(String.format("s3://%s/%s", bucket, object0.getKey())), pattern); + URI latest = finder.getLatestVersion(URI.create(String.format("s3://%s/%s", bucket, keyPrefix)), pattern); EasyMock.verify(s3Client); @@ -126,7 +126,7 @@ public void testFindSelf() throws S3ServiceException EasyMock.replay(s3Client); - URI latest = 
finder.getLatestVersion(URI.create(String.format("s3://%s/%s", bucket, object0.getKey())), pattern); + URI latest = finder.getLatestVersion(URI.create(String.format("s3://%s/%s", bucket, keyPrefix)), pattern); EasyMock.verify(s3Client); @@ -135,4 +135,34 @@ public void testFindSelf() throws S3ServiceException Assert.assertEquals(expected, latest); } + @Test + public void testFindExact() throws S3ServiceException + { + String bucket = "bucket"; + String keyPrefix = "prefix/dir/0"; + RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class); + + S3Object object0 = new S3Object(); + + object0.setBucketName(bucket); + object0.setKey(keyPrefix + "/renames-0.gz"); + object0.setLastModifiedDate(new Date(0)); + + EasyMock.expect(s3Client.listObjects(EasyMock.eq(bucket), EasyMock.anyString(), EasyMock.eq("/"))).andReturn( + new S3Object[]{object0} + ).once(); + S3TimestampVersionedDataFinder finder = new S3TimestampVersionedDataFinder(s3Client); + + + EasyMock.replay(s3Client); + + + URI latest = finder.getLatestVersion(URI.create(String.format("s3://%s/%s", bucket, object0.getKey())), null); + + EasyMock.verify(s3Client); + + URI expected = URI.create(String.format("s3://%s/%s", bucket, object0.getKey())); + + Assert.assertEquals(expected, latest); + } } diff --git a/server/src/main/java/io/druid/segment/loading/LocalFileTimestampVersionFinder.java b/server/src/main/java/io/druid/segment/loading/LocalFileTimestampVersionFinder.java index c5d1130a21df..9b1629725ce0 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalFileTimestampVersionFinder.java +++ b/server/src/main/java/io/druid/segment/loading/LocalFileTimestampVersionFinder.java @@ -23,6 +23,7 @@ import com.metamx.common.RetryUtils; import io.druid.data.SearchableVersionedDataFinder; +import javax.annotation.Nullable; import java.io.File; import java.io.FileFilter; import java.io.FileNotFoundException; @@ -74,7 +75,7 @@ public boolean accept(File pathname) * @return The URI of the most 
recently modified file which matches the pattern, or `null` if it cannot be found */ @Override - public URI getLatestVersion(URI uri, final Pattern pattern) + public URI getLatestVersion(URI uri, final @Nullable Pattern pattern) { final File file = new File(uri); try { From 04a681f1734f12503c2b01ebd880dd105794a0ee Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Thu, 21 Apr 2016 15:10:07 -0700 Subject: [PATCH 2/5] Update docs --- .../extensions-core/namespaced-lookup.md | 38 ++----------------- 1 file changed, 3 insertions(+), 35 deletions(-) diff --git a/docs/content/development/extensions-core/namespaced-lookup.md b/docs/content/development/extensions-core/namespaced-lookup.md index ad073dc58539..bdddf5c100a1 100644 --- a/docs/content/development/extensions-core/namespaced-lookup.md +++ b/docs/content/development/extensions-core/namespaced-lookup.md @@ -18,41 +18,7 @@ Make sure to [include](../../operations/including-extensions.html) `druid-namesp Namespaced lookups are appropriate for lookups which are not possible to pass at query time due to their size, or are not desired to be passed at query time because the data is to reside in and be handled by the Druid servers. -Namespaced lookups can be specified as part of the runtime properties file. The property is a list of the namespaces -described as per the sections on this page. 
For example: - - ```json - druid.query.extraction.namespace.lookups= - [ - { - "type": "uri", - "namespace": "some_uri_lookup", - "uri": "file:/tmp/prefix/", - "namespaceParseSpec": { - "format": "csv", - "columns": [ - "key", - "value" - ] - }, - "pollPeriod": "PT5M" - }, - { - "type": "jdbc", - "namespace": "some_jdbc_lookup", - "connectorConfig": { - "createTables": true, - "connectURI": "jdbc:mysql:\/\/localhost:3306\/druid", - "user": "druid", - "password": "diurd" - }, - "table": "lookupTable", - "keyColumn": "mykeyColumn", - "valueColumn": "MyValueColumn", - "tsColumn": "timeColumn" - } - ] - ``` +These lookups must be configured through the Cluster Wide Dynamic Configuration described at the end of the document. Proper functionality of Namespaced lookups requires the following extension to be loaded on the broker, peon, and historical nodes: `druid-namespace-lookup` @@ -118,6 +84,8 @@ The `pollPeriod` value specifies the period in ISO 8601 format between checks fo The `namespaceParseSpec` can be one of a number of values. Each of the examples below would rename foo to bar, baz to bat, and buck to truck. All parseSpec types assumes each input is delimited by a new line. See below for the types of parseSpec supported. +Only ONE file which matches the search will be used. For most implementations, the discriminator for choosing the URI is whichever one reports the most recent timestamp for its modification time. 
+ ### csv lookupParseSpec |Parameter|Description|Required|Default| From 2f02446f0b142016b9acfc0769b3bb7d0f66eee9 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Thu, 21 Apr 2016 15:20:19 -0700 Subject: [PATCH 3/5] Rename error message --- .../namespace/cache/OnHeapNamespaceExtractionCacheManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OnHeapNamespaceExtractionCacheManager.java b/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OnHeapNamespaceExtractionCacheManager.java index a09560871a79..2d55f6489835 100644 --- a/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OnHeapNamespaceExtractionCacheManager.java +++ b/extensions-core/namespace-lookup/src/main/java/io/druid/server/namespace/cache/OnHeapNamespaceExtractionCacheManager.java @@ -65,7 +65,7 @@ protected boolean swapAndClearCache(String namespaceKey, String cacheKey) try { ConcurrentMap cacheMap = mapMap.get(cacheKey); if (cacheMap == null) { - throw new IAE("Namespace [%s] does not exist", cacheKey); + throw new IAE("Extraction Cache [%s] does not exist", cacheKey); } dataSize.addAndGet(cacheMap.size()); ConcurrentMap prior = mapMap.put(namespaceKey, cacheMap); From 3ed2cf284037fd44a88a541b1c8069a86c7dd453 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Thu, 21 Apr 2016 15:36:07 -0700 Subject: [PATCH 4/5] Undo overzealous deletion of docs --- .../extensions-core/namespaced-lookup.md | 36 ++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/docs/content/development/extensions-core/namespaced-lookup.md b/docs/content/development/extensions-core/namespaced-lookup.md index bdddf5c100a1..480481498bb5 100644 --- a/docs/content/development/extensions-core/namespaced-lookup.md +++ b/docs/content/development/extensions-core/namespaced-lookup.md @@ -18,7 +18,41 @@ Make sure to 
[include](../../operations/including-extensions.html) `druid-namesp Namespaced lookups are appropriate for lookups which are not possible to pass at query time due to their size, or are not desired to be passed at query time because the data is to reside in and be handled by the Druid servers. -These lookups must be configured through the Cluster Wide Dynamic Configuration described at the end of the document. +Namespaced lookups can be specified as part of the runtime properties file. The property is a list of the namespaces +described as per the sections on this page. For example: + + ```json + druid.query.extraction.namespace.lookups= + [ + { + "type": "uri", + "namespace": "some_uri_lookup", + "uri": "file:/tmp/prefix/", + "namespaceParseSpec": { + "format": "csv", + "columns": [ + "key", + "value" + ] + }, + "pollPeriod": "PT5M" + }, + { + "type": "jdbc", + "namespace": "some_jdbc_lookup", + "connectorConfig": { + "createTables": true, + "connectURI": "jdbc:mysql:\/\/localhost:3306\/druid", + "user": "druid", + "password": "diurd" + }, + "table": "lookupTable", + "keyColumn": "mykeyColumn", + "valueColumn": "MyValueColumn", + "tsColumn": "timeColumn" + } + ] + ``` Proper functionality of Namespaced lookups requires the following extension to be loaded on the broker, peon, and historical nodes: `druid-namespace-lookup` From 47a4148deba663a3fa3e1caa2adf9b797b56690c Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Mon, 2 May 2016 11:39:43 -0700 Subject: [PATCH 5/5] Explain caching mechanism a bit more in docs --- docs/content/development/extensions-core/namespaced-lookup.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/development/extensions-core/namespaced-lookup.md b/docs/content/development/extensions-core/namespaced-lookup.md index 480481498bb5..c91e123181aa 100644 --- a/docs/content/development/extensions-core/namespaced-lookup.md +++ b/docs/content/development/extensions-core/namespaced-lookup.md @@ -114,7 +114,7 @@ The 
remapping values for each namespaced lookup can be specified by a json objec One of either `uri` xor `uriPrefix` must be specified. -The `pollPeriod` value specifies the period in ISO 8601 format between checks for updates. If the source of the lookup is capable of providing a timestamp, the lookup will only be updated if it has changed since the prior tick of `pollPeriod`. A value of 0, an absent parameter, or `null` all mean populate once and do not attempt to update. Whenever an update occurs, the updating system will look for a file with the most recent timestamp and assume that one with the most recent data. +The `pollPeriod` value specifies the period in ISO 8601 format between checks for replacement data for the lookup. If the source of the lookup is capable of providing a timestamp, the lookup will only be updated if it has changed since the prior tick of `pollPeriod`. A value of 0, an absent parameter, or `null` all mean populate once and do not attempt to look for new data later. Whenever a poll occurs, the updating system will look for a file with the most recent timestamp and assume that it holds the most recent data set, replacing the local cache of the lookup data. The `namespaceParseSpec` can be one of a number of values. Each of the examples below would rename foo to bar, baz to bat, and buck to truck. All parseSpec types assumes each input is delimited by a new line. See below for the types of parseSpec supported.