KAFKA-10158: Fix flaky testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress (#9022)
ijuma merged 6 commits into apache:trunk from
Conversation
TopicCommandWithAdminClientTest#testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress
  Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(configMap))).all().get()
  waitForTopicCreated(testTopicName)
- TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1)
+ TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 1000, acks = -1)
Note that although we're bumping this to 1000, it should be a single produce batch with really small data, so on the order of a couple KBs.
With the throttle approach we don't want it to be produced in a single batch. If all messages are produced in a single batch then you will still fetch the entire batch in a single request even if replica.fetch.max.bytes is 1, as we allow this limit to be broken to fetch at least one message.
So each message creates its own ProducerRecord; I was under the assumption that the fetch limit expands to a single record, not the batch itself. Or am I misunderstanding?
Each message creates its own ProducerRecord but these get collected into batches by the producer. Fetch requests will not break up a batch unless down-converting to the old record batch format, so you could end up with 1000 messages in a single batch being fetched in a single fetch request.
The broker only works in terms of record batches, not individual records.
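The fetch-size semantics described above can be sketched with a toy model (illustrative only, not Kafka's actual code): a fetch response always includes at least one complete batch, regardless of the byte limit, so producer-side batching determines the granularity a throttled follower fetches at.

```scala
// Toy model of "fetch at least one batch" semantics (not Kafka internals).
case class Batch(records: Int, sizeBytes: Int)

def fetch(log: List[Batch], maxBytes: Int): List[Batch] =
  log match {
    case Nil => Nil
    case first :: rest =>
      // The first batch is always returned, even if it exceeds maxBytes.
      var used = first.sizeBytes
      first :: rest.takeWhile { b =>
        val fits = used + b.sizeBytes <= maxBytes
        if (fits) used += b.sizeBytes
        fits
      }
  }

// 1000 records collected into one producer batch: one fetch gets everything.
val oneBigBatch = List(Batch(records = 1000, sizeBytes = 50000))
// 1000 single-record batches: each fetch returns only one batch.
val manyBatches = List.fill(1000)(Batch(records = 1, sizeBytes = 50))

println(fetch(oneBigBatch, maxBytes = 1).map(_.records).sum) // 1000
println(fetch(manyBatches, maxBytes = 1).map(_.records).sum) // 1
```

This is why the test needs many record batches, not just many records, for `replica.fetch.max.bytes = 1` to slow replication down.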
Please review, @lbradstreet - thanks!
  TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 1000, acks = -1)

  val brokerIds = servers.map(_.config.brokerId)
- TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp), throttleBytes = 1)
Why did we remove this? We still need the throttle to be set so that the replica doesn't join the ISR. It could easily do this even if it only fetches a message at a time. The idea with the new replica fetch max bytes setting is to give the broker a chance to throttle itself before joining the ISR.
Added back, but to clarify - are you suggesting the high watermark could still be at 0 (or relatively low) even after producing the messages? Or just that both combined are a more effective throttle, essentially putting it at 1 record/sec?
I simply meant that we still need the replication throttle so the reassignment doesn't complete before we perform our checks.
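For reference, the replication throttle that `TestUtils.setReplicationThrottleForPartitions` sets up is built from Kafka's standard throttle configs, roughly as below (the replica lists shown are illustrative; the test utility computes them from the actual topic, partition, and broker ids):

```properties
# Dynamic broker configs: cap replication traffic, in bytes/sec, per broker
leader.replication.throttled.rate=1
follower.replication.throttled.rate=1

# Topic configs: which partition:replica pairs the throttle applies to
# (illustrative partition:broker values)
leader.replication.throttled.replicas=0:0,0:1
follower.replication.throttled.replicas=0:0,0:1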
bdbyrne left a comment
Thanks for the review, a couple questions.
lbradstreet left a comment
The fix works and the test is no longer flaky with the changes made. Someone may have a better idea about how to avoid this kind of check, which will be brittle if someone renames the test: https://github.com/apache/kafka/pull/9022/files#diff-0bc85b210490e2fc83d3b11c6a9d83dfR67
ok to test
  private val defaultReplicationFactor = 1.toShort

+ private def replicaFetchMaxBytes() =
+   if (testName.getMethodName == "testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress") Some(1)
It's always an annoyance with Junit that there is no way for the test case to override initialization in a @Before. What we often end up doing is removing the annotation and calling the setup method explicitly in each test case. I slightly prefer that option since it is easier to understand, but not sure it is possible since we're extending KafkaServerTestHarness, which has its own initialization logic. For the sake of argument, would it be possible to set max fetch bytes to 1 for all tests? Either that or maybe we should just produce more data in the test case.
Agreed, the KafkaServerTestHarness makes it more difficult. I've updated the test to set max fetch bytes to 1 for all tests, which is fine given none of the other tests produce data.
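The trade-off discussed above can be shown with a self-contained sketch (the helper and config map are hypothetical, not the PR's code): keying the override on the test method name silently stops working if the test is renamed, while applying it to every test is robust.

```scala
// Hypothetical helper contrasting the two approaches discussed (not the PR's code).
def brokerProps(testMethod: String, perTestOverride: Boolean): Map[String, String] = {
  val base = Map("broker.id" -> "0")
  val needsSmallFetch =
    if (perTestOverride)
      // Option 1: only for the one test, keyed on its (brittle) name.
      testMethod == "testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress"
    else
      true // Option 2: apply to all tests; harmless since the others produce no data.
  if (needsSmallFetch) base + ("replica.fetch.max.bytes" -> "1") else base
}

// Per-test override breaks silently when the test is renamed:
println(brokerProps("testRenamed", perTestOverride = true).contains("replica.fetch.max.bytes"))
// Applying the setting uniformly still works:
println(brokerProps("testRenamed", perTestOverride = false)("replica.fetch.max.bytes"))
```

The PR ended up taking option 2, which also sidesteps the JUnit limitation that a test case cannot easily override `@Before` initialization in the harness.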
retest this please
ok to test
- Remove Thread.sleep
- Don't change signature of `TestUtils.createBrokerConfigs` for infrequently used parameter.
- Remove unnecessary `()`
ijuma left a comment
I pushed some minor fixes. LGTM now.
ok to test
retest this please
Green build, merging to trunk, 2.6 and 2.5.
…ignmentIsInProgress (#9022)

Set `replica.fetch.max.bytes` to `1` and produce multiple record batches to allow for throttling to take place. This helps avoid a race condition where the reassignment would complete more quickly than expected, causing an assertion to sometimes fail.

Reviewers: Lucas Bradstreet <lucas@confluent.io>, Jason Gustafson <jason@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>