KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics #9648
Since the root cause of this bug was basically just confusion over what exactly this set contains, a renaming feels in order
In addition to removing the faulty check, I slightly refactored this loop so that we only iterate over the source nodes in this particular subtopology. Previously we would have added entries for all source nodes across the entire topology to our sourceNodesByTopic map.
We didn't catch the bug in this test for two reasons: it has only one subtopology, and it didn't wait for Streams to get to RUNNING before it created the new topic. So we weren't even covering the "update source topics" code path since all topics existed by the first assignment
Technically it's sufficient to just add the second KStream above for a multi-subtopology application, but I felt the test coverage could only stand to benefit with (slightly) more complicated examples
mjsax
left a comment
Thanks for the swift fix @ableegoldman
cadonna
left a comment
@ableegoldman Thank you for fixing this so quickly!
I just have some comments about testing.
    .selectKey((k, v) -> k)
    .groupByKey()
    .aggregate(() -> "", (k, v, a) -> v)
    .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
I would even add otherStream like:
    - .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
    + .toStream()
    + .merge(otherStream)
    + .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
But if we merge then won't that merge the subtopologies as well? (We would still have two subtopologies due to the upstream key-changing operation/repartition, but I wanted the test to cover different "kinds" of subtopologies like this)
You would have a source node in one sub-topology and a source node in the other sub-topology. I thought that was the pattern in the bug report, but I now realize that the bug report uses a pattern similar to the one you specified. Wouldn't it make sense to test both?
There are two ways to have more than one source node: either reading from a different input topic/pattern, or via a repartition. I agree that we should test both of these cases, but I'd prefer to do so in a single integration test rather than in two separate integration tests, to avoid making the test suite even longer
The Java 15 tests passed; the Java 8 build failed with a flaky test. Will merge & cherrypick to older branches to unblock the 2.6.1 and 2.7.0 releases.
…n updating regex source topics (#9648) We should ignore any source nodes that aren't part of the ProcessorTopology's subtopology when updating its source topics after a change in the topic metadata. Reviewers: Bruno Cadonna <cadonna@confluent.io>, Matthias J. Sax <mjsax@confluent.io>
Cherrypicked to 2.7 cc @bbejeck
Cherrypicked to 2.6 cc @mimaison
The problem is that we compare two incompatible sets in ProcessorTopology#updateSourceTopics: the local sourceNodesByName map only contains nodes that correspond to a particular subtopology, whereas the passed-in nodeToSourceTopics ultimately comes from the InternalTopologyBuilder's map, which contains nodes for the entire topology. So we would end up hitting the IllegalStateException thrown in #updateSourceTopics any time we tried to update an application with more than one subtopology.

The fix is simple: we just need to ignore any source nodes that aren't part of the ProcessorTopology's subtopology.
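
To make the mismatch concrete, here is a minimal sketch of the idea using plain Java collections. It is not the actual ProcessorTopology code: the class SubtopologySketch, its fields, and the choice to leave a local node unchanged when the global map has no entry for it are all assumptions made for illustration. The point it shows is that the update walks only the subtopology's own source nodes, so entries for nodes in other subtopologies are simply never looked at.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for one subtopology (NOT Kafka's ProcessorTopology).
class SubtopologySketch {
    // Topics currently assigned to each source node owned by this subtopology.
    private final Map<String, List<String>> topicsByLocalSourceNode = new HashMap<>();
    // Reverse index: topic name -> name of the local source node that reads it.
    private final Map<String, String> sourceNodeByTopic = new TreeMap<>();

    SubtopologySketch(final List<String> localSourceNodeNames) {
        for (final String name : localSourceNodeNames) {
            topicsByLocalSourceNode.put(name, new ArrayList<>());
        }
    }

    // The argument covers the ENTIRE topology, so it may contain node names
    // that this subtopology does not own.
    void updateSourceTopics(final Map<String, List<String>> allNodeToSourceTopics) {
        // Iterate over local nodes only; foreign nodes are never visited.
        for (final Map.Entry<String, List<String>> local : topicsByLocalSourceNode.entrySet()) {
            final List<String> updated = allNodeToSourceTopics.get(local.getKey());
            if (updated != null) { // assumption: no entry means "keep the current topics"
                local.setValue(new ArrayList<>(updated));
            }
        }
        // Rebuild the reverse index from local nodes only.
        sourceNodeByTopic.clear();
        for (final Map.Entry<String, List<String>> local : topicsByLocalSourceNode.entrySet()) {
            for (final String topic : local.getValue()) {
                sourceNodeByTopic.put(topic, local.getKey());
            }
        }
    }

    Map<String, String> sourceNodeByTopic() {
        return Collections.unmodifiableMap(sourceNodeByTopic);
    }
}
```

With a subtopology that owns only "source-A", passing a map that also contains "source-B" updates the topics for "source-A" and leaves no trace of "source-B" in the reverse index, rather than failing on the unknown node.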