Merged
@@ -19,6 +19,7 @@

package io.druid.data;

import javax.annotation.Nullable;
import java.util.regex.Pattern;

/**
@@ -41,7 +42,7 @@ public interface SearchableVersionedDataFinder&lt;DataDescriptor&gt;
*
* @return A DataDescriptor which matches pattern, is a child of descriptorBase, and is of the most recent "version" at some point during the method execution.
*/
-DataDescriptor getLatestVersion(DataDescriptor descriptorBase, final Pattern pattern);
+DataDescriptor getLatestVersion(DataDescriptor descriptorBase, @Nullable final Pattern pattern);

/**
* @return The class of the descriptor for the data
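Since `getLatestVersion` now accepts a `@Nullable` pattern, implementations must treat `null` as "consider every candidate". A minimal sketch of that contract, using a hypothetical in-memory finder rather than a real Druid implementation:

```java
import java.util.Comparator;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical finder: picks the lexicographically greatest name that
// matches `pattern`, treating a null pattern as "match all" -- the same
// contract the @Nullable annotation documents for getLatestVersion.
public class InMemoryVersionFinder {
    public static String getLatestVersion(List<String> names, Pattern pattern) {
        return names.stream()
                .filter(n -> pattern == null || pattern.matcher(n).matches())
                .max(Comparator.naturalOrder())
                .orElse(null);
    }

    public static void main(String[] args) {
        List<String> names = List.of("renames-0001.gz", "renames-0003.gz", "README");
        // With a pattern: only matching files are considered.
        String latest = getLatestVersion(names, Pattern.compile("renames-[0-9]*\\.gz"));
        if (!"renames-0003.gz".equals(latest)) throw new AssertionError(latest);
        // With null: every name is considered.
        String any = getLatestVersion(names, null);
        if (!"renames-0003.gz".equals(any)) throw new AssertionError(any);
        System.out.println("ok");
    }
}
```

The "latest" ordering here is lexicographic for simplicity; the real implementations order by modification timestamp.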
28 changes: 22 additions & 6 deletions docs/content/development/extensions-core/namespaced-lookup.md
@@ -75,35 +75,51 @@ For additional lookups, please see our [extensions list](../development/extensio

## URI namespace update

-The remapping values for each namespaced lookup can be specified by json as per
+The remapping values for each namespaced lookup can be specified by a JSON object as per the following examples:

```json
{
"type":"uri",
"namespace":"some_lookup",
-"uri": "s3://bucket/some/key/prefix/",
+"uri": "s3://bucket/some/key/prefix/renames-0003.gz",
"namespaceParseSpec":{
"format":"csv",
"columns":["key","value"]
},
-"pollPeriod":"PT5M",
-"versionRegex": "renames-[0-9]*\\.gz"
+"pollPeriod":"PT5M"
}
```

```json
{
"type":"uri",
"namespace":"some_lookup",
"uriPrefix": "s3://bucket/some/key/prefix/",
"fileRegex":"renames-[0-9]*\\.gz",
"namespaceParseSpec":{
"format":"csv",
"columns":["key","value"]
},
"pollPeriod":"PT5M"
}
```

|Property|Description|Required|Default|
|--------|-----------|--------|-------|
|`namespace`|The namespace to define|Yes||
|`pollPeriod`|Period between polling for updates|No|0 (only once)|
|`versionRegex`|Deprecated. Use `fileRegex` instead|No||
|`uri`|URI for the file of interest|No|Use `uriPrefix`|
|`uriPrefix`|A URI which specifies a directory (or other searchable resource) in which to search for files|No|Use `uri`|
|`fileRegex`|Optional regex for matching the file name under `uriPrefix`. Only used if `uriPrefix` is used|No|`".*"`|
|`namespaceParseSpec`|How to interpret the data at the URI|Yes||

> **Contributor:** can this read the most recent file under a prefix? Imagine I have an HDFS dir where the prefix is `/year/day/hour/lookupDir/`.
>
> **Contributor (author):** Yes, that's part of `io.druid.storage.hdfs.HdfsFileTimestampVersionFinder`, which is used to discriminate among multiple files matching the pattern in `uriPrefix`.
>
> **Contributor:** can we document this?
>
> **Contributor (author):** added some documentation
> **Contributor:** call it `lookupParseSpec`?
>
> **Contributor (author):** Out of scope here


-The `pollPeriod` value specifies the period in ISO 8601 format between checks for updates. If the source of the lookup is capable of providing a timestamp, the lookup will only be updated if it has changed since the prior tick of `pollPeriod`. A value of 0, an absent parameter, or `null` all mean populate once and do not attempt to update. Whenever an update occurs, the updating system will look for a file with the most recent timestamp and assume that one with the most recent data.
+One of either `uri` xor `uriPrefix` must be specified.
-The `versionRegex` value specifies a regex to use to determine if a filename in the parent path of the uri should be considered when trying to find the latest version. Omitting this setting or setting it equal to `null` will match to all files it can find (equivalent to using `".*"`). The search occurs in the most significant "directory" of the uri.
+The `pollPeriod` value specifies the period, in ISO 8601 format, between checks for replacement data for the lookup. If the source of the lookup is capable of providing a timestamp, the lookup will only be updated if it has changed since the prior tick of `pollPeriod`. A value of 0, an absent parameter, or `null` all mean populate once and do not attempt to look for new data later. Whenever a poll occurs, the updating system looks for the file with the most recent timestamp and assumes that it holds the most recent data set, replacing the local cache of the lookup data.
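For a quick sanity check of the ISO 8601 duration notation used by `pollPeriod`, the JDK's `java.time.Duration` parses the same strings (the extension itself uses a Joda-Time `Period`, so this is only an illustration):

```java
import java.time.Duration;

public class PollPeriodDemo {
    public static void main(String[] args) {
        // "PT5M" is five minutes in ISO 8601 duration notation.
        long ms = Duration.parse("PT5M").toMillis();
        if (ms != 5 * 60 * 1000L) throw new AssertionError(ms);
        // "PT0S" (like an absent or null pollPeriod) means populate once and never re-poll.
        if (!Duration.parse("PT0S").isZero()) throw new AssertionError();
        System.out.println("pollPeriod PT5M = " + ms + " ms");
    }
}
```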

The `namespaceParseSpec` can be one of a number of values. Each of the examples below would rename foo to bar, baz to bat, and buck to truck. All parseSpec types assume each input is delimited by a new line. See below for the types of parseSpec supported.

Only ONE file which matches the search will be used. For most implementations, the discriminator for choosing the URIs is by whichever one reports the most recent timestamp for its modification time.
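That selection rule can be sketched as follows; the helper below is hypothetical, while the real logic lives in the per-scheme `SearchableVersionedDataFinder` implementations such as `io.druid.storage.hdfs.HdfsFileTimestampVersionFinder`:

```java
import java.util.Map;
import java.util.regex.Pattern;

public class LatestByTimestamp {
    // Maps file name -> last-modified millis; returns the single matching
    // name with the greatest timestamp, or null when nothing matches.
    public static String pick(Map<String, Long> files, Pattern fileRegex) {
        return files.entrySet().stream()
                .filter(e -> fileRegex == null || fileRegex.matcher(e.getKey()).matches())
                .max((a, b) -> Long.compare(a.getValue(), b.getValue()))
                .map(Map.Entry::getKey)
                .orElse(null);
    }

    public static void main(String[] args) {
        Map<String, Long> files = Map.of(
                "renames-0001.gz", 1_000L,
                "renames-0003.gz", 3_000L,
                "other.txt", 9_000L  // newer, but excluded by the regex
        );
        String chosen = pick(files, Pattern.compile("renames-[0-9]*\\.gz"));
        if (!"renames-0003.gz".equals(chosen)) throw new AssertionError(chosen);
        System.out.println("ok");
    }
}
```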

### csv lookupParseSpec

|Parameter|Description|Required|Default|
Expand Down
@@ -29,6 +29,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Callable;
@@ -81,7 +82,7 @@ public boolean accept(Path path)
* @return The URI of the file with the most recent modified timestamp.
*/
@Override
-public URI getLatestVersion(final URI uri, final Pattern pattern)
+public URI getLatestVersion(final URI uri, final @Nullable Pattern pattern)
{
final Path path = new Path(uri);
try {
@@ -64,40 +64,58 @@ public class URIExtractionNamespace implements ExtractionNamespace
@JsonProperty
private final URI uri;
@JsonProperty
private final URI uriPrefix;
@JsonProperty
private final FlatDataParser namespaceParseSpec;
@JsonProperty
-private final Period pollPeriod;
+private final String fileRegex;
@JsonProperty
-private final String versionRegex;
+private final Period pollPeriod;

@JsonCreator
public URIExtractionNamespace(
> **Contributor:** can we rename it to lookup?
>
> **Contributor (author):** I'd rather leave it like it is in this PR. In the bigger migration PR I can break more things.
@NotNull @JsonProperty(value = "namespace", required = true)
    String namespace,
-@NotNull @JsonProperty(value = "uri", required = true)
+@JsonProperty(value = "uri", required = false)
    URI uri,
+@JsonProperty(value = "uriPrefix", required = false)
+    URI uriPrefix,
+@JsonProperty(value = "fileRegex", required = false)
+    String fileRegex,
@JsonProperty(value = "namespaceParseSpec", required = true)
    FlatDataParser namespaceParseSpec,
@Min(0) @Nullable @JsonProperty(value = "pollPeriod", required = false)
    Period pollPeriod,
@Deprecated
@JsonProperty(value = "versionRegex", required = false)
    String versionRegex
)
{
-if (versionRegex != null) {
+this.namespace = Preconditions.checkNotNull(namespace, "namespace");
> **Contributor:** do we still need this?
>
> **Contributor (author):** For this PR, yes. It should go away in the other PR where this is swallowed into LookupFactory stuff.
>
> **Contributor:** i am not sure why we need this? and if so what is the next PR? why not doing all in one PR?
>
> **Contributor (author):** @b-slim small changes. Getting things done in small PRs is a lot easier than one big PR. This is a unique issue compared to the other PR, and having this code muddled in the larger PR makes it horrible to manage.
>
> **Contributor (author):** Here's the PR where it should change: #2716. I'll make sure the change makes it through before release.
this.uri = uri;
this.uriPrefix = uriPrefix;
if ((uri != null) == (uriPrefix != null)) {
throw new IAE("Either uri xor uriPrefix required");
}
this.namespaceParseSpec = Preconditions.checkNotNull(namespaceParseSpec, "namespaceParseSpec");
this.pollPeriod = pollPeriod == null ? Period.ZERO : pollPeriod;
this.fileRegex = fileRegex == null ? versionRegex : fileRegex;
if (fileRegex != null && versionRegex != null) {
throw new IAE("Cannot specify both versionRegex and fileRegex. versionRegex is deprecated");
}

if (uri != null && this.fileRegex != null) {
throw new IAE("Cannot define both uri and fileRegex");
}

if (this.fileRegex != null) {
try {
-Pattern.compile(versionRegex);
+Pattern.compile(this.fileRegex);
}
catch (PatternSyntaxException ex) {
-throw new IAE(ex, "Could not parse `versionRegex` [%s]", versionRegex);
+throw new IAE(ex, "Could not parse `fileRegex` [%s]", this.fileRegex);
}
}
-this.namespace = Preconditions.checkNotNull(namespace, "namespace");
-this.uri = Preconditions.checkNotNull(uri, "uri");
-this.namespaceParseSpec = Preconditions.checkNotNull(namespaceParseSpec, "namespaceParseSpec");
-this.pollPeriod = pollPeriod == null ? Period.ZERO : pollPeriod;
-
-this.versionRegex = versionRegex;
}
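The constructor's validation rules condense to: exactly one of `uri`/`uriPrefix`, `fileRegex` only alongside `uriPrefix`, `versionRegex` accepted only as a deprecated alias, and eager compilation of the regex so bad configs fail at construction. A standalone sketch of the same checks, with `IllegalArgumentException` standing in for Druid's `IAE`:

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

public class NamespaceConfigValidator {
    // Returns the effective file regex (fileRegex, else the deprecated versionRegex).
    public static String validate(String uri, String uriPrefix, String fileRegex, String versionRegex) {
        if ((uri != null) == (uriPrefix != null)) {
            throw new IllegalArgumentException("Either uri xor uriPrefix required");
        }
        if (fileRegex != null && versionRegex != null) {
            throw new IllegalArgumentException("Cannot specify both versionRegex and fileRegex. versionRegex is deprecated");
        }
        // versionRegex is only honored as a fallback spelling of fileRegex.
        String effectiveRegex = fileRegex == null ? versionRegex : fileRegex;
        if (uri != null && effectiveRegex != null) {
            throw new IllegalArgumentException("Cannot define both uri and fileRegex");
        }
        if (effectiveRegex != null) {
            try {
                Pattern.compile(effectiveRegex);  // fail fast on malformed regexes
            } catch (PatternSyntaxException ex) {
                throw new IllegalArgumentException("Could not parse `fileRegex` [" + effectiveRegex + "]", ex);
            }
        }
        return effectiveRegex;
    }

    public static void main(String[] args) {
        // Valid: uriPrefix plus a fileRegex.
        if (!"renames-[0-9]*\\.gz".equals(validate(null, "s3://bucket/prefix/", "renames-[0-9]*\\.gz", null))) {
            throw new AssertionError();
        }
        // Invalid: both uri and uriPrefix given.
        try {
            validate("s3://bucket/f.gz", "s3://bucket/prefix/", null, null);
            throw new AssertionError("expected rejection");
        } catch (IllegalArgumentException expected) {
        }
        System.out.println("ok");
    }
}
```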

@Override
@@ -106,9 +124,9 @@ public String getNamespace()
return namespace;
}

-public String getVersionRegex()
+public String getFileRegex()
{
-  return versionRegex;
+  return fileRegex;
}

public FlatDataParser getNamespaceParseSpec()
@@ -121,6 +139,11 @@ public URI getUri()
return uri;
}

public URI getUriPrefix()
{
return uriPrefix;
}

@Override
public long getPollMs()
{
@@ -130,17 +153,16 @@ public long getPollMs()
@Override
public String toString()
{
-return String.format(
-    "URIExtractionNamespace = { namespace = %s, uri = %s, namespaceParseSpec = %s, pollPeriod = %s, versionRegex = %s }",
-    namespace,
-    uri.toString(),
-    namespaceParseSpec.toString(),
-    pollPeriod.toString(),
-    versionRegex
-);
return "URIExtractionNamespace{" +
"namespace='" + namespace + '\'' +
", uri=" + uri +
", uriPrefix=" + uriPrefix +
", namespaceParseSpec=" + namespaceParseSpec +
", fileRegex='" + fileRegex + '\'' +
", pollPeriod=" + pollPeriod +
'}';
}


@Override
public boolean equals(Object o)
{
Expand All @@ -151,22 +173,39 @@ public boolean equals(Object o)
return false;
}

-URIExtractionNamespace namespace1 = (URIExtractionNamespace) o;
-return toString().equals(namespace1.toString());
+URIExtractionNamespace that = (URIExtractionNamespace) o;

if (!getNamespace().equals(that.getNamespace())) {
return false;
}
if (getUri() != null ? !getUri().equals(that.getUri()) : that.getUri() != null) {
return false;
}
if (getUriPrefix() != null ? !getUriPrefix().equals(that.getUriPrefix()) : that.getUriPrefix() != null) {
return false;
}
if (!getNamespaceParseSpec().equals(that.getNamespaceParseSpec())) {
return false;
}
if (getFileRegex() != null ? !getFileRegex().equals(that.getFileRegex()) : that.getFileRegex() != null) {
return false;
}
return pollPeriod.equals(that.pollPeriod);

}

@Override
public int hashCode()
{
-int result = namespace.hashCode();
-result = 31 * result + uri.hashCode();
-result = 31 * result + namespaceParseSpec.hashCode();
+int result = getNamespace().hashCode();
> **Contributor:** can we keep using IntelliJ auto-gen?
>
> **Contributor (author):** I did.
result = 31 * result + (getUri() != null ? getUri().hashCode() : 0);
result = 31 * result + (getUriPrefix() != null ? getUriPrefix().hashCode() : 0);
result = 31 * result + getNamespaceParseSpec().hashCode();
result = 31 * result + (getFileRegex() != null ? getFileRegex().hashCode() : 0);
result = 31 * result + pollPeriod.hashCode();
-result = 31 * result + (versionRegex != null ? versionRegex.hashCode() : 0);
return result;
}
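The field-by-field `equals`/`hashCode` above guards every nullable field by hand. `java.util.Objects` expresses the same null-safe pattern more compactly; a generic sketch (not the Druid class itself):

```java
import java.util.Objects;

public class KeyPair {
    private final String uri;       // nullable, like the namespace's uri
    private final String uriPrefix; // nullable, like the namespace's uriPrefix

    public KeyPair(String uri, String uriPrefix) {
        this.uri = uri;
        this.uriPrefix = uriPrefix;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof KeyPair)) return false;
        KeyPair that = (KeyPair) o;
        // Objects.equals handles nulls, unlike x.equals(y) when x is null.
        return Objects.equals(uri, that.uri) && Objects.equals(uriPrefix, that.uriPrefix);
    }

    @Override
    public int hashCode() {
        // Objects.hash treats null fields as 0, matching the manual 31 * ... idiom.
        return Objects.hash(uri, uriPrefix);
    }

    public static void main(String[] args) {
        KeyPair a = new KeyPair(null, "s3://bucket/prefix/");
        KeyPair b = new KeyPair(null, "s3://bucket/prefix/");
        if (!a.equals(b) || a.hashCode() != b.hashCode()) throw new AssertionError();
        if (a.equals(new KeyPair("s3://bucket/f.gz", null))) throw new AssertionError();
        System.out.println("ok");
    }
}
```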


private static class DelegateParser implements Parser<String, String>
{
private final Parser<String, Object> delegate;
@@ -268,7 +307,11 @@ public CSVFlatDataParser(
Arrays.toString(columns.toArray())
);

-this.parser = new DelegateParser(new CSVParser(Optional.<String>absent(), columns), this.keyColumn, this.valueColumn);
this.parser = new DelegateParser(
new CSVParser(Optional.<String>absent(), columns),
this.keyColumn,
this.valueColumn
);
}

@JsonProperty
@@ -45,6 +45,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -120,7 +122,8 @@ public Callable&lt;String&gt; getCachePopulator(
@Override
public String call()
{
-final URI originalUri = extractionNamespace.getUri();
final boolean doSearch = extractionNamespace.getUriPrefix() != null;
final URI originalUri = doSearch ? extractionNamespace.getUriPrefix() : extractionNamespace.getUri();
final SearchableVersionedDataFinder<URI> pullerRaw = pullers.get(originalUri.getScheme());
if (pullerRaw == null) {
throw new IAE(
@@ -132,15 +135,29 @@ public String call()
if (!(pullerRaw instanceof URIDataPuller)) {
throw new IAE(
"Cannot load data from location [%s]. Data pulling from [%s] not supported",
-originalUri.toString(),
+originalUri,
originalUri.getScheme()
);
}
final URIDataPuller puller = (URIDataPuller) pullerRaw;
-final String versionRegex = extractionNamespace.getVersionRegex();
final Pattern versionRegex;
final URI uriBase;
if (doSearch) {
uriBase = extractionNamespace.getUriPrefix();

if (extractionNamespace.getFileRegex() != null) {
versionRegex = Pattern.compile(extractionNamespace.getFileRegex());
} else {
versionRegex = null;
}
} else {
final Path filePath = Paths.get(extractionNamespace.getUri());
> **Contributor:** With an s3 lookup, this `Paths.get` call throws this exception for me:
>
> ```
> java.nio.file.FileSystemNotFoundException: Provider "s3" not installed
>         at java.nio.file.Paths.get(Paths.java:147) ~[?:1.8.0_66]
>         at io.druid.server.namespace.URIExtractionNamespaceFunctionFactory$3.call(URIExtractionNamespaceFunctionFactory.java:154) ~[druid-namespace-lookup-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
>         at io.druid.server.namespace.URIExtractionNamespaceFunctionFactory$3.call(URIExtractionNamespaceFunctionFactory.java:121) ~[druid-namespace-lookup-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
>         at io.druid.server.namespace.cache.NamespaceExtractionCacheManager$5.run(NamespaceExtractionCacheManager.java:364) [druid-namespace-lookup-0.9.1-SNAPSHOT.jar:0.9.1-SNAPSHOT]
>         at com.google.common.util.concurrent.MoreExecutors$ScheduledListeningDecorator$NeverSuccessfulListenableFutureTask.run(MoreExecutors.java:582) [guava-16.0.1.jar:?]
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_66]
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_66]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_66]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_66]
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_66]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_66]
>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66]
> ```
versionRegex = Pattern.compile(Pattern.quote(filePath.getFileName().toString()));
uriBase = filePath.getParent().toUri();
}
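`Paths.get(URI)` requires an installed `FileSystemProvider` for the URI's scheme, which is why the reviewer's `s3` lookup hit `FileSystemNotFoundException` here. One possible scheme-agnostic alternative (a sketch, not the fix that shipped) derives the parent URI and an exact-filename pattern with plain `java.net.URI` resolution:

```java
import java.net.URI;
import java.util.regex.Pattern;

public class UriSplit {
    // Resolving "." against a hierarchical URI strips the last path segment,
    // yielding the parent "directory" without touching any FileSystemProvider.
    public static URI parentOf(URI uri) {
        return uri.resolve(".");
    }

    // Quote the last path segment so the version search matches exactly that file.
    public static Pattern exactNamePattern(URI uri) {
        String path = uri.getPath();
        String name = path.substring(path.lastIndexOf('/') + 1);
        return Pattern.compile(Pattern.quote(name));
    }

    public static void main(String[] args) {
        URI uri = URI.create("s3://bucket/some/key/prefix/renames-0003.gz");
        if (!"s3://bucket/some/key/prefix/".equals(parentOf(uri).toString())) {
            throw new AssertionError(parentOf(uri));
        }
        if (!exactNamePattern(uri).matcher("renames-0003.gz").matches()) {
            throw new AssertionError();
        }
        System.out.println("ok");
    }
}
```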
final URI uri = pullerRaw.getLatestVersion(
-originalUri,
-versionRegex == null ? null : Pattern.compile(versionRegex)
+uriBase,
+versionRegex
);
if (uri == null) {
throw new RuntimeException(
@@ -119,6 +119,7 @@ protected boolean swapAndClearCache(String namespaceKey, String cacheKey)

final String priorCache = currentNamespaceCache.put(namespaceKey, swapCacheKey);
if (priorCache != null) {
// TODO: resolve what happens here if query is actively going on
> **Member:** please file a github issue for this if there is none already.
>
> **Contributor (author):** added #2863
>
> **Member:** @drcrallen @nishantmonu51 you could use a reference-counting mechanism similar to segments to avoid closing the resources while a query is in flight.
>
> **Contributor (author):** Eventually, yes. Unless you have some sort of insight, though, this is not straightforward. The issue is that queries do not have a concept of resources associated with them. There are some runners which wrap resources, but there isn't a general concept of query resources that need to be accounted for. There are quite a few resources that get tied to a particular query, though, so it might be worthwhile to add such a convention.
mmapDB.delete(priorCache);
dataSize.set(tmpFile.length());
return true;
@@ -23,6 +23,7 @@
import com.google.common.util.concurrent.Striped;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.metamx.common.IAE;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.query.extraction.namespace.ExtractionNamespace;
@@ -53,7 +54,7 @@ public OnHeapNamespaceExtractionCacheManager(
final Map<Class<? extends ExtractionNamespace>, ExtractionNamespaceFunctionFactory<?>> namespaceFunctionFactoryMap
)
{
-super(lifecycle, fnCache, reverseFnCache,emitter, namespaceFunctionFactoryMap);
+super(lifecycle, fnCache, reverseFnCache, emitter, namespaceFunctionFactoryMap);
}

@Override
Expand All @@ -64,14 +65,14 @@ protected boolean swapAndClearCache(String namespaceKey, String cacheKey)
try {
ConcurrentMap<String, String> cacheMap = mapMap.get(cacheKey);
if (cacheMap == null) {
-// Sometimes cache will not be populated (for example: if it doesn't contain new data)
-return false;
+throw new IAE("Extraction Cache [%s] does not exist", cacheKey);
}
dataSize.addAndGet(cacheMap.size());
ConcurrentMap<String, String> prior = mapMap.put(namespaceKey, cacheMap);
mapMap.remove(cacheKey);
if (prior != null) {
dataSize.addAndGet(-prior.size());
// Old map will get GC'd when it is not used anymore
return true;
} else {
return false;
@@ -87,7 +88,7 @@ public ConcurrentMap&lt;String, String&gt; getCacheMap(String namespaceOrCacheKey)
{
ConcurrentMap<String, String> map = mapMap.get(namespaceOrCacheKey);
if (map == null) {
-mapMap.putIfAbsent(namespaceOrCacheKey, new ConcurrentHashMap<String, String>(32));
+mapMap.putIfAbsent(namespaceOrCacheKey, new ConcurrentHashMap<String, String>());
map = mapMap.get(namespaceOrCacheKey);
}
return map;
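The `putIfAbsent`-then-`get` dance in `getCacheMap` predates Java 8; `ConcurrentMap.computeIfAbsent` gives the same race-free get-or-create in one call. A standalone sketch of both idioms:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class GetOrCreateDemo {
    private static final ConcurrentMap<String, ConcurrentMap<String, String>> mapMap =
            new ConcurrentHashMap<>();

    // The pre-Java-8 idiom used above: putIfAbsent may lose the race to another
    // thread, so the follow-up get() returns whichever map actually won.
    static ConcurrentMap<String, String> getCacheMapLegacy(String key) {
        ConcurrentMap<String, String> map = mapMap.get(key);
        if (map == null) {
            mapMap.putIfAbsent(key, new ConcurrentHashMap<String, String>());
            map = mapMap.get(key);
        }
        return map;
    }

    // Java 8+ equivalent: one atomic call, no wasted allocation on the fast path.
    static ConcurrentMap<String, String> getCacheMap(String key) {
        return mapMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> first = getCacheMap("ns");
        first.put("foo", "bar");
        // Both idioms observe the same underlying map for the same key.
        if (getCacheMapLegacy("ns") != first) throw new AssertionError();
        if (!"bar".equals(getCacheMap("ns").get("foo"))) throw new AssertionError();
        System.out.println("ok");
    }
}
```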