KAFKA-18513: Validate share state topic records produced in tests.#18521
KAFKA-18513: Validate share state topic records produced in tests.#18521AndrewJSchofield merged 10 commits intoapache:trunkfrom
Conversation
AndrewJSchofield
left a comment
There was a problem hiding this comment.
Thanks for the PR. Just a few minor comments.
| DEFAULT_MAX_WAIT_MS, 100L, () -> "cache not up yet"); | ||
| Map<String, Object> consumerConfigs = new HashMap<>(); | ||
| consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); | ||
| consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); |
There was a problem hiding this comment.
I think you could use KafkaConsumer.assign() instead so that the test doesn't have to wait for the consumer to join the consumer group.
There was a problem hiding this comment.
This too is causing flake
There was a problem hiding this comment.
I think that implies that the test would also be flaky with subscribe instead of assign, just that it runs slightly more slowly so the risk of flakiness is lower.
There was a problem hiding this comment.
Perhaps but since the pattern is same as existing tests. This addition should not decrease reliability.
There was a problem hiding this comment.
@AndrewJSchofield It was a blunder on my part. The SGS topic is set to have 3 partitions in the cluster config and I was only assigning to 1 partition. Will rectify.
| if (msgs.count() > 0) { | ||
| msgs.records(Topic.SHARE_GROUP_STATE_TOPIC_NAME).forEach(records::add); | ||
| } | ||
| return records.size() == messageCount + 2; // +2 because of extra warmup records |
There was a problem hiding this comment.
nit: extra space before ==
| consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); | ||
| consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); | ||
| consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); | ||
| consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); |
There was a problem hiding this comment.
Then you could also use seekToBeginning()
There was a problem hiding this comment.
this is causing flake - I will keep the config
AndrewJSchofield
left a comment
There was a problem hiding this comment.
It seems to me that testMultipleConsumersInGroupSequentialConsumption is flaky with this PR.
It appears that occasionally this error is happening It would seem this is resulting in off by one errors - sometimes RPCs are 4 and sometimes 5. I will remove this check from this test for now |
|
I also had a failure locally in |
Then maybe exact record count match is too strict. Perhaps checking for record being produced should be the way to go. Another reason for this is the RPC coalescing and batching which might make the number of records actually written different each time and as a result make the tests brittle. |
AndrewJSchofield
left a comment
There was a problem hiding this comment.
Thanks for the changes. I ran this test repeatedly yesterday and I think it's time to remove the no-op persister from this test now. The new changes add about 6 minutes to the execution time. The no-op persister is not adding any value any longer and it takes about 6 minutes too. Please remove the parameterisation of the persister and just use the default persister in the tests.
| } | ||
|
|
||
| private void maybeVerifyShareGroupStateTopicRecordCount(String persister, int messageCount) { | ||
| private void maybeVerifyShareGroupStateTopicRecordCount(String persister) { |
There was a problem hiding this comment.
The method name now seems redundant. I suggest verifyShareGroupStateTopicRecordsProduced.
AndrewJSchofield
left a comment
There was a problem hiding this comment.
Just a couple of additional comments. Thanks for the update.
| .setConfigProp("group.share.enable", "true") | ||
| .setConfigProp("group.share.partition.max.record.locks", "10000") | ||
| .setConfigProp("group.share.persister.class.name", persisterClassName) | ||
| .setConfigProp("group.share.persister.class.name", DEFAULT_STATE_PERSISTER) |
There was a problem hiding this comment.
This is the default. Not required.
| DEFAULT_MAX_WAIT_MS, 100L, () -> "cache not up yet"); | ||
| Map<String, Object> consumerConfigs = new HashMap<>(); | ||
| consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); | ||
| consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); |
There was a problem hiding this comment.
I think you can remove the group.id config too because of the use of KafkaConsumer.assign.
AndrewJSchofield
left a comment
There was a problem hiding this comment.
One final comment. When the build next finishes, please can you triage the failed tests and make sure there are open issues for failed tests. ClientUtilsTest#testParseAndValidateAddressesWithReverseLookup is expected to fail until #18549 is merged. Thanks.
|
|
||
| private void verifyShareGroupStateTopicRecordsProduced() { | ||
| try { | ||
| TestUtils.waitForCondition(() -> |
There was a problem hiding this comment.
This condition will already be satisfied because it mirrors the warmup() method which runs before each test. I think you can remove this as redundant code.
They seem to be completely unrelated to our work. |
True, but if some of the failed tests are flaky but not marked as flaky, then we ought to do something about it. |
Did multiple runs for each of the tests: NO ERRORSFLAKE FOUNDhttps://issues.apache.org/jira/browse/KAFKA-18550 https://issues.apache.org/jira/browse/KAFKA-18551 |
AndrewJSchofield
left a comment
There was a problem hiding this comment.
lgtm. Will merge once the build is complete.
…pache#18521) Reviewers: Andrew Schofield <aschofield@confluent.io>
…pache#18521) Reviewers: Andrew Schofield <aschofield@confluent.io>
…pache#18521) Reviewers: Andrew Schofield <aschofield@confluent.io>
the records.