MINOR: Add system test for optimization upgrades #5912
Conversation
monitor.wait_for uses grep without the -E switch, so I need to escape the | in the pattern.
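For reference, a tiny sketch of the escaping this requires (the helper name is illustrative, not from the test): without -E, GNU grep treats the pattern as a basic regular expression (BRE), where alternation must be spelled \| rather than |.

```python
def ere_to_bre_alternation(pattern: str) -> str:
    # GNU grep without -E interprets patterns as basic regular expressions (BRE),
    # where alternation must be written as \| instead of the ERE-style |.
    return pattern.replace("|", "\\|")
```

So a pattern like `AGGREGATED|REDUCED|JOINED` becomes `AGGREGATED\|REDUCED\|JOINED` before being handed to monitor.wait_for.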
With 5 sub-topologies (1 source input, 4 repartition topic inputs) and 6 partitions, we end up with 30 tasks. So it's very probable that each instance only has 2 of the 3 tested aggregations, i.e. (AGGREGATED, REDUCED), (AGGREGATED, JOINED), or (REDUCED, JOINED), so I just verify that the STDOUT log has something matching the pattern. The False parameter signals verification with the operation_pattern as a whole.
Can we use named parameters (especially for the boolean param) for clarity?
Now there are 2 sub-topologies (1 source input, 1 repartition topic input) and 6 partitions, so we end up with 12 tasks. Each streams instance should then have at least one task for AGGREGATED, REDUCED, and JOINED, so we use the True parameter to indicate testing for the existence of each operation in the STDOUT of each streams instance.
cool. (again, do you mind using named params?)
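A hypothetical sketch of what such a verification helper could look like with the boolean passed as a named parameter, as requested above (the helper and parameter names are illustrative, not the actual test code):

```python
import re

def verify_from_stdout(stdout, operation_pattern, extract_each=True):
    """Check processing evidence in a node's STDOUT log.

    extract_each=True  -> every |-separated operation must appear on its own.
    extract_each=False -> a single match of the whole pattern suffices.
    """
    if extract_each:
        return all(re.search(op, stdout) for op in operation_pattern.split("|"))
    return re.search(operation_pattern, stdout) is not None
```

Called with a named parameter, the intent is clear at the call site, e.g. `verify_from_stdout(stdout, "AGGREGATED|REDUCED|JOINED", extract_each=False)`.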
Testing for the existence of each term by itself in the STDOUT file
Testing for the entire pattern
In a tiny percentage of test runs, one streams instance ends up with all input source tasks, i.e. (0_0, 0_1, 0_2, 0_3), so none of the expected operations are processed on that node. So we check the task assignment, and if it's all input source tasks, we skip checking this node.
I added this check after noticing some test flakiness.
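The skip condition could be sketched as follows, assuming task ids of the form `<sub-topology>_<partition>` (the helper name is illustrative):

```python
def all_source_tasks(assigned_tasks, source_subtopology=0):
    # Task ids look like "<sub-topology>_<partition>", e.g. "0_3".
    # Returns True when every assigned task belongs to the input-source
    # sub-topology, i.e. the node processes none of the tested operations.
    return all(t.split("_")[0] == str(source_subtopology) for t in assigned_tasks)
```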
Hmm... I'm wondering why this is still possible, as we should have achieved balance across sub-topologies, right? Or are there any potential edge cases you are aware of from working on that PR @bbejeck ?
I think as long as we favor stickiness over load balancing, this is always a possibility. One thing to note: I only observed one instance getting all tasks from one sub-topology after the first phase of the test, meaning stickiness is a factor, and it seemed to be a tiny percentage. I put the check in to eliminate test flakiness.
I have some additional thoughts on putting back the check we had to make sure that adding a task from the same sub-topology only happens when all clients are over capacity. But I'd like to do that in a separate PR and write an independent system test for it.
WDYT?
Interesting! I didn't realize we attempt to balance sub-topologies over the instances... Is this important for some reason?
Checking the task assignment for this processor node.
ping @guozhangwang, @mjsax, and @vvcephei for reviews
mjsax
left a comment
Not sure if I fully understand the test setup, i.e., what the Python part does. Will revisit tomorrow.
nit: StreamsUpgradeTest -> StreamsOptimizedTest
nit: simplify to final String propFileName = args[0];
Why this?
Also, there is a peek() in the other PR that cannot be switched on/off. I like the idea; I just want clarification so we apply a uniform strategy for system tests.
I needed this Sysout for debugging when writing the test, but I don't need to verify the output, hence the guard. Since it's no longer needed, I'll remove it.
It's not guarded as the output is needed for verification in the test. The one above was strictly for debugging purposes, but I've taken it out.
Why do we need this? This info is already logged.
nit: why start the name with _?
I was trying to mark it as a private method, but thinking about it more, that doesn't make much sense. I'll remove the _.
Is this a naming convention? Or a Python feature (i.e., does a leading _ make a method private in Python)?
Why is this called index? It seems like a retry to me. -- Also, why do we need a retry? Can you use a monitor instead of a direct ssh_capture?
Updated the variable name.
I'll try using monitor, but I went with ssh_capture as it allows me to set my own regex; monitor.wait_for uses grep without the -E flag, so I wasn't sure I could be as specific as I needed.
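A minimal sketch of the retry-around-log-reads idea described above. The `read_log` callable stands in for something like ducktape's `node.account.ssh_capture`; all names here are illustrative, not the actual test code:

```python
import re
import time

def wait_for_pattern(read_log, pattern, retries=10, backoff=1.0):
    # Poll the log output until a line matches the caller-supplied regex,
    # retrying with a fixed backoff; full Python regex syntax is available,
    # unlike grep without -E.
    regex = re.compile(pattern)
    for _ in range(retries):
        if any(regex.search(line) for line in read_log()):
            return True
        time.sleep(backoff)
    return False
```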
guozhangwang
left a comment
Thanks @bbejeck. I just have some minor comments.
One meta question: why can we still observe imbalance where tasks of the same sub-topology are not distributed evenly? Do you have any ideas?
Since we already set the default serdes to String, String, do we still need the Produced / Consumed when constructing the topology?
It's a habit for me to always put those in. If you want me to remove them I will.
No need to remove; just curious whether there are any issues I don't know about that force you to add them :)
nit: I'd suggest printing the list to sysout as well for debugging, since the number of matched ones may not be sufficient.
@bbejeck as a side note, we believe that when doing this upgrade there may be a minor amount of data loss, as some repartition topics that get merged may have data not yet processed. Could you add some logs to illustrate how much data (as a percentage) may be lost? (We do not make it a verification phase, but just print it out, like "produced XXX, consumed YYY, lost XXX - YYY".)
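The suggested accounting could be as simple as the following (an illustrative helper, not project code):

```python
def loss_report(produced, consumed):
    # Print-only accounting of records potentially lost during the upgrade;
    # this is informational, not a verification step.
    lost = produced - consumed
    pct = (lost / produced * 100) if produced else 0.0
    return "produced %d, consumed %d, lost %d (%.2f%%)" % (produced, consumed, lost, pct)
```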
@guozhangwang @mjsax updated per comments
This could be a little tricky as I'm using the
Changed this, as current active tasks always shows up in the log files, whereas Committed active tasks may or may not be in the log file.
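A sketch of pulling task ids out of such a "current active tasks" log line; the exact log format used here is an assumption for illustration:

```python
import re

def parse_active_tasks(line):
    # Extract task ids from a line like "... current active tasks: [0_0, 1_2]".
    # The bracketed list format is an assumed example, not a guaranteed format.
    m = re.search(r"current active tasks:\s*\[([^\]]*)\]", line)
    if not m:
        return []
    return [t.strip() for t in m.group(1).split(",") if t.strip()]
```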
Sounds good.
LGTM. @mjsax feel free to merge after you made another pass.
@bbejeck any ideas?
Here's my original response: #5912 (comment). Thinking about this some more, I also think setting
Thanks for the explanation; it makes sense to me that if the imbalance only happens in the second phase, then it is indeed possible, because maybe not all instances participated in the first rebalance. I agree that
retest this please
retest this please
Hey @bbejeck, sorry my review is late, but since the tests are still failing, maybe I can sneak in two comments (above, about named parameters in Python)? Regardless, it LGTM. Thanks!
Force-pushed from e372f97 to 7e36e54
@vvcephei updated per comments and kicked off a new system test: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2104/ EDIT: rebased from trunk as well
System test succeeded, merging to trunk now.
This is a new system test for optimizing an existing topology. The test takes the following steps:
1. Start a Kafka Streams application that uses a selectKey, then performs 3 groupByKey() operations and 1 join, creating four repartition topics
2. Verify all instances start and process data
3. Stop all instances and verify they have stopped
4. For each stopped instance, update the config for TOPOLOGY_OPTIMIZATION to all, then restart the instance and verify it has started successfully, also verifying Kafka Streams reduced the number of repartition topics from 4 to 1
5. Verify that each instance is processing data from the aggregation, reduce, and join operations
6. Stop all instances and verify the shutdown is complete

For testing, I ran two passes of the system test with 25 repeats each, for a total of 50 test runs. All test runs passed.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>