Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ See [lookups](../../querying/lookups.html) for how to configure and use lookups.
Currently the Kafka lookup extractor feeds the entire Kafka stream into a local cache. If you are using OnHeap caching, this can easily clobber your Java heap if the Kafka stream spews a lot of unique keys.
OffHeap caching should alleviate these concerns, but there is still a limit to the quantity of data that can be stored.
There is currently no eviction policy.
Although globally cached lookups can support multiple maps in one lookup namespace, the Kafka lookup extractor currently provides only one map per lookup: the default map.

## Testing the Kafka rename functionality

Expand Down
179 changes: 144 additions & 35 deletions docs/content/development/extensions-core/lookups-cached-global.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,21 @@ Globally cached lookups can be specified as part of the [cluster wide config for
"format": "csv",
"columns": [
"key",
"value"
]
"val1",
"val2"
],
"maps":[
{
"mapName":"__default",
"keyColumn":"key",
"valueColumn":"val1"
},
{
"mapName":"another",
"keyColumn":"key",
"valueColumn":"val2"
}
]
},
"pollPeriod": "PT5M"
},
Expand All @@ -55,8 +68,18 @@ Globally cached lookups can be specified as part of the [cluster wide config for
"password": "diurd"
},
"table": "lookupTable",
"keyColumn": "mykeyColumn",
"valueColumn": "MyValueColumn",
"maps":[
{
"mapName":"__default",
"keyColumn":"lookupKey",
"valueColumn":"val1"
},
{
"mapName":"another",
"keyColumn":"lookupKey2",
"valueColumn":"val2"
}
],
"tsColumn": "timeColumn"
},
"firstCacheTimeout": 120000,
Expand Down Expand Up @@ -95,8 +118,18 @@ In a simple case where only one [tier](../../querying/lookups.html#dynamic-confi
"password": "diurd"
},
"table": "lookupTable",
"keyColumn": "country_id",
"valueColumn": "country_name",
"maps":[
{
"mapName":"name",
"keyColumn":"country_id",
"valueColumn":"country_name"
},
{
"mapName":"capital",
"keyColumn":"country_id",
"valueColumn":"capital_city"
}
],
"tsColumn": "timeColumn"
},
"firstCacheTimeout": 120000,
Expand All @@ -120,8 +153,18 @@ Where the coordinator endpoint `/druid/coordinator/v1/lookups/realtime_customer2
"password": "diurd"
},
"table": "lookupTable",
"keyColumn": "country_id",
"valueColumn": "country_name",
"maps":[
{
"mapName":"name",
"keyColumn":"country_id",
"valueColumn":"country_name"
},
{
"mapName":"capital",
"keyColumn":"country_id",
"valueColumn":"capital_city"
}
],
"tsColumn": "timeColumn"
},
"firstCacheTimeout": 120000,
Expand Down Expand Up @@ -159,7 +202,19 @@ The remapping values for each globally cached lookup can be specified by a json
"uri": "s3://bucket/some/key/prefix/renames-0003.gz",
"namespaceParseSpec":{
"format":"csv",
"columns":["key","value"]
"columns":["key","val1","val2"],
"maps":[
{
"mapName":"__default",
"keyColumn":"key",
"valueColumn":"val1"
},
{
"mapName":"another",
"keyColumn":"key",
"valueColumn":"val2"
}
]
},
"pollPeriod":"PT5M"
}
Expand All @@ -172,7 +227,19 @@ The remapping values for each globally cached lookup can be specified by a json
"fileRegex":"renames-[0-9]*\\.gz",
"namespaceParseSpec":{
"format":"csv",
"columns":["key","value"]
"columns":["key","val1","val2"],
"maps":[
{
"mapName":"__default",
"keyColumn":"key",
"valueColumn":"val1"
},
{
"mapName":"another",
"keyColumn":"key",
"valueColumn":"val2"
}
]
},
"pollPeriod":"PT5M"
}
Expand All @@ -199,8 +266,11 @@ Only ONE file which matches the search will be used. For most implementations, t
|Parameter|Description|Required|Default|
|---------|-----------|--------|-------|
|`columns`|The list of columns in the csv file|yes|`null`|
|`keyColumn`|The name of the column containing the key|no|The first column|
|`valueColumn`|The name of the column containing the value|no|The second column|
|`maps`|Map name and key/value columns or fields within data source used in constructing lookup maps|No|`{"mapName":"__default","keyColumn":"key","valueColumn":"value"}`|

`columns` must contain every column referenced in `maps`.
`maps` defines the lookup maps available under one lookup name. A single lookup can hold multiple maps that share the same lookup source. It is a list of map specs (each called a `keyValueMap`) with three entries: `mapName` is the name of the map inside the lookup, `keyColumn` is the key column or field name within the lookup source, and `valueColumn` is the value column or field name within the lookup source.
At lookup time, users can specify which map to use. If they do not specify one explicitly, the default map `__default` is used.
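
As a rough illustration of how one `columns` list and several `maps` entries could yield multiple named maps from the same rows, here is a minimal Java sketch. The names `MultiMapSketch`, `MapSpec`, and `buildMaps` are hypothetical and exist only for this example; they are not the extension's actual classes, and the naive comma split stands in for real CSV parsing.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the extension's actual implementation.
class MultiMapSketch
{
  // One entry of `maps`: a map name plus the key and value columns it reads.
  static final class MapSpec
  {
    final String mapName;
    final String keyColumn;
    final String valueColumn;

    MapSpec(String mapName, String keyColumn, String valueColumn)
    {
      this.mapName = mapName;
      this.keyColumn = keyColumn;
      this.valueColumn = valueColumn;
    }
  }

  // Builds one key/value map per MapSpec from the same CSV lines.
  static Map<String, Map<String, String>> buildMaps(
      List<String> columns,
      List<MapSpec> specs,
      List<String> csvLines
  )
  {
    final Map<String, Map<String, String>> result = new HashMap<>();
    for (MapSpec spec : specs) {
      final int keyIdx = columns.indexOf(spec.keyColumn);
      final int valueIdx = columns.indexOf(spec.valueColumn);
      if (keyIdx < 0 || valueIdx < 0) {
        // `columns` is expected to contain every column named in `maps`.
        throw new IllegalArgumentException("Unknown column in map " + spec.mapName);
      }
      final Map<String, String> map = new HashMap<>();
      for (String line : csvLines) {
        // Naive split for the sketch: no quoting or escaping support.
        final String[] fields = line.split(",", -1);
        map.put(fields[keyIdx], fields[valueIdx]);
      }
      result.put(spec.mapName, map);
    }
    return result;
  }
}
```

With `columns` set to `["val1","val2","key"]` and the two map specs `__default` (key `key`, value `val1`) and `another` (key `key`, value `val2`), the row `bar,something,foo` would contribute `foo -> bar` to `__default` and `foo -> something` to `another`.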

*example input*

Expand All @@ -215,9 +285,19 @@ truck,something3,buck
```json
"namespaceParseSpec": {
"format": "csv",
"columns": ["value","somethingElse","key"],
"keyColumn": "key",
"valueColumn": "value"
"columns": ["val1","val2","key"],
"maps":[
{
"mapName":"__default",
"keyColumn":"key",
"valueColumn":"val1"
},
{
"mapName":"another",
"keyColumn":"key",
"valueColumn":"val2"
}
]
}
```

Expand All @@ -226,11 +306,11 @@ truck,something3,buck
|Parameter|Description|Required|Default|
|---------|-----------|--------|-------|
|`columns`|The list of columns in the csv file|yes|`null`|
|`keyColumn`|The name of the column containing the key|no|The first column|
|`valueColumn`|The name of the column containing the value|no|The second column|
|`delimiter`|The delimiter in the file|no|tab (`\t`)|
|`listDelimiter`|The list delimiter in the file|no| (`\u0001`)|
|`maps`|Map name and key/value columns or fields within data source used in constructing lookup maps|No|`{"mapName":"__default","keyColumn":"key","valueColumn":"value"}`|

`columns` should contain all the columns specified in `maps`.

*example input*

Expand All @@ -245,43 +325,63 @@ truck|something,3|buck
```json
"namespaceParseSpec": {
"format": "tsv",
"columns": ["value","somethingElse","key"],
"keyColumn": "key",
"valueColumn": "value",
"delimiter": "|"
"columns": ["val1","val2","key"],
"delimiter": "|",
"maps":[
{
"mapName":"__default",
"keyColumn":"key",
"valueColumn":"val1"
},
{
"mapName":"another",
"keyColumn":"key",
"valueColumn":"val2"
}
]
}
```

### customJson lookupParseSpec

|Parameter|Description|Required|Default|
|---------|-----------|--------|-------|
|`keyFieldName`|The field name of the key|yes|null|
|`valueFieldName`|The field name of the value|yes|null|
|`maps`|Map name and key/value columns or fields within data source used in constructing lookup maps|No|`{"mapName":"__default","keyColumn":"key","valueColumn":"value"}`|

Inputs should have all the fields used in `maps`.

*example input*

```json
{"key": "foo", "value": "bar", "somethingElse" : "something"}
{"key": "baz", "value": "bat", "somethingElse" : "something"}
{"key": "buck", "somethingElse": "something", "value": "truck"}
{"key": "foo", "val1": "bar", "val2" : "something"}
{"key": "baz", "val1": "bat", "val2" : "something"}
{"key": "buck", "val2": "value2", "val1": "truck"}
```

*example namespaceParseSpec*

```json
"namespaceParseSpec": {
"format": "customJson",
"keyFieldName": "key",
"valueFieldName": "value"
"maps":[
{
"mapName":"__default",
"keyColumn":"key",
"valueColumn":"val1"
},
{
"mapName":"another",
"keyColumn":"key",
"valueColumn":"val2"
}
]
}
```

With customJson parsing, if the value field for a particular row is missing or null then that line will be skipped, and
will not be included in the lookup.
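
The skipping rule can be sketched in Java as follows. This is a minimal sketch that assumes each input line has already been parsed into a `Map<String, String>`; the names `SkipMissingValueSketch` and `populate` are hypothetical, and how the real extension treats a row when several maps are configured is not something this example claims to show.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; rows are assumed to be pre-parsed JSON objects.
class SkipMissingValueSketch
{
  // Populates a single lookup map, skipping rows whose value field is missing or null.
  static Map<String, String> populate(
      List<Map<String, String>> rows,
      String keyField,
      String valueField
  )
  {
    final Map<String, String> lookup = new HashMap<>();
    for (Map<String, String> row : rows) {
      final String value = row.get(valueField);
      if (value == null) {
        // Covers both an absent field and an explicit null value.
        continue;
      }
      // Key handling (for example a missing key) is not addressed by this sketch.
      lookup.put(row.get(keyField), value);
    }
    return lookup;
  }
}
```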

### simpleJson lookupParseSpec
The `simpleJson` lookupParseSpec does not take any parameters. It is simply a line delimited json file where the field is the key, and the field's value is the value.
The `simpleJson` lookupParseSpec does not take any parameters. It is simply a line-delimited JSON file where the field is the key, and the field's value is the value. `maps` is ignored and all input data is inserted into the default map.

*example input*

Expand All @@ -308,8 +408,7 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol
|`namespace`|The namespace to define|Yes||
|`connectorConfig`|The connector config to use|Yes||
|`table`|The table which contains the key value pairs|Yes||
|`keyColumn`|The column in `table` which contains the keys|Yes||
|`valueColumn`|The column in `table` which contains the values|Yes||
|`maps`|Map name and key/value columns of `table` used in constructing lookup maps|No|`{"mapName":"__default","keyColumn":"key","valueColumn":"value"}`|
|`tsColumn`| The column in `table` which contains when the key was updated|No|Not used|
|`pollPeriod`|How often to poll the DB|No|0 (only once)|

Expand All @@ -324,13 +423,23 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol
"password":"diurd"
},
"table":"some_lookup_table",
"keyColumn":"the_old_dim_value",
"valueColumn":"the_new_dim_value",
"maps":[
{
"mapName":"__default",
"keyColumn":"lookupKey",
"valueColumn":"val1"
},
{
"mapName":"another",
"keyColumn":"lookupKey2",
"valueColumn":"val2"
}
],
"tsColumn":"timestamp_column",
"pollPeriod":600000
}
```

# Introspection

Globally cached lookups have introspection points at `/keys` and `/values` which return a complete set of the keys and values (respectively) in the lookup. Introspection to `/` returns the entire map. Introspection to `/version` returns the version indicator for the lookup.
Globally cached lookups have introspection points at `/keys` and `/values` which return the complete set of map names and key/value maps (respectively) in the lookup. Introspection to `/` returns the entire map. Introspection to `/version` returns the version indicator for the lookup.
Contributor


Hmm I wonder about backwards compatibility here. Will take a closer look at the actual http code.

33 changes: 13 additions & 20 deletions docs/content/querying/dimensionspecs.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,26 +232,7 @@ Explicit lookups allow you to specify a set of keys and values to use when perfo
}
```

```json
{
"type":"lookup",
"lookup":{"type":"namespace","namespace":"some_lookup"},
"replaceMissingValueWith":"Unknown",
"injective":false
}
```

```json
{
"type":"lookup",
"lookup":{"type":"namespace","namespace":"some_lookup"},
"retainMissingValue":true,
"injective":false
}
```

A lookup can be of type `namespace` or `map`. A `map` lookup is passed as part of the query.
A `namespace` lookup is populated on all the nodes which handle queries as per [lookups](../querying/lookups.html)
A lookup has only one type, `map`. A `map` lookup is passed as part of the query.

A property of `retainMissingValue` and `replaceMissingValueWith` can be specified at query time to hint how to handle missing values. Setting `replaceMissingValueWith` to `""` has the same effect as setting it to `null` or omitting the property. Setting `retainMissingValue` to true will use the dimension's original value if it is not found in the lookup. The default values are `replaceMissingValueWith = null` and `retainMissingValue = false` which causes missing values to be treated as missing.
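
The missing-value rules described above can be sketched in Java roughly as follows. The names `MissingValueSketch` and `apply` are hypothetical, and this is not Druid's actual extraction code. How the two options interact when both are set is not covered by the text above, so the sketch simply checks `retainMissingValue` first as an assumption.

```java
import java.util.Map;

// Hypothetical illustration only; not Druid's actual lookup extraction code.
class MissingValueSketch
{
  // Returns the looked-up value, or applies the missing-value options when the key is absent.
  static String apply(
      Map<String, String> lookup,
      String dimensionValue,
      boolean retainMissingValue,
      String replaceMissingValueWith
  )
  {
    final String mapped = lookup.get(dimensionValue);
    if (mapped != null) {
      return mapped;
    }
    if (retainMissingValue) {
      // Assumption: checked first; keeps the dimension's original value.
      return dimensionValue;
    }
    if (replaceMissingValueWith == null || replaceMissingValueWith.isEmpty()) {
      // "" behaves like null or an omitted property: the value is treated as missing.
      return null;
    }
    return replaceMissingValueWith;
  }
}
```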

Expand Down Expand Up @@ -479,3 +460,15 @@ The second kind where it is not possible to pass at query time due to their size
"name":"lookupName"
}
```

Because [Globally Cached Lookups](../development/extensions-core/lookups-cached-global.html) can hold multiple maps in each lookup, users can specify a `mapName` to select a map inside the given lookup.

```json
{
"type":"lookup",
"dimension":"dimensionName",
"outputName":"dimensionOutputName",
"name":"lookupName",
"mapName":"map1"
}
```
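
Selecting a map by `mapName`, with the `__default` map used when none is given, could look like the following Java sketch. The names `MapSelectionSketch` and `selectMap` are hypothetical, and returning an empty map for an unknown `mapName` is an assumption made for the example, not documented behavior.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical illustration only; not the extension's actual lookup code.
class MapSelectionSketch
{
  static final String DEFAULT_MAP_NAME = "__default";

  // Picks the named map from a lookup, falling back to the default map name when mapName is null.
  static Map<String, String> selectMap(
      Map<String, Map<String, String>> lookupMaps,
      String mapName
  )
  {
    final String effectiveName = mapName == null ? DEFAULT_MAP_NAME : mapName;
    final Map<String, String> selected = lookupMaps.get(effectiveName);
    // Assumption: an unknown map name yields an empty map rather than an error.
    return selected == null ? Collections.<String, String>emptyMap() : selected;
  }
}
```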
14 changes: 12 additions & 2 deletions docs/content/querying/lookups.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,18 @@ So a config might look something like:
"password": "diurd"
},
"table": "lookupTable",
"keyColumn": "country_id",
"valueColumn": "country_name",
"keyValueMaps":[
{
"mapName":"name",
"keyName":"country_id",
"valueName":"country_name"
},
{
"mapName":"capital",
"keyName":"country_id",
"valueName":"capital_city"
}
],
"tsColumn": "timeColumn"
},
"firstCacheTimeout": 120000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.extraction.MapLookupExtractor;
import io.druid.query.lookup.namespace.KeyValueMap;
import io.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
Expand All @@ -64,7 +65,7 @@
import java.util.regex.Pattern;

@JsonTypeName("kafka")
public class KafkaLookupExtractorFactory implements LookupExtractorFactory
public class KafkaLookupExtractorFactory extends LookupExtractorFactory
{
private static final Logger LOG = new Logger(KafkaLookupExtractorFactory.class);
static final Decoder<String> DEFAULT_STRING_DECODER = new Decoder<String>()
Expand Down Expand Up @@ -182,7 +183,7 @@ public boolean start()
kafkaProperties.setProperty("group.id", factoryId);
final String topic = getKafkaTopic();
LOG.debug("About to listen to topic [%s] with group.id [%s]", topic, factoryId);
final Map<String, String> map = cacheManager.getCacheMap(factoryId);
final Map<String, String> map = cacheManager.getInnerCacheMap(factoryId, KeyValueMap.DEFAULT_MAPNAME);
mapRef.set(map);
// Enable publish-subscribe
kafkaProperties.setProperty("auto.offset.reset", "smallest");
Expand Down