MINOR: remove unnecessary timeout for admin request #8738
mjsax merged 11 commits into apache:trunk from
| private final static String INTERRUPTED_ERROR_MESSAGE = "Thread got interrupted. This indicates a bug. " + | ||
| "Please report at https://issues.apache.org/jira/projects/KAFKA or dev-mailing list (https://kafka.apache.org/contact)."; | ||
|
|
||
| private static final class InternalAdminClientConfig extends AdminClientConfig { |
Moved to ClientUtil
| if (nextProbingRebalanceMs.get() < time.milliseconds()) { | ||
| log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get()); | ||
| mainConsumer.enforceRebalance(); | ||
| nextProbingRebalanceMs.set(Long.MAX_VALUE); |
This is neither relevant to this PR nor required for correctness, but I noticed the log message above tends to spam the logs in some tests. Since this gets set/reset at the end of every rebalance, we may as well reset it here to avoid an avalanche of Triggering the followup rebalance...
enforceRebalance is guaranteed not to actually run the assignment logic, right? That will only run during a call to poll, I'm hoping. Otherwise, this line should go before the call.
Yep. It just provides a notice to the consumer to enforce that a rebalance will occur on the next poll
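A minimal sketch of the pattern discussed in this thread, not the actual StreamThread code. `FakeConsumer` is a hypothetical stand-in that only models the behavior described above: `enforceRebalance()` sets a flag, and the rebalance itself runs on the next `poll()`. Resetting the timestamp to `Long.MAX_VALUE` means the trigger (and its log line) fires once rather than on every loop iteration.

```java
import java.util.concurrent.atomic.AtomicLong;

class ProbingRebalanceSketch {
    // Hypothetical stand-in for the consumer, NOT Kafka's class:
    // enforceRebalance() only records a request; poll() acts on it.
    static final class FakeConsumer {
        boolean rebalanceRequested = false;
        int enforceCalls = 0;
        int rebalancesRun = 0;

        void enforceRebalance() {
            enforceCalls++;
            rebalanceRequested = true;
        }

        void poll() {
            if (rebalanceRequested) {
                rebalancesRun++;
                rebalanceRequested = false;
            }
        }
    }

    final AtomicLong nextProbingRebalanceMs = new AtomicLong(Long.MAX_VALUE);
    final FakeConsumer consumer = new FakeConsumer();

    // Mirrors the shape of the hunk above: trigger once, then reset the deadline.
    void maybeTriggerRebalance(final long nowMs) {
        if (nextProbingRebalanceMs.get() < nowMs) {
            consumer.enforceRebalance();
            nextProbingRebalanceMs.set(Long.MAX_VALUE);
        }
    }

    // Runs the check repeatedly with an already-expired deadline and
    // reports how many times enforceRebalance() was actually called.
    static int enforceCallsAfterLoop(final int iterations) {
        final ProbingRebalanceSketch sketch = new ProbingRebalanceSketch();
        sketch.nextProbingRebalanceMs.set(100L);
        for (int i = 0; i < iterations; i++) {
            sketch.maybeTriggerRebalance(200L + i);
        }
        return sketch.consumer.enforceCalls;
    }

    public static void main(final String[] args) {
        System.out.println(enforceCallsAfterLoop(5));
    }
}
```

Without the reset, every iteration after the deadline would call `enforceRebalance()` again until the next rebalance completed and rescheduled the timestamp.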
45d3dc4 to 702187a
| public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsets(final Collection<TopicPartition> partitions, | ||
| final Admin adminClient, | ||
| final Duration timeout) { | ||
| final long timeoutMs) { |
It seems to me Duration is more readable than long. Is there a reason to make this change?
I think it's because we're now also calling it right after calling getAdminDefaultApiTimeoutMs, so it seems a bummer to create a Duration from millis and then immediately convert it back to millis.
| fetchEndOffsets( | ||
| allPartitions, | ||
| adminClient, | ||
| getAdminDefaultApiTimeoutMs(config) |
The admin config is built during KafkaStreams construction. Could we reuse it?
adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
I did it this way since we need to get the admin's default.api.timeout in other places where we only have the streamsConfig, so we may as well just pass that as the argument to getAdminDefaultApiTimeoutMs
We should update the JavaDocs to state that this method may throw a TimeoutException now. Which makes me wonder if this is a public API change? Was there any discussion on the original KIP about the behavior of allLocalStorePartitionLags?
This change might require a KIP... cc @vvcephei @guozhangwang WDYT?
| final Admin adminClient) { | ||
| return fetchEndOffsets(partitions, adminClient, null); | ||
| public static int getAdminDefaultApiTimeoutMs(final StreamsConfig streamsConfig) { | ||
| final InternalAdminClientConfig dummyAdmin = new InternalAdminClientConfig(streamsConfig.getAdminConfigs("dummy")); |
Could you add a comment for the "dummy"?
vvcephei
left a comment
LGTM! Just a few minor comments.
| mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000"), | ||
| mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener), | ||
| mkEntry(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 9) | ||
| mkEntry(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 90_000) |
Just curious, why the change from 9 to 90,000?
I mean, because the value has to be larger than the request.timeout.ms which defaults to 30,000
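The constraint mentioned in this reply can be sketched as a simple check. This is an illustration only, not the admin client's validation code: the class and method names are hypothetical, and treating equal values as acceptable is an assumption of the sketch. The 30,000 ms default for `request.timeout.ms` is taken from the comment above.

```java
class AdminTimeoutConfigSketch {
    // Default for request.timeout.ms as stated in the review thread.
    static final int DEFAULT_REQUEST_TIMEOUT_MS = 30_000;

    // Hypothetical check: the overall API timeout must not be smaller than
    // the timeout of a single request (equality allowed is an assumption).
    static boolean isConsistent(final int requestTimeoutMs, final int defaultApiTimeoutMs) {
        return defaultApiTimeoutMs >= requestTimeoutMs;
    }

    public static void main(final String[] args) {
        // The old test value of 9 ms is below the request timeout default.
        System.out.println(isConsistent(DEFAULT_REQUEST_TIMEOUT_MS, 9));
        // The new test value of 90_000 ms is above it.
        System.out.println(isConsistent(DEFAULT_REQUEST_TIMEOUT_MS, 90_000));
    }
}
```

This is why the test could not simply swap the config key and keep the value 9: once the key becomes `default.api.timeout.ms`, the value has to clear the request timeout.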
|
Test this please |
|
It looks like all the review comments are addressed, and all the tests passed, so I'll proceed to merge. |
|
Oh, actually @ableegoldman , it looks like there was a conflict with the other PR I just merged. |
|
I just resolved the conflicts. |
|
Test this please |
|
Ok to test |
|
Retest this please |
|
... or not |
|
Test this please |
|
Test this please |
|
Test this please |
|
Retest this please |
|
Test this please |
|
\o/ |
| EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andThrow(new RuntimeException()); | ||
| replay(adminClient); | ||
| assertThrows(StreamsException.class, () -> fetchEndOffsetsWithoutTimeout(emptyList(), adminClient)); | ||
| assertThrows(StreamsException.class, () -> fetchEndOffsets(emptyList(), adminClient, 60_000L)); |
Should we pass in MAX_VALUE to avoid introducing test flakiness?
| EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result); | ||
| EasyMock.expect(result.all()).andStubReturn(allFuture); | ||
| EasyMock.expect(allFuture.get()).andThrow(new InterruptedException()); | ||
| EasyMock.expect(allFuture.get(60000L, TimeUnit.MILLISECONDS)).andThrow(new InterruptedException()); |
As above (also below)
Also nit: 60_000L (or just 60 and TimeUnit.SECONDS?)
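The assertions in these test hunks expect a failure from the admin call to surface as a StreamsException. A rough sketch of that wrapping behavior follows. It is not the ClientUtils implementation: `fetchOrWrap`, the nested `StreamsException` stand-in, and the error message are all hypothetical and exist only so the example compiles without Kafka on the classpath.

```java
import java.util.concurrent.Callable;

class FetchWrapSketch {
    // Stand-in for org.apache.kafka.streams.errors.StreamsException so the
    // sketch is self-contained; the real class lives in Kafka Streams.
    static final class StreamsException extends RuntimeException {
        StreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical helper: runs the admin call and wraps any failure.
    static <T> T fetchOrWrap(final Callable<T> adminCall) {
        try {
            return adminCall.call();
        } catch (final Exception e) {
            throw new StreamsException("Unable to obtain end offsets", e);
        }
    }

    // Mirrors the first test above: a RuntimeException from the admin call
    // should reach the caller as a StreamsException carrying the cause.
    static boolean wrapsRuntimeException() {
        try {
            fetchOrWrap(() -> {
                throw new RuntimeException("boom");
            });
            return false;
        } catch (final StreamsException e) {
            return e.getCause() instanceof RuntimeException;
        }
    }

    public static void main(final String[] args) {
        System.out.println(wrapsRuntimeException());
    }
}
```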
…hub.com/ableegoldman/kafka into MINOR-remove-extra-admin-timeout-config
vvcephei
left a comment
Thanks for the update @ableegoldman
|
Test this please |
|
Test this please |
|
|
||
| log.debug("Current changelog positions: {}", allChangelogPositions); | ||
| final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets = fetchEndOffsetsWithoutTimeout(allPartitions, adminClient); | ||
| final Map<TopicPartition, ListOffsetsResultInfo> allEndOffsets = fetchEndOffsets(allPartitions, adminClient); |
Can we please update the JavaDocs of allLocalStorePartitionLags to state that a StreamsException could be thrown?
|
Checkstyle error: |
|
Retest this please. |
|
Retest this please. |
Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>
|
Merged to |
|
Thanks! I actually don't think there ended up being anything relevant to 2.5 in the final form of this PR. Except maybe adding |
|
Ack. In 2.5 we use the AdminClient directly in |
* 'trunk' of github.com:apache/kafka: (36 commits)
  Remove redundant `containsKey` call in KafkaProducer (apache#8761)
  KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (apache#8723)
  KAFKA-10061; Fix flaky `ReassignPartitionsIntegrationTest.testCancellation` (apache#8749)
  KAFKA-9130; KIP-518 Allow listing consumer groups per state (apache#8238)
  KAFKA-9501: convert between active and standby without closing stores (apache#8248)
  KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (apache#8739)
  MINOR: Log the reason for coordinator discovery failure (apache#8747)
  KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException (apache#8705)
  MINOR: remove unnecessary timeout for admin request (apache#8738)
  MINOR: Relax Percentiles test (apache#8748)
  MINOR: regression test for task assignor config (apache#8743)
  MINOR: Update documentation.html to refer to 2.6 (apache#8745)
  MINOR: Update documentation.html to refer to 2.5 (apache#8744)
  KAFKA-9673: Filter and Conditional SMTs (apache#8699)
  KAFKA-9971: Error Reporting in Sink Connectors (KIP-610) (apache#8720)
  KAFKA-10052: Harden assertion of topic settings in Connect integration tests (apache#8735)
  MINOR: Slight MetadataCache tweaks to avoid unnecessary work (apache#8728)
  KAFKA-9802; Increase transaction timeout in system tests to reduce flakiness (apache#8736)
  KAFKA-10050: kafka_log4j_appender.py fixed for JDK11 (apache#8731)
  KAFKA-9146: Add option to force delete active members in StreamsResetter (apache#8589)
  ...
# Conflicts:
#   core/src/main/scala/kafka/log/Log.scala
|
Fair enough: #8772 |
Turns out future.get() actually does apply the admin's default.api.timeout.ms config internally, so we don't need to worry about providing a timeout of our own. Who knew
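To illustrate the idea in the description, here is a small sketch using only the JDK. It does not use the Kafka admin client: `requestThatNeverCompletes` is a hypothetical stand-in for a client call whose future is failed by the client itself once its own API timeout elapses, modeled here with `CompletableFuture.orTimeout` (Java 9+). The point is that the caller's plain `get()` still returns, because the deadline is enforced on the producing side.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class InternalTimeoutSketch {
    // Hypothetical call that never receives a response; the "client" fails
    // the future on its own after apiTimeoutMs (an assumption of this sketch).
    static CompletableFuture<String> requestThatNeverCompletes(final long apiTimeoutMs) {
        return new CompletableFuture<String>().orTimeout(apiTimeoutMs, TimeUnit.MILLISECONDS);
    }

    // Calls get() with no caller-side timeout and reports how the call ended.
    static String describeOutcome(final long apiTimeoutMs) {
        try {
            return requestThatNeverCompletes(apiTimeoutMs).get();
        } catch (final ExecutionException e) {
            return e.getCause() instanceof TimeoutException ? "timed out internally" : "failed";
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(final String[] args) {
        System.out.println(describeOutcome(50L));
    }
}
```

Under that model, passing a second timeout to `get(timeout, unit)` adds nothing unless it is shorter than the internal one, which matches the reasoning for dropping the extra parameter.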