KAFKA-9739: Fixes null key changing child node #8400
…the parent of at least one repartition topics to be optimized.
…ork was done on the 2.4 branch and the naming conventions for repartition topics have changed.
…he merge node to update the optimization map with the merge node vs. the key-changing node.
An excellent example of this in action is https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java. Here's the un-optimized topology (image not preserved), and the optimized one (image not preserved).

ping @guozhangwang, @mjsax, and @vvcephei

Thanks @bbejeck! Also cc @ableegoldman @cadonna to take a look as well.
mergeNodesToKeyChangers.get(mergeNode).add(key);
final Set<Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>>> entrySet = keyChangingOperationsToOptimizableRepartitionNodes.entrySet();
for (final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>> entry : entrySet) {
    if (mergeNodeHasRepartitionChildren(mergeNode, entry.getValue())) {
Java 11 and Java 8 both failed (failing test names not preserved). Retest this please.

Java 8 failed (failing test name not preserved); Java 11 passed. Retest this please.
vvcephei left a comment:
Thanks for the lucid PR in response to a truly mind-bending bug. The explanation sounds right to me, and the code looks right. The test looks good, too.
Thanks!
Merged #8400 into trunk.

Thanks, @bbejeck!
2.4 port of #8400, since cherry-picking was not possible. Reviewers: John Roesler <john@confluent.io>
A port of #8400 for 2.3. The process of sorting source and sink nodes changed in 2.4, so we can't cherry-pick the PR directly as we need to update the expected topology to what it would be in the 2.3 version. Reviewers: John Roesler <john@confluent.io>, Andrew Choi <a24choi@edu.uwaterloo.ca>
For some context: when building a Streams application, the optimizer keeps track of the key-changing operations and any repartition nodes that are descendants of each key-changer. During the optimization phase (if enabled), the repartition nodes tracked for a key-changer are logically collapsed into one. The optimizer updates the graph by inserting the single repartition node between the key-changing node and its first child node. To find that child, it searches for a node that has the key-changing node as one of its direct parents, starting from the repartition node and walking up the parent hierarchy.

The one exception to this rule is when a merge node is a descendant of the key-changing node. In that case, during the optimization phase, the map tracking key-changers to repartition nodes is updated to use the merge node as the key, and the optimization process then places the single repartition node between the merge node and its first child node.

The error in KAFKA-9739 occurred because the code assumed that the repartition nodes are children of the merge node. But in the topology from KAFKA-9739, the repartition node was a parent of the merge node. So when attempting to find the first child of the merge node, nothing was found, resulting in StreamException(Found a null keyChangingChild node for..)

This PR fixes this bug by first checking that all repartition nodes for optimization are children of the merge node. This PR includes a test with the topology from KAFKA-9739.

Reviewers: John Roesler <john@confluent.io>
This is a port of #8400 for the 2.5 branch. Reviewers: John Roesler <john@confluent.io>


For some context: when building a Streams application, the optimizer keeps track of the key-changing operations and any repartition nodes that are descendants of each key-changer. During the optimization phase (if enabled), the repartition nodes tracked for a key-changer are logically collapsed into one. The optimizer updates the graph by inserting the single repartition node between the key-changing node and its first child node. To find that child, it searches for a node that has the key-changing node as one of its direct parents, starting from the repartition node and walking up the parent hierarchy.
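As a rough illustration of the upward search described above, here is a minimal sketch. It does not use the real Kafka Streams graph classes: `ParentSearchSketch`, `Node`, `addChild`, and `findChildOf` are hypothetical names invented for this example, and the actual optimizer code may differ in its details.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Streams graph; not the Kafka implementation.
final class ParentSearchSketch {

    static final class Node {
        final String name;
        final List<Node> parents = new ArrayList<>();
        final List<Node> children = new ArrayList<>();

        Node(final String name) {
            this.name = name;
        }

        // Links this node to a child in both directions.
        void addChild(final Node child) {
            children.add(child);
            child.parents.add(this);
        }
    }

    // Starting at `start`, walks up the parent hierarchy and returns the first
    // node that has `keyChanger` as a direct parent, or null if none exists.
    static Node findChildOf(final Node keyChanger, final Node start) {
        if (start.parents.contains(keyChanger)) {
            return start;
        }
        for (final Node parent : start.parents) {
            final Node found = findChildOf(keyChanger, parent);
            if (found != null) {
                return found;
            }
        }
        return null;
    }

    public static void main(final String[] args) {
        final Node source = new Node("source");
        final Node selectKey = new Node("selectKey");   // the key-changing node
        final Node mapValues = new Node("mapValues");
        final Node repartition = new Node("repartition");
        source.addChild(selectKey);
        selectKey.addChild(mapValues);
        mapValues.addChild(repartition);

        // The single repartition node would be inserted between selectKey and this node.
        System.out.println(findChildOf(selectKey, repartition).name); // prints "mapValues"
    }
}
```

A `null` result from a search like this corresponds to the "null keyChangingChild" situation that the rest of this description discusses.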
The one exception to this rule is when a merge node is a descendant of the key-changing node. In that case, during the optimization phase, the map tracking key-changers to repartition nodes is updated to use the merge node as the key, and the optimization process then places the single repartition node between the merge node and its first child node.
The error in KAFKA-9739 occurred because the code assumed that the repartition nodes are children of the merge node. But in the topology from KAFKA-9739, the repartition node was a parent of the merge node. So when attempting to find the first child of the merge node, nothing was found, resulting in
StreamException(Found a null keyChangingChild node for..)
This PR fixes this bug by first checking that all repartition nodes for optimization are children of the merge node.
This PR includes a test with the topology from KAFKA-9739.
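The check described above can be sketched as follows. This is an illustration only, not the code from this PR: `MergeCheckSketch`, `Node`, `isDescendant`, and `allBelowMerge` are hypothetical names, and treating "children" as descendants at any depth is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for the Streams graph; not the Kafka implementation.
final class MergeCheckSketch {

    static final class Node {
        final String name;
        final List<Node> children = new ArrayList<>();

        Node(final String name) {
            this.name = name;
        }

        void addChild(final Node child) {
            children.add(child);
        }
    }

    // True if `candidate` is reachable from `ancestor` by following child links.
    static boolean isDescendant(final Node ancestor, final Node candidate) {
        for (final Node child : ancestor.children) {
            if (child == candidate || isDescendant(child, candidate)) {
                return true;
            }
        }
        return false;
    }

    // Only when every repartition node sits below the merge node is it safe
    // (in this sketch) to use the merge node as the key in the tracking map.
    static boolean allBelowMerge(final Node mergeNode, final Collection<Node> repartitionNodes) {
        for (final Node repartitionNode : repartitionNodes) {
            if (!isDescendant(mergeNode, repartitionNode)) {
                return false;
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        // Shape of the failing case: the repartition node is a parent of the merge node.
        final Node keyChanger = new Node("selectKey");
        final Node repartitionAbove = new Node("repartition-above");
        final Node merge = new Node("merge");
        keyChanger.addChild(repartitionAbove);
        repartitionAbove.addChild(merge);
        System.out.println(allBelowMerge(merge, Arrays.asList(repartitionAbove))); // prints "false"

        // Shape the original code assumed: the repartition node is below the merge node.
        final Node repartitionBelow = new Node("repartition-below");
        merge.addChild(repartitionBelow);
        System.out.println(allBelowMerge(merge, Arrays.asList(repartitionBelow))); // prints "true"
    }
}
```

In the first case the sketch would leave the key-changing node as the map key, so the later search for a first child never starts from a merge node that has no repartition nodes beneath it.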
Committer Checklist (excluded from commit message)