From e10a12de915fcadef4f3f5b555745fc5a41c80ed Mon Sep 17 00:00:00 2001 From: huxihx Date: Fri, 6 Sep 2019 10:58:14 +0800 Subject: [PATCH 1/4] rebase KafkaAdminClient.java --- .../apache/kafka/clients/CommonClientConfigs.java | 4 ++++ .../kafka/clients/admin/AdminClientConfig.java | 14 +++++++++++--- .../kafka/clients/consumer/ConsumerConfig.java | 5 ++--- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index af47e55d75a6a..87e46f531a5d8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -141,6 +141,10 @@ public class CommonClientConfigs { + "The value must be set lower than session.timeout.ms, but typically should be set no higher " + "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances."; + public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms"; + public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for client APIs that could block. " + + "This configuration is used as the default timeout for all client operations that do not explicitly accept a timeout parameter."; + /** * Postprocess the configuration so that exponential backoff is disabled when reconnect backoff * is explicitly configured but the maximum reconnect backoff is not explicitly configured. diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java index 107eb56d63aed..ad62f1fbcd6d4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java @@ -30,6 +30,7 @@ import java.util.Set; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.Range.between; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; /** @@ -107,6 +108,7 @@ public class AdminClientConfig extends AbstractConfig { private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC; public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG; + public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG; /** * security.providers @@ -143,7 +145,7 @@ public class AdminClientConfig extends AbstractConfig { RETRY_BACKOFF_MS_DOC) .define(REQUEST_TIMEOUT_MS_CONFIG, Type.INT, - 120000, + 30000, atLeast(0), Importance.MEDIUM, REQUEST_TIMEOUT_MS_DOC) @@ -154,10 +156,16 @@ public class AdminClientConfig extends AbstractConfig { CONNECTIONS_MAX_IDLE_MS_DOC) .define(RETRIES_CONFIG, Type.INT, - 5, - atLeast(0), + Integer.MAX_VALUE, + between(0, Integer.MAX_VALUE), Importance.LOW, CommonClientConfigs.RETRIES_DOC) + .define(DEFAULT_API_TIMEOUT_MS_CONFIG, + Type.INT, + 60000, + atLeast(0), + Importance.MEDIUM, + CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_DOC) .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, Type.LONG, 30000, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 67897cfd54fb3..ce7a595e58613 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -222,8 +222,7 @@ public class ConsumerConfig extends AbstractConfig { private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC; /** default.api.timeout.ms */ - public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms"; - public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for consumer APIs that could block. This configuration is used as the default timeout for all consumer operations that do not explicitly accept a timeout parameter."; + public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG; /** interceptor.classes */ public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes"; @@ -447,7 +446,7 @@ public class ConsumerConfig extends AbstractConfig { 60 * 1000, atLeast(0), Importance.MEDIUM, - DEFAULT_API_TIMEOUT_MS_DOC) + CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_DOC) /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */ .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, From 9c6856e3b84a60f83f628d202086209fef9694c1 Mon Sep 17 00:00:00 2001 From: huxihx Date: Fri, 29 Nov 2019 13:56:07 +0800 Subject: [PATCH 2/4] KAFKA-8503: Ignore retries config if a custom timeout is provided https://issues.apache.org/jira/browse/KAFKA-8503 The KIP-533 implementation. --- .../kafka/clients/admin/AbstractOptions.java | 14 ++-- .../org/apache/kafka/clients/admin/Admin.java | 12 +-- .../clients/admin/AlterConfigsOptions.java | 6 +- .../clients/admin/CreateAclsOptions.java | 6 +- .../clients/admin/CreateTopicsOptions.java | 6 +- .../clients/admin/DeleteAclsOptions.java | 6 +- .../clients/admin/DeleteTopicsOptions.java | 6 +- .../clients/admin/DescribeAclsOptions.java | 6 +- .../clients/admin/DescribeClusterOptions.java | 6 +- .../clients/admin/DescribeConfigsOptions.java | 6 +- .../clients/admin/DescribeTopicsOptions.java | 6 +- .../kafka/clients/admin/KafkaAdminClient.java | 75 ++++++++++--------- .../clients/admin/ListTopicsOptions.java | 6 +- .../clients/admin/AdminClientUnitTestEnv.java | 2 +- .../clients/admin/KafkaAdminClientTest.java | 30 ++++---- .../scala/kafka/admin/ConfigCommand.scala | 4 +- .../kafka/admin/ConsumerGroupCommand.scala | 2 +- .../kafka/admin/LeaderElectionCommand.scala | 2 +- ...referredReplicaLeaderElectionCommand.scala | 2 +- .../admin/ReassignPartitionsCommand.scala | 2 +- .../scala/kafka/tools/StreamsResetter.java | 2 +- .../kafka/api/BaseAdminIntegrationTest.scala | 2 +- .../DescribeAuthorizedOperationsTest.scala | 2 +- .../api/PlaintextAdminIntegrationTest.scala | 10 +-- .../kafka/api/SslAdminIntegrationTest.scala | 2 +- .../admin/LeaderElectionCommandTest.scala | 2 +- .../admin/ReassignPartitionsClusterTest.scala | 2 +- .../kafka/trogdor/common/WorkerUtils.java | 8 +- 28 files changed, 121 insertions(+), 114 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java index ccccf118a1abd..d8c139d6863da 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java @@ -23,24 +23,24 @@ */ public abstract class AbstractOptions { - protected Integer timeoutMs = null; + protected Integer apiTimeoutMs = null; /** - * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the + * Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. */ @SuppressWarnings("unchecked") - public T timeoutMs(Integer timeoutMs) { - this.timeoutMs = timeoutMs; + public T apiTimeoutMs(Integer apiTimeoutMs) { + this.apiTimeoutMs = apiTimeoutMs; return (T) this; } /** - * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the + * The api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. */ - public Integer timeoutMs() { - return timeoutMs; + public Integer apiTimeoutMs() { + return apiTimeoutMs; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index f041f680b4564..be7b8f4110416 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -562,7 +562,7 @@ default CreatePartitionsResult createPartitions(Map newPa *
  • {@link org.apache.kafka.common.errors.AuthorizationException} * if the authenticated user is not authorized to alter the topic
  • *
  • {@link org.apache.kafka.common.errors.TimeoutException} - * if the request was not completed in within the given {@link CreatePartitionsOptions#timeoutMs()}.
  • + * if the request was not completed in within the given {@link CreatePartitionsOptions#apiTimeoutMs()}. *
  • {@link org.apache.kafka.common.errors.ReassignmentInProgressException} * if a partition reassignment is currently in progress
  • *
  • {@link org.apache.kafka.common.errors.BrokerNotAvailableException} @@ -637,7 +637,7 @@ default CreateDelegationTokenResult createDelegationToken() { *
  • {@link org.apache.kafka.common.errors.DelegationTokenDisabledException} * if the delegation token feature is disabled.
  • *
  • {@link org.apache.kafka.common.errors.TimeoutException} - * if the request was not completed in within the given {@link CreateDelegationTokenOptions#timeoutMs()}.
  • + * if the request was not completed in within the given {@link CreateDelegationTokenOptions#apiTimeoutMs()}. * * * @param options The options to use when creating delegation token. @@ -678,7 +678,7 @@ default RenewDelegationTokenResult renewDelegationToken(byte[] hmac) { *
  • {@link org.apache.kafka.common.errors.DelegationTokenExpiredException} * if the delegation token is expired.
  • *
  • {@link org.apache.kafka.common.errors.TimeoutException} - * if the request was not completed in within the given {@link RenewDelegationTokenOptions#timeoutMs()}.
  • + * if the request was not completed in within the given {@link RenewDelegationTokenOptions#apiTimeoutMs()}. * * * @param hmac HMAC of the Delegation token @@ -719,7 +719,7 @@ default ExpireDelegationTokenResult expireDelegationToken(byte[] hmac) { *
  • {@link org.apache.kafka.common.errors.DelegationTokenExpiredException} * if the delegation token is expired.
  • *
  • {@link org.apache.kafka.common.errors.TimeoutException} - * if the request was not completed in within the given {@link ExpireDelegationTokenOptions#timeoutMs()}.
  • + * if the request was not completed in within the given {@link ExpireDelegationTokenOptions#apiTimeoutMs()}. * * * @param hmac HMAC of the Delegation token @@ -753,7 +753,7 @@ default DescribeDelegationTokenResult describeDelegationToken() { *
  • {@link org.apache.kafka.common.errors.DelegationTokenDisabledException} * if the delegation token feature is disabled.
  • *
  • {@link org.apache.kafka.common.errors.TimeoutException} - * if the request was not completed in within the given {@link DescribeDelegationTokenOptions#timeoutMs()}.
  • + * if the request was not completed in within the given {@link DescribeDelegationTokenOptions#apiTimeoutMs()}. * * * @param options The options to use when describing delegation tokens. @@ -898,7 +898,7 @@ default ElectPreferredLeadersResult electPreferredLeaders(Collection partitions, ElectPreferredLeadersOptions options) { final ElectLeadersOptions newOptions = new ElectLeadersOptions(); - newOptions.timeoutMs(options.timeoutMs()); + newOptions.apiTimeoutMs(options.apiTimeoutMs()); final Set topicPartitions = partitions == null ? null : new HashSet<>(partitions); return new ElectPreferredLeadersResult(electLeaders(ElectionType.PREFERRED, topicPartitions, newOptions)); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java index 0b280532104e8..620c30469fb5a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java @@ -32,13 +32,13 @@ public class AlterConfigsOptions extends AbstractOptions { private boolean validateOnly = false; /** - * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the + * Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. * */ // This method is retained to keep binary compatibility with 0.11 - public AlterConfigsOptions timeoutMs(Integer timeoutMs) { - this.timeoutMs = timeoutMs; + public AlterConfigsOptions apiTimeoutMs(Integer apiTimeoutMs) { + this.apiTimeoutMs = apiTimeoutMs; return this; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java index bfb8e32db157f..32472d996f956 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java @@ -30,13 +30,13 @@ public class CreateAclsOptions extends AbstractOptions { /** - * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the + * Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. * */ // This method is retained to keep binary compatibility with 0.11 - public CreateAclsOptions timeoutMs(Integer timeoutMs) { - this.timeoutMs = timeoutMs; + public CreateAclsOptions apiTimeoutMs(Integer apiTimeoutMs) { + this.apiTimeoutMs = apiTimeoutMs; return this; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java index a9f1009c2fc7e..440fd5ad2c06f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java @@ -32,13 +32,13 @@ public class CreateTopicsOptions extends AbstractOptions { private boolean validateOnly = false; /** - * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the + * Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. * */ // This method is retained to keep binary compatibility with 0.11 - public CreateTopicsOptions timeoutMs(Integer timeoutMs) { - this.timeoutMs = timeoutMs; + public CreateTopicsOptions apiTimeoutMs(Integer apiTimeoutMs) { + this.apiTimeoutMs = apiTimeoutMs; return this; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java index 1b67da52f386d..07d20faf19f66 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java @@ -30,13 +30,13 @@ public class DeleteAclsOptions extends AbstractOptions { /** - * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the + * Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. * */ // This method is retained to keep binary compatibility with 0.11 - public DeleteAclsOptions timeoutMs(Integer timeoutMs) { - this.timeoutMs = timeoutMs; + public DeleteAclsOptions apiTimeoutMs(Integer apiTimeoutMs) { + this.apiTimeoutMs = apiTimeoutMs; return this; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java index 91e38a196fc9a..b7ee749634dd7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java @@ -30,13 +30,13 @@ public class DeleteTopicsOptions extends AbstractOptions { /** - * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the + * Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. * */ // This method is retained to keep binary compatibility with 0.11 - public DeleteTopicsOptions timeoutMs(Integer timeoutMs) { - this.timeoutMs = timeoutMs; + public DeleteTopicsOptions apiTimeoutMs(Integer apiTimeoutMs) { + this.apiTimeoutMs = apiTimeoutMs; return this; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java index b17d6a7d0cb98..628a6f8ce1f32 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java @@ -29,13 +29,13 @@ public class DescribeAclsOptions extends AbstractOptions { /** - * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the + * Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. * */ // This method is retained to keep binary compatibility with 0.11 - public DescribeAclsOptions timeoutMs(Integer timeoutMs) { - this.timeoutMs = timeoutMs; + public DescribeAclsOptions apiTimeoutMs(Integer apiTimeoutMs) { + this.apiTimeoutMs = apiTimeoutMs; return this; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java index 670feda0d2614..2e01609c26cde 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java @@ -30,13 +30,13 @@ public class DescribeClusterOptions extends AbstractOptions requestBuilder; try { requestBuilder = call.createRequest(timeoutMs); @@ -1353,7 +1360,7 @@ public CreateTopicsResult createTopics(final Collection newTopics, } } final long now = time.milliseconds(); - Call call = new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()), + Call call = new Call("createTopics", calcDeadlineMs(now, options.apiTimeoutMs()), new ControllerNodeProvider()) { @Override @@ -1450,7 +1457,7 @@ public DeleteTopicsResult deleteTopics(Collection topicNames, } } final long now = time.milliseconds(); - Call call = new Call("deleteTopics", calcDeadlineMs(now, options.timeoutMs()), + Call call = new Call("deleteTopics", calcDeadlineMs(now, options.apiTimeoutMs()), new ControllerNodeProvider()) { @Override @@ -1510,7 +1517,7 @@ void handleFailure(Throwable throwable) { public ListTopicsResult listTopics(final ListTopicsOptions options) { final KafkaFutureImpl> topicListingFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); - runnable.call(new Call("listTopics", calcDeadlineMs(now, options.timeoutMs()), + runnable.call(new Call("listTopics", calcDeadlineMs(now, options.apiTimeoutMs()), new LeastLoadedNodeProvider()) { @Override @@ -1555,7 +1562,7 @@ public DescribeTopicsResult describeTopics(final Collection topicNames, } } final long now = time.milliseconds(); - Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), + Call call = new Call("describeTopics", calcDeadlineMs(now, options.apiTimeoutMs()), new LeastLoadedNodeProvider()) { private boolean supportsDisablingTopicCreation = true; @@ -1639,7 +1646,7 @@ public DescribeClusterResult describeCluster(DescribeClusterOptions options) { final KafkaFutureImpl> authorizedOperationsFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); - runnable.call(new Call("listNodes", calcDeadlineMs(now, options.timeoutMs()), + runnable.call(new Call("listNodes", calcDeadlineMs(now, options.apiTimeoutMs()), new LeastLoadedNodeProvider()) { @Override @@ -1691,7 +1698,7 @@ public DescribeAclsResult describeAcls(final AclBindingFilter filter, DescribeAc } final long now = time.milliseconds(); final KafkaFutureImpl> future = new KafkaFutureImpl<>(); - runnable.call(new Call("describeAcls", calcDeadlineMs(now, options.timeoutMs()), + runnable.call(new Call("describeAcls", calcDeadlineMs(now, options.apiTimeoutMs()), new LeastLoadedNodeProvider()) { @Override @@ -1735,7 +1742,7 @@ public CreateAclsResult createAcls(Collection acls, CreateAclsOption } } } - runnable.call(new Call("createAcls", calcDeadlineMs(now, options.timeoutMs()), + runnable.call(new Call("createAcls", calcDeadlineMs(now, options.apiTimeoutMs()), new LeastLoadedNodeProvider()) { @Override @@ -1783,7 +1790,7 @@ public DeleteAclsResult deleteAcls(Collection filters, DeleteA futures.put(filter, new KafkaFutureImpl<>()); } } - runnable.call(new Call("deleteAcls", calcDeadlineMs(now, options.timeoutMs()), + runnable.call(new Call("deleteAcls", calcDeadlineMs(now, options.apiTimeoutMs()), new LeastLoadedNodeProvider()) { @Override @@ -1849,7 +1856,7 @@ public DescribeConfigsResult describeConfigs(Collection configRe final long now = time.milliseconds(); if (!unifiedRequestResources.isEmpty()) { - runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()), + runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.apiTimeoutMs()), new LeastLoadedNodeProvider()) { @Override @@ -1896,7 +1903,7 @@ void handleFailure(Throwable throwable) { final KafkaFutureImpl brokerFuture = entry.getValue(); final ConfigResource resource = entry.getKey(); final int nodeId = Integer.parseInt(resource.name()); - runnable.call(new Call("describeBrokerConfigs", calcDeadlineMs(now, options.timeoutMs()), + runnable.call(new Call("describeBrokerConfigs", calcDeadlineMs(now, options.apiTimeoutMs()), new ConstantNodeIdProvider(nodeId)) { @Override @@ -2011,7 +2018,7 @@ private Map> alterConfigs(Map> incrementalAlterConfigs(Map()); final long now = time.milliseconds(); - runnable.call(new Call("incrementalAlterConfigs", calcDeadlineMs(now, options.timeoutMs()), nodeProvider) { + runnable.call(new Call("incrementalAlterConfigs", calcDeadlineMs(now, options.apiTimeoutMs()), nodeProvider) { @Override public IncrementalAlterConfigsRequest.Builder createRequest(int timeoutMs) { @@ -2146,7 +2153,7 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map assignment = entry.getValue(); - runnable.call(new Call("alterReplicaLogDirs", calcDeadlineMs(now, options.timeoutMs()), + runnable.call(new Call("alterReplicaLogDirs", calcDeadlineMs(now, options.apiTimeoutMs()), new ConstantNodeIdProvider(brokerId)) { @Override @@ -2192,7 +2199,7 @@ public DescribeLogDirsResult describeLogDirs(Collection brokers, Descri final long now = time.milliseconds(); for (final Integer brokerId: brokers) { - runnable.call(new Call("describeLogDirs", calcDeadlineMs(now, options.timeoutMs()), + runnable.call(new Call("describeLogDirs", calcDeadlineMs(now, options.apiTimeoutMs()), new ConstantNodeIdProvider(brokerId)) { @Override @@ -2246,7 +2253,7 @@ public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection newPar .collect(Collectors.toMap(Map.Entry::getKey, e -> partitionDetails(e.getValue()))); final long now = time.milliseconds(); - runnable.call(new Call("createPartitions", calcDeadlineMs(now, options.timeoutMs()), + runnable.call(new Call("createPartitions", calcDeadlineMs(now, options.apiTimeoutMs()), new ControllerNodeProvider()) { @Override @@ -2373,7 +2380,7 @@ public DeleteRecordsResult deleteRecords(final Map expiryTimeFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); - runnable.call(new Call("renewDelegationToken", calcDeadlineMs(now, options.timeoutMs()), + runnable.call(new Call("renewDelegationToken", calcDeadlineMs(now, options.apiTimeoutMs()), new LeastLoadedNodeProvider()) { @Override @@ -2542,7 +2549,7 @@ void handleFailure(Throwable throwable) { public ExpireDelegationTokenResult expireDelegationToken(final byte[] hmac, final ExpireDelegationTokenOptions options) { final KafkaFutureImpl expiryTimeFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); - runnable.call(new Call("expireDelegationToken", calcDeadlineMs(now, options.timeoutMs()), + runnable.call(new Call("expireDelegationToken", calcDeadlineMs(now, options.apiTimeoutMs()), new LeastLoadedNodeProvider()) { @Override @@ -2576,7 +2583,7 @@ void handleFailure(Throwable throwable) { public DescribeDelegationTokenResult describeDelegationToken(final DescribeDelegationTokenOptions options) { final KafkaFutureImpl> tokensFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); - runnable.call(new Call("describeDelegationToken", calcDeadlineMs(now, options.timeoutMs()), + runnable.call(new Call("describeDelegationToken", calcDeadlineMs(now, options.apiTimeoutMs()), new LeastLoadedNodeProvider()) { @Override @@ -2652,7 +2659,7 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection context = new ConsumerGroupOperationContext<>(groupId, options, deadline, futures.get(groupId)); Call findCoordinatorCall = getFindCoordinatorCall(context, @@ -2897,7 +2904,7 @@ private synchronized void tryComplete() { public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) { final KafkaFutureImpl> all = new KafkaFutureImpl<>(); final long nowMetadata = time.milliseconds(); - final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs()); + final long deadline = calcDeadlineMs(nowMetadata, options.apiTimeoutMs()); runnable.call(new Call("findAllBrokers", deadline, new LeastLoadedNodeProvider()) { @Override MetadataRequest.Builder createRequest(int timeoutMs) { @@ -2977,7 +2984,7 @@ public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String grou final ListConsumerGroupOffsetsOptions options) { final KafkaFutureImpl> groupOffsetListingFuture = new KafkaFutureImpl<>(); final long startFindCoordinatorMs = time.milliseconds(); - final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs()); + final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.apiTimeoutMs()); ConsumerGroupOperationContext, ListConsumerGroupOffsetsOptions> context = new ConsumerGroupOperationContext<>(groupId, options, deadline, groupOffsetListingFuture); @@ -3051,7 +3058,7 @@ public DeleteConsumerGroupsResult deleteConsumerGroups(Collection groupI continue; final long startFindCoordinatorMs = time.milliseconds(); - final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs()); + final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.apiTimeoutMs()); ConsumerGroupOperationContext context = new ConsumerGroupOperationContext<>(groupId, options, deadline, future); Call findCoordinatorCall = getFindCoordinatorCall(context, @@ -3111,7 +3118,7 @@ public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets( } final long startFindCoordinatorMs = time.milliseconds(); - final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs()); + final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.apiTimeoutMs()); ConsumerGroupOperationContext, DeleteConsumerGroupOffsetsOptions> context = new ConsumerGroupOperationContext<>(groupId, options, deadline, future); @@ -3192,7 +3199,7 @@ public ElectLeadersResult electLeaders( ElectLeadersOptions options) { final KafkaFutureImpl>> electionFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); - runnable.call(new Call("electLeaders", calcDeadlineMs(now, options.timeoutMs()), + runnable.call(new Call("electLeaders", calcDeadlineMs(now, options.apiTimeoutMs()), new ControllerNodeProvider()) { @Override @@ -3257,7 +3264,7 @@ public AlterPartitionReassignmentsResult alterPartitionReassignments( } final long now = time.milliseconds(); - Call call = new Call("alterPartitionReassignments", calcDeadlineMs(now, options.timeoutMs()), + Call call = new Call("alterPartitionReassignments", calcDeadlineMs(now, options.apiTimeoutMs()), new ControllerNodeProvider()) { @Override @@ -3392,7 +3399,7 @@ public ListPartitionReassignmentsResult listPartitionReassignments(Optional> future = new KafkaFutureImpl<>(); @@ -3535,7 +3542,7 @@ public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, final KafkaFutureImpl> future = new KafkaFutureImpl<>(); final long startFindCoordinatorMs = time.milliseconds(); - final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs()); + final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.apiTimeoutMs()); ConsumerGroupOperationContext, AlterConsumerGroupOffsetsOptions> context = new ConsumerGroupOperationContext<>(groupId, options, deadline, future); @@ -3639,7 +3646,7 @@ public ListOffsetsResult listOffsets(Map topicPartit } final long nowMetadata = time.milliseconds(); - final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs()); + final long deadline = calcDeadlineMs(nowMetadata, options.apiTimeoutMs()); MetadataOperationContext context = new MetadataOperationContext<>(topics, options, deadline, futures); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java index e288e1828fd79..04b7c5194c728 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java @@ -30,13 +30,13 @@ public class ListTopicsOptions extends AbstractOptions { private boolean listInternal = false; /** - * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the + * Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. * */ // This method is retained to keep binary compatibility with 0.11 - public ListTopicsOptions timeoutMs(Integer timeoutMs) { - this.timeoutMs = timeoutMs; + public ListTopicsOptions apiTimeoutMs(Integer apiTimeoutMs) { + this.apiTimeoutMs = apiTimeoutMs; return this; } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java index f6a808f74727e..137ec03ccf7c0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java @@ -119,7 +119,7 @@ public void close() { static Map clientConfigs(String... overrides) { Map map = new HashMap<>(); map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121"); - map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"); + map.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "1000"); if (overrides.length % 2 != 0) { throw new IllegalStateException(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 62476be9ca997..9b93a8dbdac18 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -209,7 +209,7 @@ public void testPrettyPrintException() { private static Map newStrMap(String... vals) { Map map = new HashMap<>(); map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121"); - map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"); + map.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "1000"); if (vals.length % 2 != 0) { throw new IllegalStateException(); } @@ -340,12 +340,12 @@ private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors @Test public void testTimeoutWithoutMetadata() throws Exception { try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, mockBootstrapCluster(), - newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) { + newStrMap(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10"))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareResponse(prepareCreateTopicsResponse("myTopic", Errors.NONE)); KafkaFuture future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), - new CreateTopicsOptions().timeoutMs(1000)).all(); + new CreateTopicsOptions().apiTimeoutMs(1000)).all(); TestUtils.assertFutureError(future, TimeoutException.class); } } @@ -368,7 +368,7 @@ public void testConnectionFailureOnMetadataUpdate() throws Exception { KafkaFuture future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), - new CreateTopicsOptions().timeoutMs(10000)).all(); + new CreateTopicsOptions().apiTimeoutMs(10000)).all(); future.get(); } @@ -393,7 +393,7 @@ public void testUnreachableBootstrapServer() throws Exception { KafkaFuture future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), - new CreateTopicsOptions().timeoutMs(10000)).all(); + new CreateTopicsOptions().apiTimeoutMs(10000)).all(); future.get(); } @@ -407,14 +407,14 @@ public void testPropagatedMetadataFetchException() throws Exception { Cluster cluster = mockCluster(0); try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster, newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121", - AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) { + AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10"))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().createPendingAuthenticationError(cluster.nodeById(0), TimeUnit.DAYS.toMillis(1)); env.kafkaClient().prepareResponse(prepareCreateTopicsResponse("myTopic", Errors.NONE)); KafkaFuture future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), - new CreateTopicsOptions().timeoutMs(1000)).all(); + new CreateTopicsOptions().apiTimeoutMs(1000)).all(); TestUtils.assertFutureError(future, SaslAuthenticationException.class); } } @@ -427,7 +427,7 @@ public void testCreateTopics() throws Exception { prepareCreateTopicsResponse("myTopic", Errors.NONE)); KafkaFuture future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), - new CreateTopicsOptions().timeoutMs(10000)).all(); + new CreateTopicsOptions().apiTimeoutMs(10000)).all(); future.get(); } } @@ -459,7 +459,7 @@ public void testCreateTopicsRetryBackoff() throws Exception { KafkaFuture future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), - new CreateTopicsOptions().timeoutMs(10000)).all(); + new CreateTopicsOptions().apiTimeoutMs(10000)).all(); // Wait until the first attempt has failed, then advance the time TestUtils.waitForCondition(() -> mockClient.numAwaitingResponses() == 1, @@ -495,7 +495,7 @@ public void testCreateTopicsHandleNotControllerException() throws Exception { env.cluster().nodeById(1)); KafkaFuture future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), - new CreateTopicsOptions().timeoutMs(10000)).all(); + new CreateTopicsOptions().apiTimeoutMs(10000)).all(); future.get(); } } @@ -567,7 +567,7 @@ public void testMetadataRetries() throws Exception { try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, bootstrapCluster, newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999", - AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000000", + AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10000000", AdminClientConfig.RETRIES_CONFIG, "0"))) { // The first request fails with a disconnect @@ -600,7 +600,7 @@ public void testMetadataRetries() throws Exception { public void testAdminClientApisAuthenticationFailure() throws Exception { Cluster cluster = mockBootstrapCluster(); try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster, - newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"))) { + newStrMap(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "1000"))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().createPendingAuthenticationError(cluster.nodes().get(0), TimeUnit.DAYS.toMillis(1)); @@ -612,7 +612,7 @@ private void callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTe try { env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), - new CreateTopicsOptions().timeoutMs(10000)).all().get(); + new CreateTopicsOptions().apiTimeoutMs(10000)).all().get(); fail("Expected an authentication error."); } catch (ExecutionException e) { assertTrue("Expected an authentication error, but got " + Utils.stackTrace(e), @@ -818,7 +818,7 @@ public void testElectLeaders() throws Exception { results = env.adminClient().electLeaders( electionType, new HashSet<>(asList(topic1, topic2)), - new ElectLeadersOptions().timeoutMs(100)); + new ElectLeadersOptions().apiTimeoutMs(100)); TestUtils.assertFutureError(results.partitions(), TimeoutException.class); } } @@ -846,7 +846,7 @@ public void testHandleTimeout() throws Exception { // Make a request with an extremely short timeout. // Then wait for it to fail by not supplying any response. log.info("Starting AdminClient#listTopics..."); - final ListTopicsResult result = env.adminClient().listTopics(new ListTopicsOptions().timeoutMs(1000)); + final ListTopicsResult result = env.adminClient().listTopics(new ListTopicsOptions().apiTimeoutMs(1000)); TestUtils.waitForCondition(new TestCondition() { @Override public boolean conditionMet() { diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 9203334e4615e..5442defcbf83b 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -314,7 +314,7 @@ object ConfigCommand extends Config { throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}") val newConfig = new JConfig(newEntries.asJava.values) - val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false) + val alterOptions = new AlterConfigsOptions().apiTimeoutMs(30000).validateOnly(false) adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS) } else if (entityType == BrokerLoggerConfigType) { val configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityName) @@ -324,7 +324,7 @@ object ConfigCommand extends Config { if (invalidBrokerLoggers.nonEmpty) throw new InvalidConfigurationException(s"Invalid broker logger(s): ${invalidBrokerLoggers.mkString(",")}") - val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false) + val alterOptions = new AlterConfigsOptions().apiTimeoutMs(30000).validateOnly(false) val alterLogLevelEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET)) ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) } ).asJavaCollection diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 3de3b1a836356..d286237f76af9 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -645,7 +645,7 @@ object ConsumerGroupCommand extends Logging { private def withTimeoutMs [T <: AbstractOptions[T]] (options : T) = { val t = opts.options.valueOf(opts.timeoutMsOpt).intValue() - options.timeoutMs(t) + options.apiTimeoutMs(t) } private def parseTopicPartitionsToReset(groupId: String, topicArgs: Seq[String]): Seq[TopicPartition] = topicArgs.flatMap { diff --git a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala index ff349f6f3612e..a02c15fa82700 100644 --- a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala @@ -79,7 +79,7 @@ object LeaderElectionCommand extends Logging { AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.options.valueOf(commandOptions.bootstrapServer) ) - props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout.toMillis.toString) + props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString) JAdminClient.create(props) } diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index 04765a8b20600..51db844645d0b 100755 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -72,7 +72,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { else new Properties() adminProps.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOpts.options.valueOf(commandOpts.bootstrapServerOpt)) - adminProps.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout.toString) + adminProps.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toString) new AdminClientCommand(adminProps) } diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 934191ac39e2a..5710e51143039 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -598,7 +598,7 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient, private def alterReplicaLogDirsIgnoreReplicaNotAvailable(replicaAssignment: Map[TopicPartitionReplica, String], adminClient: Admin, timeoutMs: Long): Set[TopicPartitionReplica] = { - val alterReplicaLogDirsResult = adminClient.alterReplicaLogDirs(replicaAssignment.asJava, new AlterReplicaLogDirsOptions().timeoutMs(timeoutMs.toInt)) + val alterReplicaLogDirsResult = adminClient.alterReplicaLogDirs(replicaAssignment.asJava, new AlterReplicaLogDirsOptions().apiTimeoutMs(timeoutMs.toInt)) val replicasAssignedToFutureDir = alterReplicaLogDirsResult.values().asScala.flatMap { case (replica, future) => { try { future.get() diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 574e9c66e291a..300065e92f11f 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -182,7 +182,7 @@ private void validateNoActiveConsumers(final String groupId, final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups( Collections.singleton(groupId), - new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000)); + new DescribeConsumerGroupsOptions().apiTimeoutMs(10 * 1000)); final List members = new ArrayList<>(describeResult.describedGroups().get(groupId).get().members()); if (!members.isEmpty()) { diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala index d680dfbba7886..bf0afff883a12 100644 --- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala @@ -202,7 +202,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg def createConfig(): util.Map[String, Object] = { val config = new util.HashMap[String, Object] config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000") + config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "20000") val securityProps: util.Map[Object, Object] = adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties) securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) } diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala index 3022af142eba0..673643046bc09 100644 --- a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala +++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala @@ -108,7 +108,7 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS def createConfig(): Properties = { val adminClientConfig = new Properties() adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - adminClientConfig.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000") + adminClientConfig.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "20000") val securityProps: util.Map[Object, Object] = TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties) securityProps.asScala.foreach { case (key, value) => adminClientConfig.put(key.asInstanceOf[String], value) } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 1f1e96d46b411..d4a0b7cc56115 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -957,7 +957,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { // Because the bootstrap servers are set up incorrectly, this call will not complete, but must be // cancelled by the close operation. val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1.toShort)).asJava, - new CreateTopicsOptions().timeoutMs(900000)).all() + new CreateTopicsOptions().apiTimeoutMs(900000)).all() client.close(time.Duration.ZERO) assertFutureExceptionTypeEquals(future, classOf[TimeoutException]) } @@ -970,11 +970,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { def testMinimumRequestTimeouts(): Unit = { val config = createConfig() config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}") - config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0") + config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "0") client = AdminClient.create(config) val startTimeMs = Time.SYSTEM.milliseconds() val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1.toShort)).asJava, - new CreateTopicsOptions().timeoutMs(2)).all() + new CreateTopicsOptions().apiTimeoutMs(2)).all() assertFutureExceptionTypeEquals(future, classOf[TimeoutException]) val endTimeMs = Time.SYSTEM.milliseconds() assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs) @@ -986,7 +986,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test def testCallInFlightTimeouts(): Unit = { val config = createConfig() - config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100000000") + config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000000") val factory = new KafkaAdminClientTest.FailureInjectingTimeoutProcessorFactory() client = KafkaAdminClientTest.createInternal(new AdminClientConfig(config), factory) val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1.toShort)).asJava, @@ -1349,7 +1349,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1)) // ... now what happens if we try to elect the preferred leader and it's down? - val shortTimeout = new ElectLeadersOptions().timeoutMs(10000) + val shortTimeout = new ElectLeadersOptions().apiTimeoutMs(10000) electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition1).asJava, shortTimeout) assertEquals(Set(partition1).asJava, electResult.partitions.get.keySet) exception = electResult.partitions.get.get(partition1).get diff --git a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala index dee00f616e810..d44e9d4ec06a9 100644 --- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala @@ -271,7 +271,7 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { private def createAdminClient: AdminClient = { val config = createConfig() - config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000") + config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "40000") val client = AdminClient.create(config) adminClients += client client diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala index 45fb7b9ebc3e8..948f77db53114 100644 --- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala @@ -309,7 +309,7 @@ object LeaderElectionCommandTest { def createConfig(servers: Seq[KafkaServer]): Map[String, Object] = { Map( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers(servers), - AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG -> "20000" + AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG -> "20000" ) } diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala index 72e1408a861f0..e8fcb8b192c18 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala @@ -66,7 +66,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { def createAdminClient(servers: Seq[KafkaServer]): Admin = { val props = new Properties() props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers)) - props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000") + props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10000") JAdminClient.create(props) } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java index faf2d964bfa5f..04c6cd4dc36b5 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java @@ -277,7 +277,7 @@ private static Map topicDescriptions(Collection getMatchingTopicPartitions( // first get list of matching topics List matchedTopics = new ArrayList<>(); ListTopicsResult res = adminClient.listTopics( - new ListTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT)); + new ListTopicsOptions().apiTimeoutMs(ADMIN_REQUEST_TIMEOUT)); Map topicListingMap = res.namesToListings().get(); for (Map.Entry topicListingEntry: topicListingMap.entrySet()) { if (!topicListingEntry.getValue().isInternal() @@ -319,7 +319,7 @@ static Collection getMatchingTopicPartitions( // create a list of topic/partitions List out = new ArrayList<>(); DescribeTopicsResult topicsResult = adminClient.describeTopics( - matchedTopics, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT)); + matchedTopics, new DescribeTopicsOptions().apiTimeoutMs(ADMIN_REQUEST_TIMEOUT)); Map topicDescriptionMap = topicsResult.all().get(); for (TopicDescription desc: topicDescriptionMap.values()) { List partitions = desc.partitions(); @@ -337,7 +337,7 @@ private static Admin createAdminClient( Map commonClientConf, Map adminClientConf) { Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, ADMIN_REQUEST_TIMEOUT); + props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, ADMIN_REQUEST_TIMEOUT); // first add common client config, and then admin client config to properties, possibly // over-writing default or common properties. addConfigsToProperties(props, commonClientConf, adminClientConf); From b84363eba01053eb0064ddd8430286707b19150c Mon Sep 17 00:00:00 2001 From: huxihx Date: Fri, 6 Dec 2019 11:19:29 +0800 Subject: [PATCH 3/4] addressed Jason's comments --- .../kafka/clients/CommonClientConfigs.java | 2 +- .../kafka/clients/admin/AbstractOptions.java | 10 +- .../org/apache/kafka/clients/admin/Admin.java | 12 +- .../clients/admin/AlterConfigsOptions.java | 4 +- .../clients/admin/CreateAclsOptions.java | 4 +- .../clients/admin/CreateTopicsOptions.java | 4 +- .../clients/admin/DeleteAclsOptions.java | 4 +- .../clients/admin/DeleteTopicsOptions.java | 4 +- .../clients/admin/DescribeAclsOptions.java | 4 +- .../clients/admin/DescribeClusterOptions.java | 4 +- .../clients/admin/DescribeConfigsOptions.java | 4 +- .../clients/admin/DescribeTopicsOptions.java | 4 +- .../kafka/clients/admin/KafkaAdminClient.java | 105 ++++++++++++------ .../clients/admin/ListTopicsOptions.java | 4 +- .../clients/admin/AdminClientUnitTestEnv.java | 2 +- .../clients/admin/KafkaAdminClientTest.java | 45 +++++--- .../scala/kafka/admin/ConfigCommand.scala | 4 +- .../kafka/admin/ConsumerGroupCommand.scala | 2 +- .../kafka/admin/LeaderElectionCommand.scala | 1 + ...referredReplicaLeaderElectionCommand.scala | 3 +- .../admin/ReassignPartitionsCommand.scala | 2 +- .../scala/kafka/tools/StreamsResetter.java | 2 +- .../kafka/api/BaseAdminIntegrationTest.scala | 2 +- .../DescribeAuthorizedOperationsTest.scala | 2 +- .../api/PlaintextAdminIntegrationTest.scala | 8 +- .../admin/LeaderElectionCommandTest.scala | 3 +- .../admin/ReassignPartitionsClusterTest.scala | 1 + .../kafka/trogdor/common/WorkerUtils.java | 8 +- 28 files changed, 153 insertions(+), 101 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index 87e46f531a5d8..f1736eebb8da4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -142,7 +142,7 @@ public class CommonClientConfigs { + "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances."; public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms"; - public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for client APIs that could block. " + + public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for client APIs. " + "This configuration is used as the default timeout for all client operations that do not explicitly accept a timeout parameter."; /** diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java index d8c139d6863da..08517e33129a6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java @@ -23,15 +23,15 @@ */ public abstract class AbstractOptions { - protected Integer apiTimeoutMs = null; + protected Integer timeoutMs = null; /** * Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. */ @SuppressWarnings("unchecked") - public T apiTimeoutMs(Integer apiTimeoutMs) { - this.apiTimeoutMs = apiTimeoutMs; + public T timeoutMs(Integer timeoutMs) { + this.timeoutMs = timeoutMs; return (T) this; } @@ -39,8 +39,8 @@ public T apiTimeoutMs(Integer apiTimeoutMs) { * The api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. */ - public Integer apiTimeoutMs() { - return apiTimeoutMs; + public Integer timeoutMs() { + return timeoutMs; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index be7b8f4110416..f041f680b4564 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -562,7 +562,7 @@ default CreatePartitionsResult createPartitions(Map newPa *
  • {@link org.apache.kafka.common.errors.AuthorizationException} * if the authenticated user is not authorized to alter the topic
  • *
  • {@link org.apache.kafka.common.errors.TimeoutException} - * if the request was not completed in within the given {@link CreatePartitionsOptions#apiTimeoutMs()}.
  • + * if the request was not completed in within the given {@link CreatePartitionsOptions#timeoutMs()}. *
  • {@link org.apache.kafka.common.errors.ReassignmentInProgressException} * if a partition reassignment is currently in progress
  • *
  • {@link org.apache.kafka.common.errors.BrokerNotAvailableException} @@ -637,7 +637,7 @@ default CreateDelegationTokenResult createDelegationToken() { *
  • {@link org.apache.kafka.common.errors.DelegationTokenDisabledException} * if the delegation token feature is disabled.
  • *
  • {@link org.apache.kafka.common.errors.TimeoutException} - * if the request was not completed in within the given {@link CreateDelegationTokenOptions#apiTimeoutMs()}.
  • + * if the request was not completed in within the given {@link CreateDelegationTokenOptions#timeoutMs()}. * * * @param options The options to use when creating delegation token. @@ -678,7 +678,7 @@ default RenewDelegationTokenResult renewDelegationToken(byte[] hmac) { *
  • {@link org.apache.kafka.common.errors.DelegationTokenExpiredException} * if the delegation token is expired.
  • *
  • {@link org.apache.kafka.common.errors.TimeoutException} - * if the request was not completed in within the given {@link RenewDelegationTokenOptions#apiTimeoutMs()}.
  • + * if the request was not completed in within the given {@link RenewDelegationTokenOptions#timeoutMs()}. * * * @param hmac HMAC of the Delegation token @@ -719,7 +719,7 @@ default ExpireDelegationTokenResult expireDelegationToken(byte[] hmac) { *
  • {@link org.apache.kafka.common.errors.DelegationTokenExpiredException} * if the delegation token is expired.
  • *
  • {@link org.apache.kafka.common.errors.TimeoutException} - * if the request was not completed in within the given {@link ExpireDelegationTokenOptions#apiTimeoutMs()}.
  • + * if the request was not completed in within the given {@link ExpireDelegationTokenOptions#timeoutMs()}. * * * @param hmac HMAC of the Delegation token @@ -753,7 +753,7 @@ default DescribeDelegationTokenResult describeDelegationToken() { *
  • {@link org.apache.kafka.common.errors.DelegationTokenDisabledException} * if the delegation token feature is disabled.
  • *
  • {@link org.apache.kafka.common.errors.TimeoutException} - * if the request was not completed in within the given {@link DescribeDelegationTokenOptions#apiTimeoutMs()}.
  • + * if the request was not completed in within the given {@link DescribeDelegationTokenOptions#timeoutMs()}. * * * @param options The options to use when describing delegation tokens. @@ -898,7 +898,7 @@ default ElectPreferredLeadersResult electPreferredLeaders(Collection partitions, ElectPreferredLeadersOptions options) { final ElectLeadersOptions newOptions = new ElectLeadersOptions(); - newOptions.apiTimeoutMs(options.apiTimeoutMs()); + newOptions.timeoutMs(options.timeoutMs()); final Set topicPartitions = partitions == null ? null : new HashSet<>(partitions); return new ElectPreferredLeadersResult(electLeaders(ElectionType.PREFERRED, topicPartitions, newOptions)); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java index 620c30469fb5a..ae5dc4c79ac02 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java @@ -37,8 +37,8 @@ public class AlterConfigsOptions extends AbstractOptions { * */ // This method is retained to keep binary compatibility with 0.11 - public AlterConfigsOptions apiTimeoutMs(Integer apiTimeoutMs) { - this.apiTimeoutMs = apiTimeoutMs; + public AlterConfigsOptions timeoutMs(Integer timeoutMs) { + this.timeoutMs = timeoutMs; return this; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java index 32472d996f956..d5f25fbbfeed2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java @@ -35,8 +35,8 @@ public class CreateAclsOptions extends AbstractOptions { * */ // This method is retained to keep binary compatibility with 0.11 - public CreateAclsOptions apiTimeoutMs(Integer apiTimeoutMs) { - this.apiTimeoutMs = apiTimeoutMs; + public CreateAclsOptions timeoutMs(Integer timeoutMs) { + this.timeoutMs = timeoutMs; return this; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java index 440fd5ad2c06f..88a521fdb6408 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java @@ -37,8 +37,8 @@ public class CreateTopicsOptions extends AbstractOptions { * */ // This method is retained to keep binary compatibility with 0.11 - public CreateTopicsOptions apiTimeoutMs(Integer apiTimeoutMs) { - this.apiTimeoutMs = apiTimeoutMs; + public CreateTopicsOptions timeoutMs(Integer timeoutMs) { + this.timeoutMs = timeoutMs; return this; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java index 07d20faf19f66..b0a8b4002aa43 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java @@ -35,8 +35,8 @@ public class DeleteAclsOptions extends AbstractOptions { * */ // This method is retained to keep binary compatibility with 0.11 - public DeleteAclsOptions apiTimeoutMs(Integer apiTimeoutMs) { - this.apiTimeoutMs = apiTimeoutMs; + public DeleteAclsOptions timeoutMs(Integer timeoutMs) { + this.timeoutMs = timeoutMs; return this; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java index b7ee749634dd7..f9459b7613044 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java @@ -35,8 +35,8 @@ public class DeleteTopicsOptions extends AbstractOptions { * */ // This method is retained to keep binary compatibility with 0.11 - public DeleteTopicsOptions apiTimeoutMs(Integer apiTimeoutMs) { - this.apiTimeoutMs = apiTimeoutMs; + public DeleteTopicsOptions timeoutMs(Integer timeoutMs) { + this.timeoutMs = timeoutMs; return this; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java index 628a6f8ce1f32..27d0cb4216bbe 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java @@ -34,8 +34,8 @@ public class DescribeAclsOptions extends AbstractOptions { * */ // This method is retained to keep binary compatibility with 0.11 - public DescribeAclsOptions apiTimeoutMs(Integer apiTimeoutMs) { - this.apiTimeoutMs = apiTimeoutMs; + public DescribeAclsOptions timeoutMs(Integer timeoutMs) { + this.timeoutMs = timeoutMs; return this; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java index 2e01609c26cde..cea2660f2b49a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java @@ -35,8 +35,8 @@ public class DescribeClusterOptions extends AbstractOptions newTopics, } } final long now = time.milliseconds(); - Call call = new Call("createTopics", calcDeadlineMs(now, options.apiTimeoutMs()), + Call call = new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()), new ControllerNodeProvider()) { @Override @@ -1457,7 +1488,7 @@ public DeleteTopicsResult deleteTopics(Collection topicNames, } } final long now = time.milliseconds(); - Call call = new Call("deleteTopics", calcDeadlineMs(now, options.apiTimeoutMs()), + Call call = new Call("deleteTopics", calcDeadlineMs(now, options.timeoutMs()), new ControllerNodeProvider()) { @Override @@ -1517,7 +1548,7 @@ void handleFailure(Throwable throwable) { public ListTopicsResult listTopics(final ListTopicsOptions options) { final KafkaFutureImpl> topicListingFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); - runnable.call(new Call("listTopics", calcDeadlineMs(now, options.apiTimeoutMs()), + runnable.call(new Call("listTopics", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @Override @@ -1562,7 +1593,7 @@ public DescribeTopicsResult describeTopics(final Collection topicNames, } } final long now = time.milliseconds(); - Call call = new Call("describeTopics", calcDeadlineMs(now, options.apiTimeoutMs()), + Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { private boolean supportsDisablingTopicCreation = true; @@ -1646,7 +1677,7 @@ public DescribeClusterResult describeCluster(DescribeClusterOptions options) { final KafkaFutureImpl> authorizedOperationsFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); - runnable.call(new Call("listNodes", calcDeadlineMs(now, options.apiTimeoutMs()), + runnable.call(new Call("listNodes", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @Override @@ -1698,7 +1729,7 @@ public DescribeAclsResult describeAcls(final AclBindingFilter filter, DescribeAc } final long now = time.milliseconds(); final KafkaFutureImpl> future = new KafkaFutureImpl<>(); - runnable.call(new Call("describeAcls", calcDeadlineMs(now, options.apiTimeoutMs()), + runnable.call(new Call("describeAcls", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @Override @@ -1742,7 +1773,7 @@ public CreateAclsResult createAcls(Collection acls, CreateAclsOption } } } - runnable.call(new Call("createAcls", calcDeadlineMs(now, options.apiTimeoutMs()), + runnable.call(new Call("createAcls", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @Override @@ -1790,7 +1821,7 @@ public DeleteAclsResult deleteAcls(Collection filters, DeleteA futures.put(filter, new KafkaFutureImpl<>()); } } - runnable.call(new Call("deleteAcls", calcDeadlineMs(now, options.apiTimeoutMs()), + runnable.call(new Call("deleteAcls", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @Override @@ -1856,7 +1887,7 @@ public DescribeConfigsResult describeConfigs(Collection configRe final long now = time.milliseconds(); if (!unifiedRequestResources.isEmpty()) { - runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.apiTimeoutMs()), + runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @Override @@ -1903,7 +1934,7 @@ void handleFailure(Throwable throwable) { final KafkaFutureImpl brokerFuture = entry.getValue(); final ConfigResource resource = entry.getKey(); final int nodeId = Integer.parseInt(resource.name()); - runnable.call(new Call("describeBrokerConfigs", calcDeadlineMs(now, options.apiTimeoutMs()), + runnable.call(new Call("describeBrokerConfigs", calcDeadlineMs(now, options.timeoutMs()), new ConstantNodeIdProvider(nodeId)) { @Override @@ -2018,7 +2049,7 @@ private Map> alterConfigs(Map> incrementalAlterConfigs(Map()); final long now = time.milliseconds(); - runnable.call(new Call("incrementalAlterConfigs", calcDeadlineMs(now, options.apiTimeoutMs()), nodeProvider) { + runnable.call(new Call("incrementalAlterConfigs", calcDeadlineMs(now, options.timeoutMs()), nodeProvider) { @Override public IncrementalAlterConfigsRequest.Builder createRequest(int timeoutMs) { @@ -2153,7 +2184,7 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map assignment = entry.getValue(); - runnable.call(new Call("alterReplicaLogDirs", calcDeadlineMs(now, options.apiTimeoutMs()), + runnable.call(new Call("alterReplicaLogDirs", calcDeadlineMs(now, options.timeoutMs()), new ConstantNodeIdProvider(brokerId)) { @Override @@ -2199,7 +2230,7 @@ public DescribeLogDirsResult describeLogDirs(Collection brokers, Descri final long now = time.milliseconds(); for (final Integer brokerId: brokers) { - runnable.call(new Call("describeLogDirs", calcDeadlineMs(now, options.apiTimeoutMs()), + runnable.call(new Call("describeLogDirs", calcDeadlineMs(now, options.timeoutMs()), new ConstantNodeIdProvider(brokerId)) { @Override @@ -2253,7 +2284,7 @@ public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection newPar .collect(Collectors.toMap(Map.Entry::getKey, e -> partitionDetails(e.getValue()))); final long now = time.milliseconds(); - runnable.call(new Call("createPartitions", calcDeadlineMs(now, options.apiTimeoutMs()), + runnable.call(new Call("createPartitions", calcDeadlineMs(now, options.timeoutMs()), new ControllerNodeProvider()) { @Override @@ -2380,7 +2411,7 @@ public DeleteRecordsResult deleteRecords(final Map expiryTimeFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); - runnable.call(new Call("renewDelegationToken", calcDeadlineMs(now, options.apiTimeoutMs()), + runnable.call(new Call("renewDelegationToken", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @Override @@ -2549,7 +2580,7 @@ void handleFailure(Throwable throwable) { public ExpireDelegationTokenResult expireDelegationToken(final byte[] hmac, final ExpireDelegationTokenOptions options) { final KafkaFutureImpl expiryTimeFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); - runnable.call(new Call("expireDelegationToken", calcDeadlineMs(now, options.apiTimeoutMs()), + runnable.call(new Call("expireDelegationToken", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @Override @@ -2583,7 +2614,7 @@ void handleFailure(Throwable throwable) { public DescribeDelegationTokenResult describeDelegationToken(final DescribeDelegationTokenOptions options) { final KafkaFutureImpl> tokensFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); - runnable.call(new Call("describeDelegationToken", calcDeadlineMs(now, options.apiTimeoutMs()), + runnable.call(new Call("describeDelegationToken", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @Override @@ -2659,7 +2690,7 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection context = new ConsumerGroupOperationContext<>(groupId, options, deadline, futures.get(groupId)); Call findCoordinatorCall = getFindCoordinatorCall(context, @@ -2904,7 +2935,7 @@ private synchronized void tryComplete() { public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) { final KafkaFutureImpl> all = new KafkaFutureImpl<>(); final long nowMetadata = time.milliseconds(); - final long deadline = calcDeadlineMs(nowMetadata, options.apiTimeoutMs()); + final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs()); runnable.call(new Call("findAllBrokers", deadline, new LeastLoadedNodeProvider()) { @Override MetadataRequest.Builder createRequest(int timeoutMs) { @@ -2984,7 +3015,7 @@ public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String grou final ListConsumerGroupOffsetsOptions options) { final KafkaFutureImpl> groupOffsetListingFuture = new KafkaFutureImpl<>(); final long startFindCoordinatorMs = time.milliseconds(); - final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.apiTimeoutMs()); + final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs()); ConsumerGroupOperationContext, ListConsumerGroupOffsetsOptions> context = new ConsumerGroupOperationContext<>(groupId, options, deadline, groupOffsetListingFuture); @@ -3058,7 +3089,7 @@ public DeleteConsumerGroupsResult deleteConsumerGroups(Collection groupI continue; final long startFindCoordinatorMs = time.milliseconds(); - final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.apiTimeoutMs()); + final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs()); ConsumerGroupOperationContext context = new ConsumerGroupOperationContext<>(groupId, options, deadline, future); Call findCoordinatorCall = getFindCoordinatorCall(context, @@ -3118,7 +3149,7 @@ public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets( } final long startFindCoordinatorMs = time.milliseconds(); - final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.apiTimeoutMs()); + final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs()); ConsumerGroupOperationContext, DeleteConsumerGroupOffsetsOptions> context = new ConsumerGroupOperationContext<>(groupId, options, deadline, future); @@ -3199,7 +3230,7 @@ public ElectLeadersResult electLeaders( ElectLeadersOptions options) { final KafkaFutureImpl>> electionFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); - runnable.call(new Call("electLeaders", calcDeadlineMs(now, options.apiTimeoutMs()), + runnable.call(new Call("electLeaders", calcDeadlineMs(now, options.timeoutMs()), new ControllerNodeProvider()) { @Override @@ -3264,7 +3295,7 @@ public AlterPartitionReassignmentsResult alterPartitionReassignments( } final long now = time.milliseconds(); - Call call = new Call("alterPartitionReassignments", calcDeadlineMs(now, options.apiTimeoutMs()), + Call call = new Call("alterPartitionReassignments", calcDeadlineMs(now, options.timeoutMs()), new ControllerNodeProvider()) { @Override @@ -3399,7 +3430,7 @@ public ListPartitionReassignmentsResult listPartitionReassignments(Optional> future = new KafkaFutureImpl<>(); @@ -3542,7 +3573,7 @@ public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, final KafkaFutureImpl> future = new KafkaFutureImpl<>(); final long startFindCoordinatorMs = time.milliseconds(); - final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.apiTimeoutMs()); + final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs()); ConsumerGroupOperationContext, AlterConsumerGroupOffsetsOptions> context = new ConsumerGroupOperationContext<>(groupId, options, deadline, future); @@ -3646,7 +3677,7 @@ public ListOffsetsResult listOffsets(Map topicPartit } final long nowMetadata = time.milliseconds(); - final long deadline = calcDeadlineMs(nowMetadata, options.apiTimeoutMs()); + final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs()); MetadataOperationContext context = new MetadataOperationContext<>(topics, options, deadline, futures); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java index 04b7c5194c728..b0818f6a8e460 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java @@ -35,8 +35,8 @@ public class ListTopicsOptions extends AbstractOptions { * */ // This method is retained to keep binary compatibility with 0.11 - public ListTopicsOptions apiTimeoutMs(Integer apiTimeoutMs) { - this.apiTimeoutMs = apiTimeoutMs; + public ListTopicsOptions timeoutMs(Integer timeoutMs) { + this.timeoutMs = timeoutMs; return this; } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java index 137ec03ccf7c0..f6a808f74727e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java @@ -119,7 +119,7 @@ public void close() { static Map clientConfigs(String... overrides) { Map map = new HashMap<>(); map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121"); - map.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "1000"); + map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"); if (overrides.length % 2 != 0) { throw new IllegalStateException(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 9b93a8dbdac18..cf908c88057bb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.ClusterAuthorizationException; @@ -174,6 +175,22 @@ public class KafkaAdminClientTest { @Rule final public Timeout globalTimeout = Timeout.millis(120000); + @Test + public void testDefaultApiTimeoutAndRequestTimeoutConflicts() { + AdminClientConfig config = newConfMap(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "500"); + try (Admin client = KafkaAdminClient.createInternal(config, null)) { + fail("Expected KafkaException"); + } catch (KafkaException e) { + assertTrue(e.getCause() instanceof ConfigException); + } + + config = newConfMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "3000000"); + try (KafkaAdminClient client = KafkaAdminClient.createInternal(config, null)) { + // default api timeout should be overridden to request timeout. + assertEquals(client.defaultTimeoutMs(), client.requestTimeoutMs()); + } + } + @Test public void testGetOrCreateListValue() { Map> map = new HashMap<>(); @@ -209,7 +226,7 @@ public void testPrettyPrintException() { private static Map newStrMap(String... vals) { Map map = new HashMap<>(); map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121"); - map.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "1000"); + map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"); if (vals.length % 2 != 0) { throw new IllegalStateException(); } @@ -340,12 +357,12 @@ private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors @Test public void testTimeoutWithoutMetadata() throws Exception { try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, mockBootstrapCluster(), - newStrMap(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10"))) { + newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareResponse(prepareCreateTopicsResponse("myTopic", Errors.NONE)); KafkaFuture future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), - new CreateTopicsOptions().apiTimeoutMs(1000)).all(); + new CreateTopicsOptions().timeoutMs(1000)).all(); TestUtils.assertFutureError(future, TimeoutException.class); } } @@ -368,7 +385,7 @@ public void testConnectionFailureOnMetadataUpdate() throws Exception { KafkaFuture future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), - new CreateTopicsOptions().apiTimeoutMs(10000)).all(); + new CreateTopicsOptions().timeoutMs(10000)).all(); future.get(); } @@ -393,7 +410,7 @@ public void testUnreachableBootstrapServer() throws Exception { KafkaFuture future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), - new CreateTopicsOptions().apiTimeoutMs(10000)).all(); + new CreateTopicsOptions().timeoutMs(10000)).all(); future.get(); } @@ -407,14 +424,14 @@ public void testPropagatedMetadataFetchException() throws Exception { Cluster cluster = mockCluster(0); try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster, newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121", - AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10"))) { + AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().createPendingAuthenticationError(cluster.nodeById(0), TimeUnit.DAYS.toMillis(1)); env.kafkaClient().prepareResponse(prepareCreateTopicsResponse("myTopic", Errors.NONE)); KafkaFuture future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), - new CreateTopicsOptions().apiTimeoutMs(1000)).all(); + new CreateTopicsOptions().timeoutMs(1000)).all(); TestUtils.assertFutureError(future, SaslAuthenticationException.class); } } @@ -427,7 +444,7 @@ public void testCreateTopics() throws Exception { prepareCreateTopicsResponse("myTopic", Errors.NONE)); KafkaFuture future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), - new CreateTopicsOptions().apiTimeoutMs(10000)).all(); + new CreateTopicsOptions().timeoutMs(10000)).all(); future.get(); } } @@ -459,7 +476,7 @@ public void testCreateTopicsRetryBackoff() throws Exception { KafkaFuture future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), - new CreateTopicsOptions().apiTimeoutMs(10000)).all(); + new CreateTopicsOptions().timeoutMs(10000)).all(); // Wait until the first attempt has failed, then advance the time TestUtils.waitForCondition(() -> mockClient.numAwaitingResponses() == 1, @@ -495,7 +512,7 @@ public void testCreateTopicsHandleNotControllerException() throws Exception { env.cluster().nodeById(1)); KafkaFuture future = env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), - new CreateTopicsOptions().apiTimeoutMs(10000)).all(); + new CreateTopicsOptions().timeoutMs(10000)).all(); future.get(); } } @@ -600,7 +617,7 @@ public void testMetadataRetries() throws Exception { public void testAdminClientApisAuthenticationFailure() throws Exception { Cluster cluster = mockBootstrapCluster(); try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster, - newStrMap(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "1000"))) { + newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().createPendingAuthenticationError(cluster.nodes().get(0), TimeUnit.DAYS.toMillis(1)); @@ -612,7 +629,7 @@ private void callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTe try { env.adminClient().createTopics( Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), - new CreateTopicsOptions().apiTimeoutMs(10000)).all().get(); + new CreateTopicsOptions().timeoutMs(10000)).all().get(); fail("Expected an authentication error."); } catch (ExecutionException e) { assertTrue("Expected an authentication error, but got " + Utils.stackTrace(e), @@ -818,7 +835,7 @@ public void testElectLeaders() throws Exception { results = env.adminClient().electLeaders( electionType, new HashSet<>(asList(topic1, topic2)), - new ElectLeadersOptions().apiTimeoutMs(100)); + new ElectLeadersOptions().timeoutMs(100)); TestUtils.assertFutureError(results.partitions(), TimeoutException.class); } } @@ -846,7 +863,7 @@ public void testHandleTimeout() throws Exception { // Make a request with an extremely short timeout. // Then wait for it to fail by not supplying any response. log.info("Starting AdminClient#listTopics..."); - final ListTopicsResult result = env.adminClient().listTopics(new ListTopicsOptions().apiTimeoutMs(1000)); + final ListTopicsResult result = env.adminClient().listTopics(new ListTopicsOptions().timeoutMs(1000)); TestUtils.waitForCondition(new TestCondition() { @Override public boolean conditionMet() { diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 5442defcbf83b..9203334e4615e 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -314,7 +314,7 @@ object ConfigCommand extends Config { throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}") val newConfig = new JConfig(newEntries.asJava.values) - val alterOptions = new AlterConfigsOptions().apiTimeoutMs(30000).validateOnly(false) + val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false) adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS) } else if (entityType == BrokerLoggerConfigType) { val configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityName) @@ -324,7 +324,7 @@ object ConfigCommand extends Config { if (invalidBrokerLoggers.nonEmpty) throw new InvalidConfigurationException(s"Invalid broker logger(s): ${invalidBrokerLoggers.mkString(",")}") - val alterOptions = new AlterConfigsOptions().apiTimeoutMs(30000).validateOnly(false) + val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false) val alterLogLevelEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET)) ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) } ).asJavaCollection diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index d286237f76af9..3de3b1a836356 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -645,7 +645,7 @@ object ConsumerGroupCommand extends Logging { private def withTimeoutMs [T <: AbstractOptions[T]] (options : T) = { val t = opts.options.valueOf(opts.timeoutMsOpt).intValue() - options.apiTimeoutMs(t) + options.timeoutMs(t) } private def parseTopicPartitionsToReset(groupId: String, topicArgs: Seq[String]): Seq[TopicPartition] = topicArgs.flatMap { diff --git a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala index a02c15fa82700..bb885c6459ff0 100644 --- a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala @@ -80,6 +80,7 @@ object LeaderElectionCommand extends Logging { commandOptions.options.valueOf(commandOptions.bootstrapServer) ) props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString) + props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString) JAdminClient.create(props) } diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index 51db844645d0b..80b99e30dfa3e 100755 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -72,7 +72,8 @@ object PreferredReplicaLeaderElectionCommand extends Logging { else new Properties() adminProps.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOpts.options.valueOf(commandOpts.bootstrapServerOpt)) - adminProps.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toString) + adminProps.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout.toString) + adminProps.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, (timeout * 2).toString) new AdminClientCommand(adminProps) } diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 5710e51143039..934191ac39e2a 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -598,7 +598,7 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient, private def alterReplicaLogDirsIgnoreReplicaNotAvailable(replicaAssignment: Map[TopicPartitionReplica, String], adminClient: Admin, timeoutMs: Long): Set[TopicPartitionReplica] = { - val alterReplicaLogDirsResult = adminClient.alterReplicaLogDirs(replicaAssignment.asJava, new AlterReplicaLogDirsOptions().apiTimeoutMs(timeoutMs.toInt)) + val alterReplicaLogDirsResult = adminClient.alterReplicaLogDirs(replicaAssignment.asJava, new AlterReplicaLogDirsOptions().timeoutMs(timeoutMs.toInt)) val replicasAssignedToFutureDir = alterReplicaLogDirsResult.values().asScala.flatMap { case (replica, future) => { try { future.get() diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 300065e92f11f..574e9c66e291a 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -182,7 +182,7 @@ private void validateNoActiveConsumers(final String groupId, final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups( Collections.singleton(groupId), - new DescribeConsumerGroupsOptions().apiTimeoutMs(10 * 1000)); + new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000)); final List members = new ArrayList<>(describeResult.describedGroups().get(groupId).get().members()); if (!members.isEmpty()) { diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala index bf0afff883a12..d680dfbba7886 100644 --- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala @@ -202,7 +202,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg def createConfig(): util.Map[String, Object] = { val config = new util.HashMap[String, Object] config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "20000") + config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000") val securityProps: util.Map[Object, Object] = adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties) securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) } diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala index 673643046bc09..3022af142eba0 100644 --- a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala +++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala @@ -108,7 +108,7 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS def createConfig(): Properties = { val adminClientConfig = new Properties() adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - adminClientConfig.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "20000") + adminClientConfig.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000") val securityProps: util.Map[Object, Object] = TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties) securityProps.asScala.foreach { case (key, value) => adminClientConfig.put(key.asInstanceOf[String], value) } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index d4a0b7cc56115..895fabeabd46c 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -957,7 +957,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { // Because the bootstrap servers are set up incorrectly, this call will not complete, but must be // cancelled by the close operation. val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1.toShort)).asJava, - new CreateTopicsOptions().apiTimeoutMs(900000)).all() + new CreateTopicsOptions().timeoutMs(900000)).all() client.close(time.Duration.ZERO) assertFutureExceptionTypeEquals(future, classOf[TimeoutException]) } @@ -970,11 +970,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { def testMinimumRequestTimeouts(): Unit = { val config = createConfig() config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}") - config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "0") + config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0") client = AdminClient.create(config) val startTimeMs = Time.SYSTEM.milliseconds() val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1.toShort)).asJava, - new CreateTopicsOptions().apiTimeoutMs(2)).all() + new CreateTopicsOptions().timeoutMs(2)).all() assertFutureExceptionTypeEquals(future, classOf[TimeoutException]) val endTimeMs = Time.SYSTEM.milliseconds() assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs) @@ -1349,7 +1349,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1)) // ... now what happens if we try to elect the preferred leader and it's down? - val shortTimeout = new ElectLeadersOptions().apiTimeoutMs(10000) + val shortTimeout = new ElectLeadersOptions().timeoutMs(10000) electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition1).asJava, shortTimeout) assertEquals(Set(partition1).asJava, electResult.partitions.get.keySet) exception = electResult.partitions.get.get(partition1).get diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala index 948f77db53114..15843ad7838d5 100644 --- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala @@ -309,7 +309,8 @@ object LeaderElectionCommandTest { def createConfig(servers: Seq[KafkaServer]): Map[String, Object] = { Map( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers(servers), - AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG -> "20000" + AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG -> "20000", + AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG -> "10000" ) } diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala index e8fcb8b192c18..5ebab7aae90b7 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala @@ -67,6 +67,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { val props = new Properties() props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers)) props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10000") + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000") JAdminClient.create(props) } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java index 04c6cd4dc36b5..faf2d964bfa5f 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java @@ -277,7 +277,7 @@ private static Map topicDescriptions(Collection getMatchingTopicPartitions( // first get list of matching topics List matchedTopics = new ArrayList<>(); ListTopicsResult res = adminClient.listTopics( - new ListTopicsOptions().apiTimeoutMs(ADMIN_REQUEST_TIMEOUT)); + new ListTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT)); Map topicListingMap = res.namesToListings().get(); for (Map.Entry topicListingEntry: topicListingMap.entrySet()) { if (!topicListingEntry.getValue().isInternal() @@ -319,7 +319,7 @@ static Collection getMatchingTopicPartitions( // create a list of topic/partitions List out = new ArrayList<>(); DescribeTopicsResult topicsResult = adminClient.describeTopics( - matchedTopics, new DescribeTopicsOptions().apiTimeoutMs(ADMIN_REQUEST_TIMEOUT)); + matchedTopics, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT)); Map topicDescriptionMap = topicsResult.all().get(); for (TopicDescription desc: topicDescriptionMap.values()) { List partitions = desc.partitions(); @@ -337,7 +337,7 @@ private static Admin createAdminClient( Map commonClientConf, Map adminClientConf) { Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, ADMIN_REQUEST_TIMEOUT); + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, ADMIN_REQUEST_TIMEOUT); // first add common client config, and then admin client config to properties, possibly // over-writing default or common properties. addConfigsToProperties(props, commonClientConf, adminClientConf); From 4f61615bde20f343342f2676c5f69507720c14da Mon Sep 17 00:00:00 2001 From: huxihx Date: Thu, 12 Dec 2019 12:00:02 +0800 Subject: [PATCH 4/4] test case added --- .../kafka/clients/admin/KafkaAdminClient.java | 5 ++- .../clients/admin/KafkaAdminClientTest.java | 43 +++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 49dbf75c99dc1..3044c3fdddcf9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -528,7 +528,7 @@ private KafkaAdminClient(AdminClientConfig config, * If no default.api.timeout.ms has been configured, then set its value as the max of the default and request.timeout.ms. Also we should probably log a warning. * Otherwise, use the provided values for both configurations. * - * @param config + * @param config The configuration */ private void configureDefaultApiTimeoutMsAndRequestTimeoutMs(AdminClientConfig config) { this.requestTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG); @@ -1036,7 +1036,8 @@ private long sendEligibleCalls(long now) { "Internal error sending %s to %s.", call.callName, node))); continue; } - ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true); + ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, + true, timeoutMs, null); log.trace("Sending {} to {}. correlationId={}", requestBuilder, node, clientRequest.correlationId()); client.send(clientRequest, now); getOrCreateListValue(callsInFlight, node.idString()).add(call); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index cf908c88057bb..b92696eba7a73 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -2729,6 +2729,49 @@ public void testGetSubLevelError() { errorsMap, memberIdentities.get(1), "For unit test").getClass()); } + @Test + public void testSingleRequestTimeoutAndRetryWithoutApiTimeout() throws Exception { + HashMap nodes = new HashMap<>(); + MockTime time = new MockTime(); + Node node0 = new Node(0, "localhost", 8121); + nodes.put(0, node0); + Cluster cluster = new Cluster("mockClusterId", nodes.values(), + Arrays.asList(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0})), + Collections.emptySet(), Collections.emptySet(), + Collections.emptySet(), nodes.get(0)); + + final int requestTimeoutMs = 1000; + final int retryBackoff = 100; + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, + AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoff), + AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeoutMs))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + assertEquals(time, env.time()); + assertEquals(env.time(), ((KafkaAdminClient) env.adminClient()).time()); + + final int apiTimeoutMs = 3000; + final long startTimeMs = time.milliseconds(); + final ListTopicsResult result = env.adminClient().listTopics(new ListTopicsOptions().timeoutMs(apiTimeoutMs)); + + // Wait until the first attempt has failed, then advance the time + TestUtils.waitForCondition(() -> env.kafkaClient().hasInFlightRequests(), "Timed out waiting for inFlightRequests"); + time.sleep(requestTimeoutMs + retryBackoff); + final long betweenTimeoutMs = time.milliseconds(); + + // Since api timeout bound is not hit, AdminClient should add the retry call to the queue + TestUtils.waitForCondition(() -> ((KafkaAdminClient) env.adminClient()).numPendingCalls() == 1, + "Failed to add retry listTopics call"); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + time.sleep(requestTimeoutMs); + + assertEquals(apiTimeoutMs - requestTimeoutMs - retryBackoff, + KafkaAdminClient.calcTimeoutMsRemainingAsInt(betweenTimeoutMs, apiTimeoutMs + startTimeMs)); + assertEquals(result.listings().get().size(), 1); + assertEquals("foo", result.listings().get().iterator().next().name()); + + } + } + private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member, MemberAssignment assignment) { return new MemberDescription(member.memberId(),