[QTL] Move kafka-extraction-namespace to the Lookup framework. #2800

Merged: drcrallen merged 12 commits into apache:master from metamx:kafkaLookupOnly on May 2, 2016
Conversation

@drcrallen (Contributor) commented Apr 7, 2016

The major changes in this PR are as follows:

The Kafka extraction namespace now works through the lookup extractor factory framework. It still uses the caching mechanisms in the main lookup extension, but keys the cache with a random UUID instead of trying to maintain a relationship to the lookup name (which is enforced by the lookup framework in core).

This means the kafka-extraction-namespace extension must go through the main lookup framework.
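For reference, here is a minimal sketch of the cache-keying idea described above, assuming a simple map-of-maps stand-in for the lookup extension's cache manager; all names here are illustrative, not Druid's actual API:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

class KafkaLookupExtractorFactorySketch
{
  // Each factory instance gets its own cache slot, independent of the
  // user-visible lookup name.
  private final String factoryId = UUID.randomUUID().toString();
  private final Map<String, Map<String, String>> cacheManager; // stand-in for the lookup extension's cache manager

  KafkaLookupExtractorFactorySketch(Map<String, Map<String, String>> cacheManager)
  {
    this.cacheManager = cacheManager;
  }

  Map<String, String> getOrCreateCache()
  {
    // The random UUID, not the lookup name, identifies the backing map.
    return cacheManager.computeIfAbsent(factoryId, k -> new ConcurrentHashMap<>());
  }
}
```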

@drcrallen (Contributor, Author)

Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 4.575 sec <<< FAILURE! - in io.druid.query.lookup.TestKafkaExtractionCluster
testSimpleRename(io.druid.query.lookup.TestKafkaExtractionCluster)  Time elapsed: 4.57 sec  <<< ERROR!
com.fasterxml.jackson.databind.JsonMappingException: Could not resolve type id 'kafka' into a subtype of [simple type, class io.druid.query.lookup.LookupExtractorFactory]
 at [Source: {"type":"kafka","kafkaTopic":"testTopic","kafkaProperties":{"request.required.acks":"1","zookeeper.connect":"127.0.0.1:54849/kafka","zookeeper.sync.time.ms":"200","zookeeper.session.timeout.ms":"10000"}}; line: 1, column: 2]
    at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
    at com.fasterxml.jackson.databind.DeserializationContext.unknownTypeException(DeserializationContext.java:862)
    at com.fasterxml.jackson.databind.jsontype.impl.TypeDeserializerBase._findDeserializer(TypeDeserializerBase.java:167)
    at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:99)
    at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:84)
    at com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:132)
    at com.fasterxml.jackson.databind.deser.impl.TypeWrappedDeserializer.deserialize(TypeWrappedDeserializer.java:41)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3066)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2161)
    at io.druid.query.lookup.TestKafkaExtractionCluster.setUp(TestKafkaExtractionCluster.java:266)

actual problem, fixing
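The error above is Jackson failing to resolve the type id `kafka` to a registered subtype of `LookupExtractorFactory`. For context, here is a minimal, self-contained sketch of the generic Jackson mechanism involved; the class names are illustrative stubs, and whether the PR fixes the test in exactly this way is not shown in this thread:

```java
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;

// Polymorphic base resolved via the "type" property, like LookupExtractorFactory.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
interface FactoryBase
{
}

// Stand-in for the Kafka factory; in Druid the subtype would typically be
// registered by the extension's Jackson module.
class KafkaFactoryStub implements FactoryBase
{
}

class SubtypeRegistrationSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    // Without this registration, reading {"type":"kafka"} fails with exactly the
    // JsonMappingException above: "Could not resolve type id 'kafka' ...".
    mapper.registerSubtypes(new NamedType(KafkaFactoryStub.class, "kafka"));
    final FactoryBase factory = mapper.readValue("{\"type\":\"kafka\"}", FactoryBase.class);
    System.out.println(factory.getClass().getSimpleName()); // prints KafkaFactoryStub
  }
}
```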


private final Object startStopLock = new Object();
private final ListeningExecutorService executorService;
private final AtomicLong doubleEventCount = new AtomicLong(0L);
Member

Why do we maintain doubleEventCount instead of eventCount?

Contributor Author

I was trying to have a simple way to minimize race conditions and locking. I could do a read/write lock if this isn't good enough.

Basically, the count is increased both before and after the critical section (as opposed to only before or only after).

The crux is answering "what was the state of the map that produced this result?" when computing the cache key, which is a little tricky for a continuously mutable map.
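For illustration, a minimal sketch of this double-increment pattern (essentially a seqlock-style version counter; the names are illustrative, not the PR's exact code). The writer bumps the counter before and after each update, so the counter is odd while an update is in flight; a reader that observes the same even value before and after its read knows the map did not change underneath it:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class DoubleCountSketch
{
  private final AtomicLong doubleEventCount = new AtomicLong(0L);
  private final Map<String, String> map = new ConcurrentHashMap<>();

  void handleEvent(String key, String value)
  {
    doubleEventCount.incrementAndGet(); // count goes odd: an update is in flight
    map.put(key, value);                // the critical section
    doubleEventCount.incrementAndGet(); // count goes even: update complete
  }

  // Returns a version number guaranteed to correspond to a stable view of the
  // map, suitable for building a cache key; retries if a write raced the read.
  long versionForCacheKey()
  {
    while (true) {
      final long before = doubleEventCount.get();
      if ((before & 1L) == 0L) {
        // ... read the map here to build the cache key ...
        if (doubleEventCount.get() == before) {
          return before; // no update overlapped our read
        }
      }
      Thread.yield(); // an update was in flight; try again
    }
  }
}
```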

@nishantmonu51 (Member)

Could you summarize all the changes in this PR in the description?

|`druid.query.rename.kafka.properties`|A JSON map of Kafka consumer properties. See below for special properties.|See below|

The following describes the handling of Kafka consumer properties in `druid.query.rename.kafka.properties`:
The consumer properties `group.id` and `auto.offset.reset` CANNOT be set in `kafkaProperties`, as they are set by the extension to `UUID.randomUUID().toString()` and `smallest` respectively.
Contributor

I am curious: what is the implication of this constraint?

Contributor Author

Setting auto.offset.reset to smallest means "read all the data available in the topic"; otherwise two different servers could replay different changelogs.

Setting group.id to a random UUID means every instance is a unique consumer, so each should be accounted for as a different consumer.
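A minimal sketch of how these two properties could be enforced on top of the user-supplied `kafkaProperties` (illustrative, not necessarily the PR's exact code; the property names are those of the old Kafka 0.8 high-level consumer):

```java
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

class ConsumerPropertiesSketch
{
  static Properties buildConsumerProperties(Map<String, String> kafkaProperties)
  {
    // The docs above say these two keys CANNOT be set by the user.
    if (kafkaProperties.containsKey("group.id") || kafkaProperties.containsKey("auto.offset.reset")) {
      throw new IllegalArgumentException("group.id and auto.offset.reset cannot be set in kafkaProperties");
    }
    final Properties props = new Properties();
    props.putAll(kafkaProperties);
    // Every instance is a distinct consumer...
    props.setProperty("group.id", UUID.randomUUID().toString());
    // ...and every instance replays the topic from the beginning, so all
    // servers see the same changelog.
    props.setProperty("auto.offset.reset", "smallest");
    return props;
  }
}
```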

@drcrallen (Contributor, Author)

Merge problems from introspection PR, fixing

@drcrallen (Contributor, Author)

@b-slim I'm trying to hammer out some tests to prevent raciness, but otherwise this should be done.

private final String factoryId = UUID.randomUUID().toString();
private final AtomicReference<Map<String, String>> mapRef = new AtomicReference<>(null);

private AtomicBoolean started = new AtomicBoolean(false);
Contributor

Can we use this for the startStop lock as well?

Contributor Author

That is possible, yes; changed.
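For reference, a minimal sketch of using the `started` flag itself as the start/stop guard; `compareAndSet` makes each transition happen at most once without a separate lock object (illustrative, not the PR's actual lifecycle code):

```java
import java.util.concurrent.atomic.AtomicBoolean;

class StartStopSketch
{
  private final AtomicBoolean started = new AtomicBoolean(false);

  boolean start()
  {
    if (!started.compareAndSet(false, true)) {
      return false; // someone else already started it
    }
    // ... start consumer threads here ...
    return true;
  }

  boolean stop()
  {
    if (!started.compareAndSet(true, false)) {
      return false; // never started, or already stopped
    }
    // ... shut down consumer threads here ...
    return true;
  }
}
```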

@b-slim (Contributor) commented Apr 28, 2016

@drcrallen can you explain how the caching mechanism works?
I guess the user first needs to know how caching works and where the cache lives.
Plus, I am not sure how eviction is supposed to work here; this impl is going to fill the cache with every entry, though.

@drcrallen (Contributor, Author)

@b-slim this impl fills the cache with all entries without eviction.

@drcrallen (Contributor, Author)

@b-slim this does not change the caching mechanism from the prior impl; I don't understand why caching would be a blocker for this PR.

@b-slim (Contributor) commented Apr 28, 2016

@drcrallen I know that; my comment is meant to help the user understand how things work by adding it to the docs. IMHO it is a serious limitation worth mentioning.

@b-slim (Contributor) commented Apr 28, 2016

@drcrallen I am not blocking it; just update the docs to reflect the limitation.

@b-slim (Contributor) commented Apr 28, 2016

👍 after Docs and squash.

@drcrallen (Contributor, Author)

Will add docs very shortly

@drcrallen (Contributor, Author)

@b-slim added

# Limitations

Currently the Kafka lookup extractor feeds the entire Kafka stream into a local cache. If you are using OnHeap caching, this can easily clobber your Java heap if the Kafka stream spews a lot of unique keys.
OffHeap caching should alleviate these concerns, but there is still a limit to the quantity of data that can be stored.
There is currently no eviction policy.

@drcrallen (Contributor, Author)

https://travis-ci.org/druid-io/druid/jobs/126506259 died in


-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running io.druid.indexing.kafka.KafkaDataSourceMetadataTest
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.022 sec - in io.druid.indexing.kafka.KafkaDataSourceMetadataTest
Running io.druid.indexing.kafka.KafkaIndexTaskTest

@drcrallen drcrallen closed this Apr 28, 2016
@drcrallen drcrallen reopened this Apr 28, 2016
@nishantmonu51 (Member)

👍, LGTM

@drcrallen drcrallen merged commit 54b717b into apache:master May 2, 2016
@nishantmonu51 nishantmonu51 deleted the kafkaLookupOnly branch May 2, 2016 16:45