KAFKA-6455: Extend CacheFlushListener to forward timestamp #6147
mjsax merged 3 commits into apache:trunk
Call for review @guozhangwang @bbejeck @vvcephei
mjsax
left a comment
Marked the relevant changes.
All others are to keep the code working :)
| * @param timestamp timestamp of new value | ||
| */ | ||
| void apply(final K key, final V newValue, final V oldValue); | ||
| void apply(final K key, final V newValue, final V oldValue, final long timestamp); |
This is the actual change.
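As a rough illustration of the signature change, here is a minimal sketch of a listener that receives the timestamp as a fourth argument. The interface below is a simplified stand-in, not the Kafka Streams class itself, and `FlushListenerSketch` and `collectFlushes` are hypothetical names used only for this example.

```java
import java.util.ArrayList;
import java.util.List;

class FlushListenerSketch {

    // Simplified stand-in for the listener discussed above (assumption:
    // same parameter order as in the diff, without Kafka dependencies).
    interface CacheFlushListener<K, V> {
        void apply(K key, V newValue, V oldValue, long timestamp);
    }

    // Hypothetical helper: records every flushed change as a readable string.
    static List<String> collectFlushes() {
        final List<String> seen = new ArrayList<>();
        final CacheFlushListener<String, Integer> listener =
            (key, newValue, oldValue, timestamp) ->
                seen.add(key + ": " + oldValue + " -> " + newValue + " @ " + timestamp);

        // The caller now hands over the timestamp of the new value as well.
        listener.apply("a", 2, 1, 1000L);
        listener.apply("b", 5, null, 1005L);
        return seen;
    }

    public static void main(final String[] args) {
        collectFlushes().forEach(System.out::println);
    }
}
```

Every implementer of the interface has to accept the extra parameter, which is presumably why most of the other changes in the PR exist only to keep the code compiling.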
| package org.apache.kafka.streams.kstream.internals; | ||
|
|
||
| import org.apache.kafka.streams.state.internals.ThreadCache; | ||
| package org.apache.kafka.streams.state.internals; |
Minor fix: The interface should be in the state package, not kstream.
| serdes.keyFrom(entry.key().get()), | ||
| serdes.valueFrom(entry.newValue()), | ||
| oldValue, | ||
| entry.entry().context().timestamp()); |
part of the fix: we now also forward the timestamp on eviction
| key, | ||
| newValue, | ||
| oldValue, | ||
| entry.entry().context().timestamp()); |
part of the fix: we now also forward the timestamp on eviction
| windowedKey, | ||
| serdes.valueFrom(entry.newValue()), | ||
| oldValue, | ||
| entry.entry().context().timestamp()); |
part of the fix: we now also forward the timestamp on eviction
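The three eviction call sites above follow the same pattern: the timestamp stored with the cached entry is handed to the listener together with the key and values. A minimal sketch of that idea, assuming a toy insertion-ordered cache (`TinyCache`, `Entry`, and `FlushListener` are hypothetical names, and the old value is simply passed as `null` here instead of being looked up in an underlying store):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class EvictionTimestampSketch {

    interface FlushListener<K, V> {
        void apply(K key, V newValue, V oldValue, long timestamp);
    }

    // Hypothetical cache entry: the value plus the timestamp recorded when it was written.
    static final class Entry<V> {
        final V value;
        final long timestamp;

        Entry(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    static final class TinyCache<K, V> {
        private final int maxEntries;
        private final FlushListener<K, V> listener;
        private final LinkedHashMap<K, Entry<V>> entries = new LinkedHashMap<>();

        TinyCache(final int maxEntries, final FlushListener<K, V> listener) {
            this.maxEntries = maxEntries;
            this.listener = listener;
        }

        void put(final K key, final V value, final long timestamp) {
            entries.remove(key); // re-insert so the key moves to the end of the order
            entries.put(key, new Entry<>(value, timestamp));
            if (entries.size() > maxEntries) {
                evictEldest();
            }
        }

        private void evictEldest() {
            final Iterator<Map.Entry<K, Entry<V>>> it = entries.entrySet().iterator();
            final Map.Entry<K, Entry<V>> eldest = it.next();
            final K key = eldest.getKey();
            final Entry<V> entry = eldest.getValue();
            it.remove();
            // Forward the timestamp stored with the evicted entry.
            listener.apply(key, entry.value, null, entry.timestamp);
        }
    }

    static List<String> demo() {
        final List<String> forwarded = new ArrayList<>();
        final TinyCache<String, String> cache = new TinyCache<>(
            2, (key, newValue, oldValue, ts) -> forwarded.add(key + "=" + newValue + "@" + ts));
        cache.put("a", "1", 10L);
        cache.put("b", "2", 20L);
        cache.put("c", "3", 30L); // exceeds capacity, evicts "a" with its own timestamp
        return forwarded;
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```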
Retest this please.
| } | ||
| forward(child, key, value); | ||
| } else { | ||
| if (children.size() == 1) { |
I think this if/else block here can be removed and just go with
for (final ProcessorNode child : children) {
    forward(child, key, value);
}
There was a problem hiding this comment.
I copied this from ProcessorContextImpl#forward() -- we added it there as an "optimization" to avoid creating the iterator -- might be premature optimization, to be fair.
I don't feel strongly about it; however, I would prefer to keep both classes "in sync". Thus, if we remove it here, I think we should also remove it from there?
Thoughts?
There was a problem hiding this comment.
Yeah if it exists elsewhere let's just leave it as is for now.
There was a problem hiding this comment.
Let's remove it on both sides: I think in J8 it is not really a big difference.
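To make the point of this thread concrete, here is a small sketch comparing the two shapes under discussion: a special case for exactly one child versus a plain loop. Children are modelled as plain strings and the "forward" is just a recorded call, so this only illustrates that both variants produce the same sequence of forwards; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class ForwardLoopSketch {

    // Variant with the single-child fast path (avoids creating an iterator).
    static List<String> forwardWithFastPath(final List<String> children, final String key, final String value) {
        final List<String> calls = new ArrayList<>();
        if (children.size() == 1) {
            calls.add(children.get(0) + " <- " + key + "=" + value);
        } else {
            for (final String child : children) {
                calls.add(child + " <- " + key + "=" + value);
            }
        }
        return calls;
    }

    // Simplified variant: the loop already handles zero, one, or many children.
    static List<String> forwardWithLoop(final List<String> children, final String key, final String value) {
        final List<String> calls = new ArrayList<>();
        for (final String child : children) {
            calls.add(child + " <- " + key + "=" + value);
        }
        return calls;
    }

    public static void main(final String[] args) {
        final List<String> one = new ArrayList<>();
        one.add("sink");
        System.out.println(forwardWithFastPath(one, "k", "v").equals(forwardWithLoop(one, "k", "v")));
    }
}
```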
| for (final ProcessorNode child : (List<ProcessorNode<K, V>>) currentNode().children()) { | ||
| setCurrentNode(child); | ||
| child.process(key, value); | ||
| final List<ProcessorNode<K, V>> children = (List<ProcessorNode<K, V>>) currentNode().children(); |
We should add a test or tests for this section
There was a problem hiding this comment.
Thinking about this once more, can a "global processor" have multiple children to begin with? Maybe we can simplify this method significantly? Thoughts?
Will add a test for GlobalProcessorContextImpl though -- there are none at all atm.
There was a problem hiding this comment.
Thinking about this once more, can a "global processor" have multiple children to begin with?
Yeah, you could be correct about that; I don't think it can.
There was a problem hiding this comment.
+1 on assuming a single child, check-and-throw-otherwise
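A minimal sketch of the check-and-throw idea, assuming a node is allowed zero or one child and anything more is treated as a bug. `Node` is a hypothetical stand-in for a processor node, and the exception type is an assumption for illustration, not necessarily what the PR uses.

```java
import java.util.ArrayList;
import java.util.List;

class SingleChildForwardSketch {

    // Hypothetical stand-in for a processor node; not the Kafka Streams class.
    static final class Node {
        final String name;
        final List<Node> children = new ArrayList<>();
        final List<String> received = new ArrayList<>();

        Node(final String name) {
            this.name = name;
        }

        void process(final String key, final String value) {
            received.add(key + "=" + value);
        }
    }

    // Forward to the (at most one) child; fail loudly if the assumption is violated.
    static void forward(final Node current, final String key, final String value) {
        if (current.children.size() > 1) {
            throw new IllegalStateException(
                "Node " + current.name + " has " + current.children.size() + " children, expected at most one");
        }
        for (final Node child : current.children) {
            child.process(key, value);
        }
    }

    static List<String> demo() {
        final Node source = new Node("source");
        final Node maintainer = new Node("state-maintainer");
        source.children.add(maintainer);
        forward(source, "k", "v");     // reaches the single child
        forward(maintainer, "k", "v"); // no children: nothing is forwarded
        return maintainer.received;
    }

    static boolean throwsOnTwoChildren() {
        final Node node = new Node("unexpected");
        node.children.add(new Node("c1"));
        node.children.add(new Node("c2"));
        try {
            forward(node, "k", "v");
            return false;
        } catch (final IllegalStateException expected) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println(demo());
        System.out.println(throwsOnTwoChildren());
    }
}
```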
Updated this.
Retest this please.
guozhangwang
left a comment
Please feel free to merge after the comments are addressed.
| package org.apache.kafka.streams.kstream.internals; | ||
|
|
||
| import org.apache.kafka.streams.state.internals.ThreadCache; | ||
| package org.apache.kafka.streams.state.internals; |
| import org.junit.Before; | ||
| import org.junit.Test; | ||
|
|
||
| import java.util.ArrayList; |
Shouldn't we add a new test case, similar to the ones in GlobalProcessorContextImplTest.java, to this class as well?
There was a problem hiding this comment.
Why? ProcessorContextImpl was not changed.
There was a problem hiding this comment.
@bbejeck @guozhangwang I was thinking about this once more, and for global tasks, we know that there are exactly two processors: the source processor and the "state maintainer" processor. The first one has exactly one child and it calls forward() from above. The second processor only calls this forward overload on flush and should have zero children.
Does this make sense?
There was a problem hiding this comment.
There is a related JIRA about that, but whether we'd keep it as is remains an open question. I think we can still make this assumption atm; just bringing it up FYI.
There was a problem hiding this comment.
The changes in this class are just a code simplification as discussed with @bbejeck and @guozhangwang -- no change in behavior.
Updated this.
8e74612 to 30b8447
Retest this please. cc for review @ableegoldman
Test failure in core. Retest this please.
Retest this please.
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
* ak/trunk:
MINOR: fix race condition in KafkaStreamsTest (apache#6185)
KAFKA-4850: Enable bloomfilters (apache#6012)
MINOR: ducker-ak: add down -f, avoid using a terminal in ducker test
KAFKA-5117: Stop resolving externalized configs in Connect REST API
MINOR: Cleanup handling of mixed transactional/idempotent records (apache#6172)
KAFKA-7844: Use regular subproject for generator to fix *All targets (apache#6182)
Fix Documentation for cleanup.policy is out of date (apache#6181)
MINOR: increase timeouts for KafkaStreamsTest (apache#6178)
MINOR: Rejoin split ssl principal mapping rules (apache#6099)
MINOR: Handle case where connector status endpoints returns 404 (apache#6176)
MINOR: Remove unused imports, exceptions, and values (apache#6117)
KAFKA-3522: Add internal RecordConverter interface (apache#6150)
Fix Javadoc of KafkaConsumer (apache#6155)
KAFKA-6455: Extend CacheFlushListener to forward timestamp (apache#6147)
MINOR: Log partition info when creating new request batch in controller (apache#6145)
KAFKA-7652: Part I; Fix SessionStore's findSession(single-key) (apache#6134)
MINOR: Remove the InvalidTopicException handling in InternalTopicManager (apache#6167)
[KAFKA-7024] Rocksdb state directory should be created before opening the DB (apache#6138)
MINOR:: Fix typos (apache#6079)
This PR is part of the overall KIP-258 story. It does not resolve KAFKA-6455, but prepares for it. It contains only internal changes, so we can merge right away.