MINOR: Producers should set delivery timeout instead of retries #5425
hachikuji merged 8 commits into apache:trunk
Conversation

ijuma left a comment

Do we want to mention this in the upgrade notes?

Also, what if people have set retries explicitly? Maybe we should not set this if that's set.
Also, the docs say:

    @note For MirrorMaker, the following settings are set by default to make sure there is no data loss:
    1. Use a producer with the following settings:
       acks=all
       retries=max integer
       max.block.ms=max long
       max.in.flight.requests.per.connection=1
    2. Consumer setting:
       enable.auto.commit=false
    3. MirrorMaker setting:
       abort.on.send.failure=true
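For reference, the no-data-loss producer defaults quoted above can be sketched as plain properties. This is a minimal sketch using raw string keys; the actual MirrorMaker code applies these through its own config classes:

```java
import java.util.Properties;

public class MirrorMakerNoLossDefaults {
    // Sketch of the no-data-loss producer defaults quoted from the docs above.
    static Properties producerDefaults() {
        Properties props = new Properties();
        props.setProperty("acks", "all");
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        props.setProperty("max.block.ms", String.valueOf(Long.MAX_VALUE));
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties p = producerDefaults();
        System.out.println(p.getProperty("acks"));
    }
}
```

With max.in.flight.requests.per.connection=1 a failed batch cannot be overtaken by a later one, which is why retries can safely be unbounded here without risking reordering.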
(force-pushed 27d6371 to f7494dd)
Maybe we should log a warning if retries is set. Or does the producer do that already?

I'm going to go ahead and change the other overrides of
ijuma left a comment

Thanks for the updates. Overall this makes sense, just a few questions and comments.
    + "greater than " + REQUEST_TIMEOUT_MS_CONFIG + " + " + LINGER_MS_CONFIG;
    private static final String DELIVERY_TIMEOUT_MS_DOC = "An upper bound on the time to report success or failure after "
        + "Producer.send() returns. The producer may report failure to send a record earlier than this config if all "
        + "the retries have been exhausted or a record is added to a batch nearing expiration. "
What config is used for "batch expiration"?

Urmm, delivery timeout. I found this wording a little confusing as well. I will try to rephrase.
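The doc string above implies the constraint that delivery.timeout.ms must be at least request.timeout.ms + linger.ms, since one full request attempt has to fit inside the delivery deadline. A minimal sketch of that sanity check (the method name validateDeliveryTimeout is hypothetical; the real producer enforces the constraint inside its config validation):

```java
public class DeliveryTimeoutCheck {
    // Hypothetical sketch: delivery.timeout.ms must be equal to or larger
    // than request.timeout.ms + linger.ms, otherwise not even a single
    // request attempt can complete within the delivery deadline.
    static void validateDeliveryTimeout(int deliveryTimeoutMs, int requestTimeoutMs, int lingerMs) {
        if (deliveryTimeoutMs < requestTimeoutMs + lingerMs) {
            throw new IllegalArgumentException(
                "delivery.timeout.ms should be equal to or larger than request.timeout.ms + linger.ms");
        }
    }

    public static void main(String[] args) {
        // Defaults: delivery.timeout.ms=120000, request.timeout.ms=30000, linger.ms=0
        validateDeliveryTimeout(120_000, 30_000, 0); // passes
    }
}
```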
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-   producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
+   producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 0); // we handle retries in this class
Do we really want to do this? Do we ensure that we send the record at least once, or could it fail in the accumulator?

Actually, we probably just want to leave this as it was.
    producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, Int.MaxValue.toString)
    val producer = TestUtils.createProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
-     saslProperties = clientSaslProperties, retries = 0, lingerMs = Int.MaxValue, props = Some(producerProps))
+     saslProperties = clientSaslProperties, lingerMs = Int.MaxValue, props = Some(producerProps))
Do you know why we were setting this to 0 previously?

There was no obvious reason. As far as I can tell, we are just using this function to populate some data in order to check consumer operations.
    private case class ProducerBuilder() extends ClientBuilder[KafkaProducer[String, String]] {
-     private var _retries = 0
+     private var _retries = Int.MaxValue
It is being overridden in some cases.
    bufferSize: Long = 1024L * 1024L,
-   retries: Int = 0,
+   retries: Int = Int.MaxValue,
+   deliveryTimeoutMs: Int = 20000,
I think we're not using this variable. Also, it should be higher than the request timeout, which is 30 seconds by default.

The default value for delivery timeout is already 120 secs; why do we want to set its default value here to be smaller than that?
    final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>();
    tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100");
-   tempProducerDefaultOverrides.put(ProducerConfig.RETRIES_CONFIG, 10);
+   tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
The default is already 120000, which I think is reasonable: 2 minutes is quite long already.
    fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
    fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+   fullProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+   fullProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
It seems like it would be better to avoid infinite so that the tests don't hang forever.

Sure, I think the defaults are good enough.

Yes, both the smoke and EOS test clients would not expect the brokers to be down for a very long time in system tests, so a reasonable value like the default 2 min should be fine. Ditto below.
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    // the next 2 config values make sure that all records are produced with no loss and no duplicates
+   producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+   producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
Same as the other comment about tests not using infinite, if possible. Although we should probably use it in one unit test to verify that we don't overflow.
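The overflow concern above is real if a delivery deadline is ever computed in int arithmetic: now + Integer.MAX_VALUE wraps around. A minimal sketch of the issue and the fix (the helper deadlineMs is hypothetical; the producer computes deadlines in long milliseconds):

```java
public class DeadlineOverflow {
    // Computing a delivery deadline with delivery.timeout.ms = Integer.MAX_VALUE
    // overflows in int arithmetic; widening to long before adding avoids it.
    static long deadlineMs(long nowMs, int deliveryTimeoutMs) {
        return nowMs + (long) deliveryTimeoutMs; // widen before adding
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        int overflowed = (int) now + Integer.MAX_VALUE; // wraps to a negative value
        long safe = deadlineMs(now, Integer.MAX_VALUE);
        System.out.println(overflowed < 0); // true: the int sum wrapped
        System.out.println(safe > now);     // true: the long sum did not
    }
}
```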
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+   producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+   producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
    lingerMs: Int = 0,
    props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
    val producer = TestUtils.createProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
      saslProperties = clientSaslProperties, retries = retries, lingerMs = lingerMs, props = props)
I was thinking that it would be good to have at least one test with retries = 0. Do we have such a test?

Are the Jenkins failures related?
    + " Note that this retry is no different than if the client resent the record upon receiving the error."
    + " Allowing retries without setting <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change the"
    + " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second"
    + " succeeds, then the records in the second batch may appear first. Note that the produce requests will be"
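The reordering scenario in that doc string can be illustrated with a toy simulation. This is not Kafka code: batches are plain strings, a failed batch is simply re-enqueued at the back of the pipeline, and the batch that fails once is a parameter:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class RetryReorderingDemo {
    // Toy model of a partition's send pipeline with max.in.flight > 1:
    // batches go out in order, but a failed batch is re-enqueued at the
    // back, so a later batch that succeeded on its first try lands first.
    static List<String> deliver(List<String> batches, String batchThatFailsOnce) {
        Deque<String> queue = new ArrayDeque<>(batches);
        List<String> committedLog = new ArrayList<>();
        boolean failedAlready = false;
        while (!queue.isEmpty()) {
            String batch = queue.poll();
            if (batch.equals(batchThatFailsOnce) && !failedAlready) {
                failedAlready = true;
                queue.add(batch); // the retry goes to the back of the queue
            } else {
                committedLog.add(batch);
            }
        }
        return committedLog;
    }

    public static void main(String[] args) {
        // Batch A fails once and is retried: the log order flips to [B, A].
        System.out.println(deliver(List.of("A", "B"), "A"));
        // With max.in.flight = 1 the producer would not send B until A
        // succeeded, preserving [A, B]; that case is not modeled here.
    }
}
```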
    * increase {@link ConsumerConfig#MAX_POLL_INTERVAL_MS_CONFIG} using the following guidance:
    * <pre>
-   * max.poll.interval.ms > min ( max.block.ms, (retries +1) * request.timeout.ms )
+   * max.poll.interval.ms > max.block.ms
(force-pushed dbc944a to 86129cf, then 86129cf to e02c5ec)
@hachikuji Could we trigger a Streams system test to validate that the related test cases are not broken? Otherwise LGTM.

@guozhangwang Ran the Streams system tests and they all passed. Merging to trunk.
As part of apache#5425 the streams default override for producer retries was removed. The documentation was not updated to reflect that change. Reviewers: Matthias J. Sax <mjsax@apache.org>, Sophie Blee-Goldman <sophie@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
MirrorMaker should set delivery.timeout.ms instead of retries now that we have KIP-91.