@@ -2,52 +2,42 @@
layout: doc_page
---

# Kafka Namespaced Lookup
# Kafka Lookups

<div class="note caution">
Lookups are an <a href="../experimental.html">experimental</a> feature.
</div>

Make sure to [include](../../operations/including-extensions.html) `druid-namespace-lookup` and `druid-kafka-extraction-namespace` as an extension.

Note that this lookup does not employ a `pollPeriod`.

If you need updates to populate as promptly as possible, it is possible to plug into a kafka topic whose key is the old value and message is the desired new value (both in UTF-8).
If you need updates to populate as promptly as possible, it is possible to plug into a kafka topic whose key is the old value and message is the desired new value (both in UTF-8) as a LookupExtractorFactory.

```json
{
"type":"kafka",
"namespace":"testTopic",
"kafkaTopic":"testTopic"
"kafkaTopic":"testTopic",
"kafkaProperties":{"zookeeper.connect":"somehost:2181/kafka"}
}
```

|Parameter|Description|Required|Default|
|---------|-----------|--------|-------|
|`namespace`|The namespace to define|Yes||
|`kafkaTopic`|The kafka topic to read the data from|Yes||

## Kafka renames
|`kafkaProperties`|Kafka consumer properties. At least "zookeeper.connect" must be specified. Only the zookeeper connector is supported|Yes||
|`connectTimeout`|How long to wait for an initial connection|No|`0` (do not wait)|
|`isOneToOne`|The map is a one-to-one (see [Lookup DimensionSpecs](../querying/dimensionspecs.html))|No|`false`|

The extension `kafka-extraction-namespace` enables reading from a kafka feed which has name/key pairs to allow renaming of dimension values. An example use case would be to rename an ID to a human readable format.

Currently the historical node caches the key/value pairs from the kafka feed in an ephemeral memory mapped DB via MapDB.

## Configuration

The following options are used to define the behavior and should be included wherever the extension is included (all query servicing nodes):
The consumer properties `group.id` and `auto.offset.reset` CANNOT be set in `kafkaProperties` as they are set by the extension as `UUID.randomUUID().toString()` and `smallest` respectively.
Contributor

I am curious: what is the implication of this constraint?

Contributor Author

`auto.offset.reset` as `smallest` means "read all the data available in the topic"; otherwise two different servers could replay different changelogs.

group.id means every instance is a unique consumer, so they should be accounted for as different consumers.
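
The constraint discussed above can be sketched roughly as follows. This is a minimal illustration, not the extension's actual code: the class `KafkaLookupPropertiesSketch` and the method `buildConsumerProperties` are hypothetical names, and only the two forced values (`UUID.randomUUID().toString()` and `smallest`) and the required `zookeeper.connect` key come from the documentation in this diff.

```java
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

// Hypothetical sketch of how user-supplied kafkaProperties could be validated
// and combined with the values the extension forces.
class KafkaLookupPropertiesSketch
{
  static Properties buildConsumerProperties(Map<String, String> kafkaProperties)
  {
    // The two reserved properties may not be supplied by the user.
    if (kafkaProperties.containsKey("group.id")) {
      throw new IllegalArgumentException("Cannot set kafka property [group.id]");
    }
    if (kafkaProperties.containsKey("auto.offset.reset")) {
      throw new IllegalArgumentException("Cannot set kafka property [auto.offset.reset]");
    }
    // The docs state that at least zookeeper.connect must be specified.
    if (!kafkaProperties.containsKey("zookeeper.connect")) {
      throw new IllegalArgumentException("Missing kafka property [zookeeper.connect]");
    }

    final Properties properties = new Properties();
    properties.putAll(kafkaProperties);
    // A random group id makes every instance its own consumer group,
    // so each server receives every message in the topic.
    properties.setProperty("group.id", UUID.randomUUID().toString());
    // "smallest" starts from the earliest available offset,
    // so each server replays the same changelog.
    properties.setProperty("auto.offset.reset", "smallest");
    return properties;
  }
}
```

Calling the helper twice with the same input yields two different `group.id` values, which is the behavior the reply above describes: each instance is accounted for as a separate consumer.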


|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.rename.kafka.properties`|A json map of kafka consumer properties. See below for special properties.|See below|
See [lookups](../../querying/lookups.html) for how to configure and use lookups.

The following are the handling for kafka consumer properties in `druid.query.rename.kafka.properties`
# Limitations

|Property|Description|Default|
|--------|-----------|-------|
|`zookeeper.connect`|Zookeeper connection string|`localhost:2181/kafka`|
|`group.id`|Group ID, auto-assigned for publish-subscribe model and cannot be overridden|`UUID.randomUUID().toString()`|
|`auto.offset.reset`|Setting to get the entire kafka rename stream. Cannot be overridden|`smallest`|
Currently the Kafka lookup extractor feeds the entire kafka stream into a local cache. If you are using OnHeap caching, this can easily clobber your java heap if the kafka stream spews a lot of unique keys.
OffHeap caching should alleviate these concerns, but there is still a limit to the quantity of data that can be stored.
There is currently no eviction policy.
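
To make the limitation concrete, here is a small sketch of a cache that is fed key/message pairs and never evicts anything. It is an assumption-laden illustration only: `LookupCacheSketch`, `onMessage`, and the use of a `ConcurrentHashMap` are hypothetical and do not reflect the extension's actual cache implementation. The only details taken from the docs are that the key is the old value, the message is the new value, and both are UTF-8.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the local cache populated from the Kafka topic.
class LookupCacheSketch
{
  private final Map<String, String> cache = new ConcurrentHashMap<>();

  // Key is the old value, message is the desired new value (both UTF-8).
  void onMessage(byte[] key, byte[] message)
  {
    cache.put(
        new String(key, StandardCharsets.UTF_8),
        new String(message, StandardCharsets.UTF_8)
    );
  }

  // Returns the renamed value, or null if the key has never been seen.
  String apply(String key)
  {
    return cache.get(key);
  }

  // With no eviction, this only grows as new unique keys arrive.
  int size()
  {
    return cache.size();
  }
}
```

A repeated key overwrites its earlier value and does not grow the map, while every new unique key adds an entry that is never removed. That is why a stream with many unique keys can exhaust the available memory.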

## Testing the Kafka rename functionality

5 changes: 5 additions & 0 deletions extensions-core/kafka-extraction-namespace/pom.xml
@@ -98,5 +98,10 @@
<version>3.0.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

This file was deleted.

@@ -0,0 +1,50 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.lookup;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;

import java.util.List;

/**
*
*/
public class KafkaExtractionNamespaceModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.<Module>of(
new SimpleModule("kafka-lookups").registerSubtypes(
KafkaLookupExtractorFactory.class
)
);
}

@Override
public void configure(Binder binder)
{
// NOOP
}
}