KAFKA-8460: produce records with current timestamp #9877
chia7712 merged 5 commits into apache:trunk
Conversation
@hachikuji, could you help review this PR? Thanks.

Not sure what the original purpose of the test was, but this changes what it's testing, right? Before, producing happened after subscription; now it happens before.

Can we just bump

Also, thanks so much for diagnosing this! Very awesome. :)
a717112 to fb751ac
…and record timestamp
@ijuma, thanks for the comments. I agree with you that it might change what it was testing. In this commit (a43e72a), I updated:

I found that (3) is the main reason the test fails: the log cleaner deletes logs that are too old, so we get nothing to consume. We need to set the records to the current timestamp (or set log.cleanup.policy to …). Thank you.
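For illustration, a minimal sketch of what producing with the current timestamp looks like. The helper name is hypothetical, not this PR's code; it only shows the ProducerRecord constructor that takes an explicit timestamp:

import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: stamp each record with the wall-clock time so the
// broker's retention check never sees it as older than retention.ms.
def recordWithCurrentTime(tp: TopicPartition, key: Array[Byte], value: Array[Byte]):
    ProducerRecord[Array[Byte], Array[Byte]] =
  new ProducerRecord(tp.topic, tp.partition, System.currentTimeMillis(), key, value)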
val consumerRecords = consumeRecords(consumer, producerRecords.size)
// the expected records should be 30 records * 3 topics * 30 partitions = 2700
What is the purpose of this and the other comment?
val producer = createProducer()
val producerRecords = partitions.flatMap(sendRecords(producer, numRecords = 15, _))
// it'll send 30 records to each topic partition (3 topics and 30 partitions each topic)
val producerRecords = partitions.flatMap(sendRecords(producer, numRecords = partitionCount, _, needCurrentTime = true))
Why did we change the number of records to match the partition count?
Actually, partitionCount is the original numRecords. I tried to reduce the record count months ago and it still failed, which is why I dug into this flaky test. :) So I just reverted the previous change.
Sounds good. Worth mentioning it in the PR description.
Thanks for the reminder. Updated in the PR description.
protected def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int,
                          tp: TopicPartition): Seq[ProducerRecord[Array[Byte], Array[Byte]]] = {
                          tp: TopicPartition, needCurrentTime: Boolean): Seq[ProducerRecord[Array[Byte], Array[Byte]]] = {
Do we need this boolean? Could we always set the current time? It seems like the existing behavior may cause problems for many other tests too.
Of course. I'm just afraid some tests rely on the original "fake time". Let me test it.
Sounds good. Yes, if some tests fail after this change, we can use the previous behavior in those tests (via a default argument instead of a method overload). And the new behavior should be the default.
OK, now the default sendRecords(producer, numRecords, tp) sends with the current timestamp. Some tests verify the timestamp after consuming, so those keep the old behavior.
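As a rough sketch of the shape agreed on above — a single method with a default argument rather than an overload. The body is illustrative, assuming a helper along these lines rather than the PR's exact code:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition

// The current timestamp is the default; tests that assert on timestamps
// can pass needCurrentTime = false to keep the old deterministic values.
protected def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
                          numRecords: Int,
                          tp: TopicPartition,
                          needCurrentTime: Boolean = true): Seq[ProducerRecord[Array[Byte], Array[Byte]]] = {
  val records = (0 until numRecords).map { i =>
    val timestamp = if (needCurrentTime) System.currentTimeMillis() else i.toLong
    new ProducerRecord(tp.topic, tp.partition, timestamp,
      s"key $i".getBytes, s"value $i".getBytes)
  }
  records.foreach(producer.send(_))
  producer.flush()
  records
}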
this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "500")
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "100")
// set the 'max.poll.interval.ms' larger (default is 6 seconds), to avoid the consumer got kicked out during sending records
This comment can be more concise. Something like:
Avoid a rebalance while the records are being sent (the default is 6 seconds)
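For context, a hedged sketch of what raising that setting looks like in the test's consumer config — the value 60000 is an assumption for illustration, not the PR's:

import org.apache.kafka.clients.consumer.ConsumerConfig

// Illustrative only: raise max.poll.interval.ms well above the time needed
// to send all records, so the consumer isn't kicked out of the group (and
// no rebalance is triggered) while production is still in flight.
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")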
Let's see if it'll break other tests.

Tests on JDK 11/15 passed. The JDK 8 test failure is addressed in #9888.

Thanks. I tweaked things a bit in the last commit here: If you think it looks good, please add it to your PR and merge trunk.

@showuon This test seems to be failing a lot, so it would be good to integrate the proposed changes (if you agree) so that we can merge it.

@ijuma, sorry, I was busy yesterday. I'll work on it today. Thanks.
ijuma left a comment
LGTM, thanks. Will merge if the tests pass.
@ijuma, thanks for helping refactor the code. I've fixed the conflict and made sure every test extends

Failed tests: mostly they failed in
Originally, we make sure the consumer completes awaitAssignment, and then produce records. We send 30 records to each topic partition (3 topics with 30 partitions each, 2700 records in total), so it takes some time to process. If it exceeds 6 seconds, the consumer is kicked out of the group because max.poll.interval.ms is set to 6 seconds. The broker then deletes the logs, because the record timestamp we set is 0, so the computed age (currentTime - 0) exceeds the retentionMs setting. After log deletion, the log start offset is incremented. Later, when the consumer comes back, it first does a listOffset and gets the new, unexpected start offset. That's why we sometimes cannot receive the expected number of records.

There are many ways to fix this issue, e.g. increasing max.poll.interval.ms, or sending the records with the current timestamp. I fixed it by producing the records before awaitAssignment, and then making sure we consume records right after awaitAssignment. This makes the test much more reliable.

The numRecords changed to partitionCount because partitionCount is the original numRecords. I tried to reduce the record count months ago and it still failed, which is why I dug into this flaky test. :) So I just reverted the previous change.

PS. This flaky test investigation took me a lot of time, but I finally found the root cause! Yeah~
[UPDATED] After discussion with the reviewer, we decided to produce records with the current timestamp, to avoid breaking the original purpose of the test.
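To make the root cause concrete, a back-of-the-envelope sketch of the retention condition described above — variable names and the retention value are illustrative, not the broker's actual code:

// A record stamped with timestamp 0 looks (currentTime - 0) ms old, which
// exceeds any realistic retention.ms, so the broker deletes the segment and
// advances the log start offset past the records the test expects to read.
val recordTimestamp = 0L                    // the old "fake" timestamp
val retentionMs = 7L * 24 * 60 * 60 * 1000  // e.g. one week of retention
val age = System.currentTimeMillis() - recordTimestamp
val eligibleForDeletion = age > retentionMs // true => segment is deleted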
Committer Checklist (excluded from commit message)