Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ public class CommonClientConfigs {
+ "The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher "
+ "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";

public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms";
public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for client APIs. " +
"This configuration is used as the default timeout for all client operations that do not explicitly accept a <code>timeout</code> parameter.";

/**
* Postprocess the configuration so that exponential backoff is disabled when reconnect backoff
* is explicitly configured but the maximum reconnect backoff is not explicitly configured.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public abstract class AbstractOptions<T extends AbstractOptions> {
protected Integer timeoutMs = null;

/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*/
@SuppressWarnings("unchecked")
Expand All @@ -36,7 +36,7 @@ public T timeoutMs(Integer timeoutMs) {
}

/**
* The request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* The api timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*/
public Integer timeoutMs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Set;

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;

/**
Expand Down Expand Up @@ -107,6 +108,7 @@ public class AdminClientConfig extends AbstractConfig {
private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;

public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG;

/**
* <code>security.providers</code>
Expand Down Expand Up @@ -143,7 +145,7 @@ public class AdminClientConfig extends AbstractConfig {
RETRY_BACKOFF_MS_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,
Type.INT,
120000,
30000,
atLeast(0),
Importance.MEDIUM,
REQUEST_TIMEOUT_MS_DOC)
Expand All @@ -154,10 +156,16 @@ public class AdminClientConfig extends AbstractConfig {
CONNECTIONS_MAX_IDLE_MS_DOC)
.define(RETRIES_CONFIG,
Type.INT,
5,
atLeast(0),
Integer.MAX_VALUE,
between(0, Integer.MAX_VALUE),
Importance.LOW,
CommonClientConfigs.RETRIES_DOC)
.define(DEFAULT_API_TIMEOUT_MS_CONFIG,
Type.INT,
60000,
atLeast(0),
Importance.MEDIUM,
CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_DOC)
.define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
Type.LONG,
30000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class AlterConfigsOptions extends AbstractOptions<AlterConfigsOptions> {
private boolean validateOnly = false;

/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public class CreateAclsOptions extends AbstractOptions<CreateAclsOptions> {

/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class CreateTopicsOptions extends AbstractOptions<CreateTopicsOptions> {
private boolean validateOnly = false;

/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public class DeleteAclsOptions extends AbstractOptions<DeleteAclsOptions> {

/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public class DeleteTopicsOptions extends AbstractOptions<DeleteTopicsOptions> {

/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public class DescribeAclsOptions extends AbstractOptions<DescribeAclsOptions> {

/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptio
private boolean includeAuthorizedOperations;

/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptio
private boolean includeSynonyms = false;

/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class DescribeTopicsOptions extends AbstractOptions<DescribeTopicsOptions
private boolean includeAuthorizedOperations;

/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
Expand Down Expand Up @@ -260,7 +261,12 @@ public class KafkaAdminClient extends AdminClient {
/**
* The default timeout to use for an operation.
*/
private final int defaultTimeoutMs;
private int defaultTimeoutMs;

/**
* The timeout to use for a single request.
*/
private int requestTimeoutMs;

/**
* The name of this AdminClient instance.
Expand Down Expand Up @@ -497,9 +503,9 @@ private KafkaAdminClient(AdminClientConfig config,
KafkaClient client,
TimeoutProcessorFactory timeoutProcessorFactory,
LogContext logContext) {
this.defaultTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.clientId = clientId;
this.log = logContext.logger(KafkaAdminClient.class);
configureDefaultApiTimeoutMsAndRequestTimeoutMs(config);
this.time = time;
this.metadataManager = metadataManager;
this.metrics = metrics;
Expand All @@ -517,10 +523,42 @@ private KafkaAdminClient(AdminClientConfig config,
thread.start();
}

/**
* If a default.api.timeout.ms has been explicitly specified, raise an error if it conflicts with request.timeout.ms.
* If no default.api.timeout.ms has been configured, then set its value as the max of the default and request.timeout.ms. Also we should probably log a warning.
* Otherwise, use the provided values for both configurations.
*
* @param config The configuration
*/
private void configureDefaultApiTimeoutMsAndRequestTimeoutMs(AdminClientConfig config) {
this.requestTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.defaultTimeoutMs = config.getInt(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);

if (this.defaultTimeoutMs < this.requestTimeoutMs) {
if (config.originals().containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) {
throw new ConfigException("The specified value of " + AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG +
" must be no smaller than the value of " + AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG + ".");
} else {
log.warn("Overriding the default value for {} to {}.", AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, this.requestTimeoutMs);
this.defaultTimeoutMs = Math.max(this.defaultTimeoutMs, this.requestTimeoutMs);
}
}
}

Time time() {
return time;
}

// package level visibility for testing only
int defaultTimeoutMs() {
return this.defaultTimeoutMs;
}

// package level visibility for testing only
int requestTimeoutMs() {
return this.requestTimeoutMs;
}

@Override
public void close(Duration timeout) {
long waitTimeMs = timeout.toMillis();
Expand Down Expand Up @@ -989,7 +1027,7 @@ private long sendEligibleCalls(long now) {
continue;
}
Call call = calls.remove(0);
int timeoutMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs);
int timeoutMs = Math.min(requestTimeoutMs, calcTimeoutMsRemainingAsInt(now, call.deadlineMs));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition to passing through the timeout to createRequest, we also need to pass it through in newClientRequest. Only some of the APIs support an explicit request timeout. This appears to have been a pre-existing bug.

AbstractRequest.Builder<?> requestBuilder;
try {
requestBuilder = call.createRequest(timeoutMs);
Expand All @@ -998,7 +1036,8 @@ private long sendEligibleCalls(long now) {
"Internal error sending %s to %s.", call.callName, node)));
continue;
}
ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true);
ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now,
true, timeoutMs, null);
log.trace("Sending {} to {}. correlationId={}", requestBuilder, node, clientRequest.correlationId());
client.send(clientRequest, now);
getOrCreateListValue(callsInFlight, node.idString()).add(call);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class ListTopicsOptions extends AbstractOptions<ListTopicsOptions> {
private boolean listInternal = false;

/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* Set the api timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,7 @@ public class ConsumerConfig extends AbstractConfig {
private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;

/** <code>default.api.timeout.ms</code> */
public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms";
public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for consumer APIs that could block. This configuration is used as the default timeout for all consumer operations that do not explicitly accept a <code>timeout</code> parameter.";
public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG;

/** <code>interceptor.classes</code> */
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
Expand Down Expand Up @@ -447,7 +446,7 @@ public class ConsumerConfig extends AbstractConfig {
60 * 1000,
atLeast(0),
Importance.MEDIUM,
DEFAULT_API_TIMEOUT_MS_DOC)
CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_DOC)
/* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
.define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
Type.LONG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
Expand Down Expand Up @@ -174,6 +175,22 @@ public class KafkaAdminClientTest {
@Rule
final public Timeout globalTimeout = Timeout.millis(120000);

@Test
public void testDefaultApiTimeoutAndRequestTimeoutConflicts() {
AdminClientConfig config = newConfMap(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "500");
try (Admin client = KafkaAdminClient.createInternal(config, null)) {
fail("Expected KafkaException");
} catch (KafkaException e) {
assertTrue(e.getCause() instanceof ConfigException);
}

config = newConfMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "3000000");
try (KafkaAdminClient client = KafkaAdminClient.createInternal(config, null)) {
// default api timeout should be overridden to request timeout.
assertEquals(client.defaultTimeoutMs(), client.requestTimeoutMs());
}
}

@Test
public void testGetOrCreateListValue() {
Map<String, List<String>> map = new HashMap<>();
Expand Down Expand Up @@ -567,7 +584,7 @@ public void testMetadataRetries() throws Exception {

try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, bootstrapCluster,
newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999",
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000000",
AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10000000",
AdminClientConfig.RETRIES_CONFIG, "0"))) {

// The first request fails with a disconnect
Expand Down Expand Up @@ -2712,6 +2729,49 @@ public void testGetSubLevelError() {
errorsMap, memberIdentities.get(1), "For unit test").getClass());
}

@Test
public void testSingleRequestTimeoutAndRetryWithoutApiTimeout() throws Exception {
HashMap<Integer, Node> nodes = new HashMap<>();
MockTime time = new MockTime();
Node node0 = new Node(0, "localhost", 8121);
nodes.put(0, node0);
Cluster cluster = new Cluster("mockClusterId", nodes.values(),
Arrays.asList(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0})),
Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), nodes.get(0));

final int requestTimeoutMs = 1000;
final int retryBackoff = 100;
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoff),
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeoutMs))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
assertEquals(time, env.time());
assertEquals(env.time(), ((KafkaAdminClient) env.adminClient()).time());

final int apiTimeoutMs = 3000;
final long startTimeMs = time.milliseconds();
final ListTopicsResult result = env.adminClient().listTopics(new ListTopicsOptions().timeoutMs(apiTimeoutMs));

// Wait until the first attempt has failed, then advance the time
TestUtils.waitForCondition(() -> env.kafkaClient().hasInFlightRequests(), "Timed out waiting for inFlightRequests");
time.sleep(requestTimeoutMs + retryBackoff);
final long betweenTimeoutMs = time.milliseconds();

// Since api timeout bound is not hit, AdminClient should add the retry call to the queue
TestUtils.waitForCondition(() -> ((KafkaAdminClient) env.adminClient()).numPendingCalls() == 1,
"Failed to add retry listTopics call");
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
time.sleep(requestTimeoutMs);

assertEquals(apiTimeoutMs - requestTimeoutMs - retryBackoff,
KafkaAdminClient.calcTimeoutMsRemainingAsInt(betweenTimeoutMs, apiTimeoutMs + startTimeMs));
assertEquals(result.listings().get().size(), 1);
assertEquals("foo", result.listings().get().iterator().next().name());

}
}

private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member,
MemberAssignment assignment) {
return new MemberDescription(member.memberId(),
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ object LeaderElectionCommand extends Logging {
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
commandOptions.options.valueOf(commandOptions.bootstrapServer)
)
props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)

JAdminClient.create(props)
}
Expand Down
Loading