diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index af47e55d75a6a..f1736eebb8da4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -141,6 +141,10 @@ public class CommonClientConfigs { + "The value must be set lower than session.timeout.ms, but typically should be set no higher " + "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances."; + public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms"; + public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for client APIs. " + + "This configuration is used as the default timeout for all client operations that do not explicitly accept a timeout parameter."; + /** * Postprocess the configuration so that exponential backoff is disabled when reconnect backoff * is explicitly configured but the maximum reconnect backoff is not explicitly configured. diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java index ccccf118a1abd..08517e33129a6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java @@ -26,7 +26,7 @@ public abstract class AbstractOptions { protected Integer timeoutMs = null; /** - * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the + * Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. */ @SuppressWarnings("unchecked") @@ -36,7 +36,7 @@ public T timeoutMs(Integer timeoutMs) { } /** - * The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the + * The api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. */ public Integer timeoutMs() { diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java index 107eb56d63aed..ad62f1fbcd6d4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java @@ -30,6 +30,7 @@ import java.util.Set; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.Range.between; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; /** @@ -107,6 +108,7 @@ public class AdminClientConfig extends AbstractConfig { private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC; public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG; + public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG; /** * security.providers @@ -143,7 +145,7 @@ public class AdminClientConfig extends AbstractConfig { RETRY_BACKOFF_MS_DOC) .define(REQUEST_TIMEOUT_MS_CONFIG, Type.INT, - 120000, + 30000, atLeast(0), Importance.MEDIUM, REQUEST_TIMEOUT_MS_DOC) @@ -154,10 +156,16 @@ public class AdminClientConfig extends AbstractConfig { CONNECTIONS_MAX_IDLE_MS_DOC) .define(RETRIES_CONFIG, Type.INT, - 5, - atLeast(0), + Integer.MAX_VALUE, + between(0, Integer.MAX_VALUE), Importance.LOW, CommonClientConfigs.RETRIES_DOC) + .define(DEFAULT_API_TIMEOUT_MS_CONFIG, + Type.INT, + 60000, + atLeast(0), + Importance.MEDIUM, + CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_DOC) .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, Type.LONG, 30000, diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java index 0b280532104e8..ae5dc4c79ac02 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java @@ -32,7 +32,7 @@ public class AlterConfigsOptions extends AbstractOptions { private boolean validateOnly = false; /** - * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the + * Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. * */ diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java index bfb8e32db157f..d5f25fbbfeed2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsOptions.java @@ -30,7 +30,7 @@ public class CreateAclsOptions extends AbstractOptions { /** - * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the + * Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. * */ diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java index a9f1009c2fc7e..88a521fdb6408 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java @@ -32,7 +32,7 @@ public class CreateTopicsOptions extends AbstractOptions { private boolean validateOnly = false; /** - * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the + * Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. * */ diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java index 1b67da52f386d..b0a8b4002aa43 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsOptions.java @@ -30,7 +30,7 @@ public class DeleteAclsOptions extends AbstractOptions { /** - * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the + * Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. * */ diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java index 91e38a196fc9a..f9459b7613044 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java @@ -30,7 +30,7 @@ public class DeleteTopicsOptions extends AbstractOptions { /** - * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the + * Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. * */ diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java index b17d6a7d0cb98..27d0cb4216bbe 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsOptions.java @@ -29,7 +29,7 @@ public class DescribeAclsOptions extends AbstractOptions { /** - * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the + * Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. * */ diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java index 670feda0d2614..cea2660f2b49a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java @@ -30,7 +30,7 @@ public class DescribeClusterOptions extends AbstractOptions requestBuilder; try { requestBuilder = call.createRequest(timeoutMs); @@ -998,7 +1036,8 @@ private long sendEligibleCalls(long now) { "Internal error sending %s to %s.", call.callName, node))); continue; } - ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true); + ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, + true, timeoutMs, null); log.trace("Sending {} to {}. correlationId={}", requestBuilder, node, clientRequest.correlationId()); client.send(clientRequest, now); getOrCreateListValue(callsInFlight, node.idString()).add(call); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java index e288e1828fd79..b0818f6a8e460 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java @@ -30,7 +30,7 @@ public class ListTopicsOptions extends AbstractOptions { private boolean listInternal = false; /** - * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the + * Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the * AdminClient should be used. * */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 67897cfd54fb3..ce7a595e58613 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -222,8 +222,7 @@ public class ConsumerConfig extends AbstractConfig { private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC; /** default.api.timeout.ms */ - public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms"; - public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for consumer APIs that could block. This configuration is used as the default timeout for all consumer operations that do not explicitly accept a timeout parameter."; + public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG; /** interceptor.classes */ public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes"; @@ -447,7 +446,7 @@ public class ConsumerConfig extends AbstractConfig { 60 * 1000, atLeast(0), Importance.MEDIUM, - DEFAULT_API_TIMEOUT_MS_DOC) + CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_DOC) /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */ .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 62476be9ca997..b92696eba7a73 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.ClusterAuthorizationException; @@ -174,6 +175,22 @@ public class KafkaAdminClientTest { @Rule final public Timeout globalTimeout = Timeout.millis(120000); + @Test + public void testDefaultApiTimeoutAndRequestTimeoutConflicts() { + AdminClientConfig config = newConfMap(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "500"); + try (Admin client = KafkaAdminClient.createInternal(config, null)) { + fail("Expected KafkaException"); + } catch (KafkaException e) { + assertTrue(e.getCause() instanceof ConfigException); + } + + config = newConfMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "3000000"); + try (KafkaAdminClient client = KafkaAdminClient.createInternal(config, null)) { + // default api timeout should be overridden to request timeout. + assertEquals(client.defaultTimeoutMs(), client.requestTimeoutMs()); + } + } + @Test public void testGetOrCreateListValue() { Map> map = new HashMap<>(); @@ -567,7 +584,7 @@ public void testMetadataRetries() throws Exception { try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, bootstrapCluster, newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999", - AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000000", + AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10000000", AdminClientConfig.RETRIES_CONFIG, "0"))) { // The first request fails with a disconnect @@ -2712,6 +2729,49 @@ public void testGetSubLevelError() { errorsMap, memberIdentities.get(1), "For unit test").getClass()); } + @Test + public void testSingleRequestTimeoutAndRetryWithoutApiTimeout() throws Exception { + HashMap nodes = new HashMap<>(); + MockTime time = new MockTime(); + Node node0 = new Node(0, "localhost", 8121); + nodes.put(0, node0); + Cluster cluster = new Cluster("mockClusterId", nodes.values(), + Arrays.asList(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0})), + Collections.emptySet(), Collections.emptySet(), + Collections.emptySet(), nodes.get(0)); + + final int requestTimeoutMs = 1000; + final int retryBackoff = 100; + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, + AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoff), + AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeoutMs))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + assertEquals(time, env.time()); + assertEquals(env.time(), ((KafkaAdminClient) env.adminClient()).time()); + + final int apiTimeoutMs = 3000; + final long startTimeMs = time.milliseconds(); + final ListTopicsResult result = env.adminClient().listTopics(new ListTopicsOptions().timeoutMs(apiTimeoutMs)); + + // Wait until the first attempt has failed, then advance the time + TestUtils.waitForCondition(() -> env.kafkaClient().hasInFlightRequests(), "Timed out waiting for inFlightRequests"); + time.sleep(requestTimeoutMs + retryBackoff); + final long betweenTimeoutMs = time.milliseconds(); + + // Since api timeout bound is not hit, AdminClient should add the retry call to the queue + TestUtils.waitForCondition(() -> ((KafkaAdminClient) env.adminClient()).numPendingCalls() == 1, + "Failed to add retry listTopics call"); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + time.sleep(requestTimeoutMs); + + assertEquals(apiTimeoutMs - requestTimeoutMs - retryBackoff, + KafkaAdminClient.calcTimeoutMsRemainingAsInt(betweenTimeoutMs, apiTimeoutMs + startTimeMs)); + assertEquals(result.listings().get().size(), 1); + assertEquals("foo", result.listings().get().iterator().next().name()); + + } + } + private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member, MemberAssignment assignment) { return new MemberDescription(member.memberId(), diff --git a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala index ff349f6f3612e..bb885c6459ff0 100644 --- a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala @@ -79,7 +79,8 @@ object LeaderElectionCommand extends Logging { AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.options.valueOf(commandOptions.bootstrapServer) ) - props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout.toMillis.toString) + props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString) + props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString) JAdminClient.create(props) } diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index 04765a8b20600..80b99e30dfa3e 100755 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -73,6 +73,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { new Properties() adminProps.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOpts.options.valueOf(commandOpts.bootstrapServerOpt)) adminProps.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout.toString) + adminProps.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, (timeout * 2).toString) new AdminClientCommand(adminProps) } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 1f1e96d46b411..895fabeabd46c 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -986,7 +986,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test def testCallInFlightTimeouts(): Unit = { val config = createConfig() - config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100000000") + config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000000") val factory = new KafkaAdminClientTest.FailureInjectingTimeoutProcessorFactory() client = KafkaAdminClientTest.createInternal(new AdminClientConfig(config), factory) val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1.toShort)).asJava, diff --git a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala index dee00f616e810..d44e9d4ec06a9 100644 --- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala @@ -271,7 +271,7 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { private def createAdminClient: AdminClient = { val config = createConfig() - config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000") + config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "40000") val client = AdminClient.create(config) adminClients += client client diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala index 45fb7b9ebc3e8..15843ad7838d5 100644 --- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala @@ -309,7 +309,8 @@ object LeaderElectionCommandTest { def createConfig(servers: Seq[KafkaServer]): Map[String, Object] = { Map( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers(servers), - AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG -> "20000" + AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG -> "20000", + AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG -> "10000" ) } diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala index 72e1408a861f0..5ebab7aae90b7 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala @@ -66,7 +66,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { def createAdminClient(servers: Seq[KafkaServer]): Admin = { val props = new Properties() props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers)) - props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000") + props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10000") + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000") JAdminClient.create(props) }