KIP-28: First patch #130

Closed
guozhangwang wants to merge 317 commits into apache:trunk from confluentinc:streaming

Conversation

@guozhangwang
Contributor

This work has been contributed by Jesse Anderson, Randall Hauch, Yasuhiro Matsuda and Guozhang Wang. The detailed design can be found in https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client.

@asfbot

asfbot commented Aug 11, 2015

kafka-trunk-git-pr #122 SUCCESS
This pull request looks good

@lazyval
Contributor

lazyval commented Aug 11, 2015

It's quite awkward to see commits like this

[screenshot: kafka commit history, 2015-08-11 17-06-37]

@guozhangwang
Contributor Author

@lazyval Apologies for the commit history; I was fighting with the git merge history from two branches back then, so the commits were not well organized. I will create another PR with squashed commits after addressing the comments collected on this PR.

Contributor


Can you add some documentation for some of these interfaces?

Contributor Author


Ack.

@rhauch
Contributor

rhauch commented Aug 16, 2015

This is an excellent proposal, and after a quick pass this PR looks good. More detailed comments and questions to follow.

Contributor


The run() and close() methods should not be synchronized. As written, once run() is called it blocks every other synchronized method, including close(); and because run() only completes when close() is called, run() can never complete. In other words, the thread will never stop.

You should be able to simply remove the synchronized keywords with the current code and maintain thread safety of the running volatile boolean field: the only method that reads that field is the private stillRunning() (called via private runLoop() which is called via public run()), while the only method that writes to the field is close().
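The pattern being described can be reduced to a sketch like this (the class is illustrative, not the actual patch; only the reader/writer roles of the volatile flag follow the comment):

```java
// Illustrative sketch: a volatile flag stops the run loop, and no
// method needs to be synchronized for this to be thread-safe.
public class StreamThreadSketch implements Runnable {
    private volatile boolean running = true;

    @Override
    public void run() {              // must NOT be synchronized, or close() would block forever
        while (stillRunning()) {
            // poll and process records here
        }
    }

    public void close() {            // the only writer of the flag
        running = false;
    }

    private boolean stillRunning() { // the only reader of the flag
        return running;
    }
}
```

Because `running` is volatile, the loop condition is re-read on every iteration and the write in close() is guaranteed to become visible to the running thread.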

Contributor Author


Ack.

@rhauch
Contributor

rhauch commented Aug 18, 2015

Is there a way to make the sources and processors created by the topology aware of or dependent upon the supplied configuration? For example, let's say my job's configuration has a parameter that, if set one way, will result in a slight variation of the topology (e.g., an extra filtering processor between the source and my primary processor). Is that possible?
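A self-contained sketch of what this question describes, with a list of processor names standing in for a real topology (all names and the config flag are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical config-dependent wiring: an extra filtering stage is
// inserted between the source and the primary processor when a flag is set.
public class TopologyWiringSketch {
    static List<String> buildStages(boolean extraFilterEnabled) {
        List<String> stages = new ArrayList<>();
        stages.add("my-source");
        if (extraFilterEnabled) {
            stages.add("extra-filter");       // the optional stage from the question
        }
        stages.add("my-primary-processor");
        return stages;
    }
}
```

The point of the question is whether the topology-building API gives user code access to the supplied configuration at the moment this kind of conditional wiring has to happen.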

Contributor


The ProducerRecord class has a constructor that takes the partition number, yet that doesn't appear to be exposed in these two send(...) methods. Am I missing how to specify the partitioning logic for each of the sent messages?

UPDATE: Okay, it's pretty obvious you can set the partitioner.class property in the producer's configuration to the name of the Partitioner implementation class. Doing this makes the KafkaProducer pass the message key to the Partitioner to determine the partition number. Is this a best practice, or is it still reasonable for our Processor implementation to determine the partition, perhaps based on something other than the key? If so, it'd be great to have additional send(...) methods that take the partition number.
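For reference, the key-to-partition mapping such a partitioner typically computes boils down to a hash modulo the partition count; the helper below is illustrative only, not the Kafka Partitioner interface:

```java
// Illustrative key-hash partitioning logic (not the Kafka Partitioner API):
// mask off the sign bit so the result is always a valid partition index.
public class PartitionSketch {
    static int partitionFor(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

A scheme like this is deterministic per key, which is exactly why a Processor that wants to partition on something other than the key would need a send(...) overload that accepts an explicit partition number.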

Contributor Author


I think we can wrap the producer / consumer configs in the streaming / processor configs as you mentioned.

@guozhangwang
Contributor Author

@rhauch @ijuma thanks for the comments. Just to be clear, we are actively working on addressing them and will respond to them individually once the next version of this patch is finished, with a squashed commit history.

@asfbot

asfbot commented Aug 21, 2015

kafka-trunk-git-pr #189 FAILURE
Looks like there's a problem with this pull request

Contributor


The createSensor(...) method called on lines 68-75 uses the this.metrics field, and because this.metrics is not set until line 76 the result is a NullPointerException. To fix, simply move the this.metrics = context.metrics(); line before the first call to createSensor(...).
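The ordering bug reduces to this minimal sketch (class and field names are illustrative; `createSensor` here just stands in for any method that dereferences `this.metrics`):

```java
import java.util.HashMap;
import java.util.Map;

// Minimal reproduction of the initialization-order NPE described above.
public class MetricsInitSketch {
    private Map<String, String> metrics;
    String sensor;

    void initBroken(Map<String, String> m) {
        sensor = createSensor("process");  // NPE: this.metrics is still null
        metrics = m;
    }

    void initFixed(Map<String, String> m) {
        metrics = m;                       // assign the dependency first...
        sensor = createSensor("process");  // ...then it is safe to use it
    }

    private String createSensor(String name) {
        metrics.put(name, name);           // dereferences this.metrics
        return name;
    }
}
```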

Contributor Author


Ack.

Contributor


Added a PR with the simple correction: confluentinc#38

@rhauch
Contributor

rhauch commented Aug 23, 2015

@guozhangwang, I'm willing to help resolve issues, add test cases, make suggestions via patches, and even add JavaDoc. But I suspect that'd be easier after you squash the commit history. Please let me know what you think.

Contributor


This would be easier to implement if the parameter to this method were an Iterable<Entry<K,V>> rather than a List<Entry<K,V>>. For example, the current RocksDBKeyValueStore uses byte[] for the keys and values, and it's pretty easy to wrap that with a parameterized class that uses provided Serializer and Deserializer instances for the keys and values -- except that the putAll method cannot be easily implemented as a delegate if it takes a List. (In essence, the list has to be fully copied before the delegation can be made.)

I'd be happy to provide a patch with this fix.
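A sketch of the wrapper in question, with String standing in for byte[] and a plain Function standing in for Serializer (all names illustrative): with an Iterable parameter, putAll converts and delegates one entry at a time, with no intermediate List.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative serde-wrapping store: putAll(Iterable) delegates lazily
// to an inner "raw" store without materializing a converted List first.
public class SerdeStoreSketch<K, V> {
    private final Map<String, String> inner = new HashMap<>(); // stand-in for the byte[] store
    private final Function<K, String> keySer;
    private final Function<V, String> valSer;

    public SerdeStoreSketch(Function<K, String> keySer, Function<V, String> valSer) {
        this.keySer = keySer;
        this.valSer = valSer;
    }

    public void putAll(Iterable<Map.Entry<K, V>> entries) {
        for (Map.Entry<K, V> e : entries)   // one entry at a time; no full copy
            inner.put(keySer.apply(e.getKey()), valSer.apply(e.getValue()));
    }

    public String get(K key) {
        return inner.get(keySer.apply(key));
    }
}
```

Since every List is an Iterable, widening the parameter type also keeps existing callers compiling unchanged.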

@asfbot

asfbot commented Aug 26, 2015

kafka-trunk-git-pr #229 FAILURE
Looks like there's a problem with this pull request

@asfbot

asfbot commented Aug 26, 2015

kafka-trunk-git-pr #232 FAILURE
Looks like there's a problem with this pull request

@asfbot

asfbot commented Aug 27, 2015

kafka-trunk-git-pr #239 FAILURE
Looks like there's a problem with this pull request

@asfbot

asfbot commented Aug 27, 2015

kafka-trunk-git-pr #241 FAILURE
Looks like there's a problem with this pull request

@asfbot

asfbot commented Aug 27, 2015

kafka-trunk-git-pr #245 FAILURE
Looks like there's a problem with this pull request

@asfbot

asfbot commented Aug 28, 2015

kafka-trunk-git-pr #249 FAILURE
Looks like there's a problem with this pull request

@asfbot

asfbot commented Aug 28, 2015

kafka-trunk-git-pr #251 FAILURE
Looks like there's a problem with this pull request

@asfbot

asfbot commented Aug 28, 2015

kafka-trunk-git-pr #252 FAILURE
Looks like there's a problem with this pull request

…river-updated

Added a new ProcessorTopologyTestDriver class that makes it easier to unit test a ProcessorTopology
@asfbot

asfbot commented Sep 23, 2015

kafka-trunk-git-pr #508 FAILURE
Looks like there's a problem with this pull request

Fix delete in in-memory store, and let delete return old value
@asfbot

asfbot commented Sep 23, 2015

kafka-trunk-git-pr #509 FAILURE
Looks like there's a problem with this pull request

@guozhangwang
Contributor Author

@rhauch I agree about ser-des; maybe we can allow users to pass in their (de)serializers upon construction of the key-value store.

About InMemoryLRUCacheStore, feel free to go ahead with another PR.

@rhauch
Contributor

rhauch commented Sep 23, 2015

@guozhangwang, see my draft PR for the serdes changes; should be up-to-date with the streaming branch. Feedback appreciated.

@rhauch
Contributor

rhauch commented Sep 24, 2015

@guozhangwang, I created a PR for the InMemoryLRUCacheStore and a mini test framework for key-value stores, with new unit tests for all current KeyValueStore implementations. It does depend on my proposed serdes changes in key-value stores.

Yasuhiro Matsuda and others added 2 commits September 25, 2015 11:13
@asfbot

asfbot commented Sep 25, 2015

kafka-trunk-git-pr #555 FAILURE
Looks like there's a problem with this pull request

@asfbot

asfbot commented Sep 25, 2015

kafka-trunk-git-pr #562 SUCCESS
This pull request looks good

@guozhangwang
Contributor Author

Updating the PR body before merging; the original message is here:


Some open questions collected so far on the first patch. Thanks @gwenshap @jkreps @junrao .

  • Topology API: requiring users to instantiate their own Topology class with the overridden build() function is a little awkward. Instead it would be great to let users explicitly build the topology in Main and pass it in as a class:

        Topology myTopology = new TopologyBuilder(defaultDeser)
            .addProcessor("my-processor", MyProcessor.class, new Source("my-source"))
            .addProcessor("my-other-processor", MyOtherProcessor.class, "my-processor");
        KafkaStreaming streaming = new KafkaStreaming(config, myTopology);
        streaming.run();

So the implementation of KStream.filter would instead look like this:

    public KStream<K, V> filter(Predicate<K, V> predicate) {
        KStreamFilter<K, V> filter = new KStreamFilter<>();
        topology.addProcessor(KStreamFilter.class, new Configs("predicate", predicate));
        return this;
    }

The advantage is that the user code can now get rid of the whole Topology class in favor of the builder, though I think the order of execution for that API is quite unintuitive.

  • We can probably move the forward() function from Processor to ProcessorContext, and split ProcessorContext into two classes: one with the action calls such as commit / send / schedule / forward, and another with the metadata calls such as topic / partition / offset / timestamp.
  • Can we hide the Chooser interface from users? In other words, if users can specify the "time" on each fetched message from Kafka, would a hard-coded MinTimestampMessageChooser be sufficient so that we can move TimestampTracker / RecordQueue / Chooser / RecordCollector / etc. all to the internal folders?
  • Shall we split o.a.k.clients into two folders, with o.a.k.clients.processor in stream? Or should we just remove o.a.k.clients.processor and make everything under o.a.k.stream? In addition, there is currently a cyclic dependency between the two; it would be better to break it in the end state.
  • Consider moving external dependencies such as RocksDB into a separate jar. For example, we could include a kafka-stream-rocksdb.jar that contains only the RocksDBKeyValueStore, and later on, when we deprecate / remove such implementations, we can simply remove the jar itself.
  • The way the KeyValueStore is created seems a bit weird. Since this is part of the internal state managed by KafkaProcessorContext, it seems there should be an API to create the KeyValueStore from KafkaProcessorContext, instead of passing the context to the constructor of the KeyValueStore.
  • Merge ProcessorConfigs with ProcessorProperties.
  • We can potentially remove the processor argument in ProcessorContext.schedule().
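The ProcessorContext split mentioned in the list above might look roughly like this (interface names and signatures are hypothetical sketches, not part of the patch):

```java
// Hypothetical split: record metadata in one interface, actions in another.
interface RecordMetadata {
    String topic();
    int partition();
    long offset();
    long timestamp();
}

interface ProcessorActions {
    void commit();
    <K, V> void forward(K key, V value);
    void schedule(long interval);
}
```

Separating the two would let a Processor take only the capability it needs, and keeps the per-record metadata accessors away from the action calls that mutate state.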

@asfgit asfgit closed this in 263c10a Sep 26, 2015
hachikuji added a commit to hachikuji/kafka that referenced this pull request Feb 23, 2017
hachikuji added a commit to hachikuji/kafka that referenced this pull request Feb 23, 2017
hachikuji added a commit to hachikuji/kafka that referenced this pull request Mar 2, 2017
hachikuji added a commit to hachikuji/kafka that referenced this pull request Mar 3, 2017
hachikuji added a commit to hachikuji/kafka that referenced this pull request Mar 9, 2017
hachikuji added a commit to hachikuji/kafka that referenced this pull request Mar 10, 2017
apurvam pushed a commit to apurvam/kafka that referenced this pull request Mar 15, 2017
ijuma pushed a commit to ijuma/kafka that referenced this pull request Mar 22, 2017
hachikuji added a commit to hachikuji/kafka that referenced this pull request Mar 22, 2017
efeg added a commit to efeg/kafka that referenced this pull request Jan 29, 2020
davide-armand pushed a commit to aiven/kafka that referenced this pull request Dec 1, 2025
This doesn't matter in the present state, but there's an upcoming PR that makes Controller use the Control plane, and duplicating the merging of default and override topic configs there is cumbersome. With this change, the resolution happens earlier, in `AppendInterceptor`.
fvaleri added a commit to fvaleri/kafka that referenced this pull request Apr 14, 2026
Remove unclean leader election warning
10 participants