From d24493ed80960ce59539a60f9d44ac63223d8839 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 19 May 2017 11:46:20 +0100 Subject: [PATCH 1/5] KAFKA-5291: AdminClient should not trigger auto creation of topics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Added a boolean allow_auto_topic_creation to MetadataRequest. I didn’t bump the version a second time since we did it once for this release already, but this needs to be verified. - Set it to false in the new AdminClient and StreamsKafkaClient (which exists for the purpose of creating topics manually); set it to true everywhere else for now. Other clients will eventually rely on client-side auto topic creation, but that’s not there yet. --- .../org/apache/kafka/clients/Metadata.java | 22 +++++----- .../apache/kafka/clients/NetworkClient.java | 3 +- .../kafka/clients/admin/AdminClient.java | 7 ++-- .../kafka/clients/admin/KafkaAdminClient.java | 6 +-- .../kafka/clients/consumer/KafkaConsumer.java | 5 ++- .../kafka/clients/producer/KafkaProducer.java | 3 +- .../kafka/common/protocol/Protocol.java | 28 ++++++++----- .../common/requests/MetadataRequest.java | 27 +++++++++--- .../apache/kafka/clients/MetadataTest.java | 12 +++--- .../kafka/clients/NetworkClientTest.java | 8 ++-- .../admin/MockKafkaAdminClientEnv.java | 2 +- .../clients/consumer/KafkaConsumerTest.java | 36 +++++++++------- .../internals/AbstractCoordinatorTest.java | 2 +- .../internals/ConsumerCoordinatorTest.java | 2 +- .../internals/ConsumerNetworkClientTest.java | 2 +- .../consumer/internals/FetcherTest.java | 14 +++---- .../clients/producer/KafkaProducerTest.java | 3 +- .../producer/internals/SenderTest.java | 2 +- .../internals/TransactionManagerTest.java | 2 +- .../common/requests/RequestResponseTest.java | 2 +- .../authenticator/SaslAuthenticatorTest.java | 11 +++-- .../distributed/WorkerGroupMember.java | 2 +- .../distributed/WorkerCoordinatorTest.java | 2 +- .../main/scala/kafka/admin/AdminClient.scala | 4 +- .../main/scala/kafka/server/KafkaApis.scala | 10 +++-- .../kafka/api/AuthorizerIntegrationTest.scala | 10 +++-- .../api/KafkaAdminClientIntegrationTest.scala | 31 ++++++++++---- .../AbstractCreateTopicsRequestTest.scala | 4 +- .../server/DeleteTopicsRequestTest.scala | 2 +- .../kafka/server/MetadataRequestTest.scala | 42 ++++++++++++++++--- .../unit/kafka/server/RequestQuotaTest.scala | 2 +- .../internals/StreamsKafkaClient.java | 8 ++-- 32 files changed, 198 insertions(+), 118 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 9ff629d736409..0963bad2e0320 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -66,17 +66,11 @@ public final class Metadata { private final List listeners; private final ClusterResourceListeners clusterResourceListeners; private boolean needMetadataForAllTopics; + private final boolean allowAutoTopicCreation; private final boolean topicExpiryEnabled; - /** - * Create a metadata instance with reasonable defaults - */ - public Metadata() { - this(100L, 60 * 60 * 1000L); - } - - public Metadata(long refreshBackoffMs, long metadataExpireMs) { - this(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners()); + public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation) { + this(refreshBackoffMs, metadataExpireMs, allowAutoTopicCreation, false, new ClusterResourceListeners()); } /** @@ -84,12 +78,16 @@ public Metadata(long refreshBackoffMs, long metadataExpireMs) { * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy * polling * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh + * @param allowAutoTopicCreation If this and the broker config 'auto.create.topics.enable' are true, topics that + * don't exist will be created by the broker when a metadata request is sent * @param topicExpiryEnabled If true, enable expiry of unused topics * @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates. */ - public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) { + public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation, + boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) { this.refreshBackoffMs = refreshBackoffMs; this.metadataExpireMs = metadataExpireMs; + this.allowAutoTopicCreation = allowAutoTopicCreation; this.topicExpiryEnabled = topicExpiryEnabled; this.lastRefreshMs = 0L; this.lastSuccessfulRefreshMs = 0L; @@ -275,6 +273,10 @@ public synchronized long lastSuccessfulUpdate() { return this.lastSuccessfulRefreshMs; } + public boolean allowAutoTopicCreation() { + return allowAutoTopicCreation; + } + /** * Set state to indicate if metadata for all topics in Kafka cluster is required or not. * @param needMetadataForAllTopics boolean indicating need for metadata of all topics in cluster. diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index bfd0eb56d1184..1d4fe58949a17 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -829,7 +829,8 @@ private long maybeUpdate(long now, Node node) { if (metadata.needMetadataForAllTopics()) metadataRequest = MetadataRequest.Builder.allTopics(); else - metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics())); + metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()), + metadata.allowAutoTopicCreation()); log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java index 8ae32496fe78a..8b5e8dc9ed49c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java @@ -167,11 +167,10 @@ public DescribeTopicsResult describeTopics(Collection topicNames) { /** * Describe some topics in the cluster. * - * Note that if auto.create.topics.enable is true on the brokers, + * If the Kafka cluster is older than 0.11.0 and the broker config auto.create.topics.enable is true, * describeTopics(topicName, ...) may create a topic named topicName. - * There are two workarounds: either use AdminClient#listTopics and ensure - * that the topic is present before describing, or disable - * auto.create.topics.enable. + * There are two workarounds that don't involve upgrading the Kafka cluster: disable auto.create.topics.enable on + * every broker or check if the topic is present via AdminClient#listTopics. * * @param topicNames The names of the topics to describe. * @param options The options to use when describing the topic. diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 9fa0cadb6fd70..a258e7bbf6221 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -282,7 +282,7 @@ static KafkaAdminClient createInternal(AdminClientConfig config) { try { metadata = new Metadata(config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG), - config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG)); + config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false); List reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); Map metricTags = Collections.singletonMap("client-id", clientId); @@ -1155,7 +1155,7 @@ public DescribeTopicsResult describeTopics(final Collection topicNames, @Override AbstractRequest.Builder createRequest(int timeoutMs) { - return new MetadataRequest.Builder(topicNamesList); + return new MetadataRequest.Builder(topicNamesList, false); } @Override @@ -1208,7 +1208,7 @@ public DescribeClusterResult describeCluster(DescribeClusterOptions options) { @Override AbstractRequest.Builder createRequest(int timeoutMs) { - return new MetadataRequest.Builder(Collections.emptyList()); + return new MetadataRequest.Builder(Collections.emptyList(), false); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 52d94563804fd..055712eac067b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -659,7 +659,8 @@ private KafkaConsumer(ConsumerConfig config, this.valueDeserializer = valueDeserializer; } ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList); - this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners); + this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), + true, false, clusterResourceListeners); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), 0); String metricGrpPrefix = "consumer"; @@ -1352,7 +1353,7 @@ public List partitionsFor(String topic) { return parts; Map> topicMetadata = fetcher.getTopicMetadata( - new MetadataRequest.Builder(Collections.singletonList(topic)), requestTimeoutMs); + new MetadataRequest.Builder(Collections.singletonList(topic), true), requestTimeoutMs); return topicMetadata.get(topic); } finally { release(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index dc6b911d09d65..89a18e3802f82 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -265,7 +265,8 @@ private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serial ProducerInterceptor.class); this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList); ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters); - this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners); + this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), + true, true, clusterResourceListeners); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 91391e99c4d8e..cb4f9ca81f445 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -65,8 +65,16 @@ public class Protocol { /* The v2 metadata request is the same as v1. An additional field for cluster id has been added to the v2 metadata response */ public static final Schema METADATA_REQUEST_V2 = METADATA_REQUEST_V1; - /* The v3 metadata request is the same as v1 and v2. An additional field for throttle time has been added to the v3 metadata response */ - public static final Schema METADATA_REQUEST_V3 = METADATA_REQUEST_V2; + /* V3 additions: a field for allowing auto topic creation was added to the request and a field for throttle time has + * been added to the response */ + public static final Schema METADATA_REQUEST_V3 = new Schema(new Field("topics", + ArrayOf.nullable(STRING), + "An array of topics to fetch metadata for. If the topics array is null fetch metadata for all topics."), + new Field("allow_auto_topic_creation", + BOOLEAN, + "If this and the broker config 'auto.create.topics.enable' are true, " + + "topics that don't exist will be created by the broker. " + + "Otherwise, no topics will be created by the broker.")); public static final Schema METADATA_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."), new Field("host", STRING, "The hostname of the broker."), @@ -1181,8 +1189,8 @@ public class Protocol { new Field("api_versions", new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."), newThrottleTimeField()); - public static final Schema[] API_VERSIONS_REQUEST = new Schema[]{API_VERSIONS_REQUEST_V0, API_VERSIONS_REQUEST_V1}; - public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1}; + public static final Schema[] API_VERSIONS_REQUEST = {API_VERSIONS_REQUEST_V0, API_VERSIONS_REQUEST_V1}; + public static final Schema[] API_VERSIONS_RESPONSE = {API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1}; /* Admin requests common */ public static final Schema CONFIG_ENTRY = new Schema(new Field("config_name", STRING, "Configuration name"), @@ -1636,8 +1644,8 @@ public class Protocol { new ArrayOf(DESCRIBE_ACLS_RESOURCE), "The resources and their associated ACLs.")); - public static final Schema[] DESCRIBE_ACLS_REQUEST = new Schema[] {DESCRIBE_ACLS_REQUEST_V0}; - public static final Schema[] DESCRIBE_ACLS_RESPONSE = new Schema[] {DESCRIBE_ACLS_RESPONSE_V0}; + public static final Schema[] DESCRIBE_ACLS_REQUEST = {DESCRIBE_ACLS_REQUEST_V0}; + public static final Schema[] DESCRIBE_ACLS_RESPONSE = {DESCRIBE_ACLS_RESPONSE_V0}; public static final Schema CREATE_ACLS_REQUEST_V0 = new Schema( new Field("creations", @@ -1658,8 +1666,8 @@ public class Protocol { new Field("error_message", NULLABLE_STRING, "The error message.") )))); - public static final Schema[] CREATE_ACLS_REQUEST = new Schema[] {CREATE_ACLS_REQUEST_V0}; - public static final Schema[] CREATE_ACLS_RESPONSE = new Schema[] {CREATE_ACLS_RESPONSE_V0}; + public static final Schema[] CREATE_ACLS_REQUEST = {CREATE_ACLS_REQUEST_V0}; + public static final Schema[] CREATE_ACLS_RESPONSE = {CREATE_ACLS_RESPONSE_V0}; public static final Schema DELETE_ACLS_REQUEST_V0 = new Schema( new Field("filters", @@ -1691,8 +1699,8 @@ public class Protocol { new Field("error_message", NULLABLE_STRING, "The error message."), new Field("matching_acls", new ArrayOf(MATCHING_ACL), "The matching ACLs"))))); - public static final Schema[] DELETE_ACLS_REQUEST = new Schema[] {DELETE_ACLS_REQUEST_V0}; - public static final Schema[] DELETE_ACLS_RESPONSE = new Schema[] {DELETE_ACLS_RESPONSE_V0}; + public static final Schema[] DELETE_ACLS_REQUEST = {DELETE_ACLS_REQUEST_V0}; + public static final Schema[] DELETE_ACLS_RESPONSE = {DELETE_ACLS_RESPONSE_V0}; /* an array of all requests and responses with all schema versions; a null value in the inner array means that the * particular version is not supported */ diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index 3c201391b9dac..7d3a32ff1f2c3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -30,19 +30,24 @@ public class MetadataRequest extends AbstractRequest { + private static final String TOPICS_KEY_NAME = "topics"; + private static final String ALLOW_AUTO_TOPIC_CREATION_KEY_NAME = "allow_auto_topic_creation"; + public static class Builder extends AbstractRequest.Builder { private static final List ALL_TOPICS = null; // The list of topics, or null if we want to request metadata about all topics. private final List topics; + private final boolean allowAutoTopicCreation; public static Builder allTopics() { - return new Builder(ALL_TOPICS); + return new Builder(ALL_TOPICS, false); } - public Builder(List topics) { + public Builder(List topics, boolean allowAutoTopicCreation) { super(ApiKeys.METADATA); this.topics = topics; + this.allowAutoTopicCreation = allowAutoTopicCreation; } public List topics() { @@ -59,7 +64,7 @@ public MetadataRequest build(short version) { throw new UnsupportedVersionException("MetadataRequest " + "versions older than 1 are not supported."); } - return new MetadataRequest(this.topics, version); + return new MetadataRequest(this.topics, allowAutoTopicCreation, version); } @Override @@ -77,18 +82,18 @@ public String toString() { } } - private static final String TOPICS_KEY_NAME = "topics"; - private final List topics; + private final boolean allowAutoTopicCreation; /** * In v0 null is not allowed and an empty list indicates requesting all topics. * Note: modern clients do not support sending v0 requests. * In v1 null indicates requesting all topics, and an empty list indicates requesting no topics. */ - public MetadataRequest(List topics, short version) { + public MetadataRequest(List topics, boolean allowAutoTopicCreation, short version) { super(version); this.topics = topics; + this.allowAutoTopicCreation = allowAutoTopicCreation; } public MetadataRequest(Struct struct, short version) { @@ -102,6 +107,10 @@ public MetadataRequest(Struct struct, short version) { } else { topics = null; } + if (struct.hasField(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME)) + allowAutoTopicCreation = struct.getBoolean(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME); + else + allowAutoTopicCreation = true; } @Override @@ -137,6 +146,10 @@ public List topics() { return topics; } + public boolean allowAutoTopicCreation() { + return allowAutoTopicCreation; + } + public static MetadataRequest parse(ByteBuffer buffer, short version) { return new MetadataRequest(ApiKeys.METADATA.parseRequest(version, buffer), version); } @@ -148,6 +161,8 @@ protected Struct toStruct() { struct.set(TOPICS_KEY_NAME, null); else struct.set(TOPICS_KEY_NAME, topics.toArray()); + if (struct.hasField(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME)) + struct.set(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME, allowAutoTopicCreation); return struct; } } diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index 0c87fc75d8dce..407eb9f19a054 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -45,8 +45,8 @@ public class MetadataTest { private long refreshBackoffMs = 100; private long metadataExpireMs = 1000; - private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs); - private AtomicReference backgroundError = new AtomicReference(); + private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true); + private AtomicReference backgroundError = new AtomicReference<>(); @After public void tearDown() { @@ -96,7 +96,7 @@ private static void checkTimeToNextUpdate(long refreshBackoffMs, long metadataEx } long largerOfBackoffAndExpire = Math.max(refreshBackoffMs, metadataExpireMs); - Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs); + Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true); assertEquals(0, metadata.timeToNextUpdate(now)); @@ -255,7 +255,7 @@ public void testClusterListenerGetsNotifiedOfUpdate() { MockClusterResourceListener mockClusterListener = new MockClusterResourceListener(); ClusterResourceListeners listeners = new ClusterResourceListeners(); listeners.maybeAdd(mockClusterListener); - metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false, listeners); + metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, false, listeners); String hostName = "www.example.com"; Cluster cluster = Cluster.bootstrap(Arrays.asList(new InetSocketAddress(hostName, 9002))); @@ -348,7 +348,7 @@ public void onMetadataUpdate(Cluster cluster, Set unavailableTopics) { @Test public void testTopicExpiry() throws Exception { - metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, new ClusterResourceListeners()); + metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, true, new ClusterResourceListeners()); // Test that topic is expired if not used within the expiry interval long time = 0; @@ -380,7 +380,7 @@ public void testTopicExpiry() throws Exception { @Test public void testNonExpiringMetadata() throws Exception { - metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners()); + metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, false, new ClusterResourceListeners()); // Test that topic is not expired if not used within the expiry interval long time = 0; diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index c87acd79a2162..0de76a1a6a04f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -49,7 +49,7 @@ public class NetworkClientTest { protected final int requestTimeoutMs = 1000; protected final MockTime time = new MockTime(); protected final MockSelector selector = new MockSelector(time); - protected final Metadata metadata = new Metadata(0, Long.MAX_VALUE); + protected final Metadata metadata = new Metadata(0, Long.MAX_VALUE, true); protected final int nodeId = 1; protected final Cluster cluster = TestUtils.singletonCluster("test", nodeId); protected final Node node = cluster.nodes().get(0); @@ -86,8 +86,7 @@ public void setup() { @Test(expected = IllegalStateException.class) public void testSendToUnreadyNode() { - MetadataRequest.Builder builder = - new MetadataRequest.Builder(Arrays.asList("test")); + MetadataRequest.Builder builder = new MetadataRequest.Builder(Arrays.asList("test"), true); long now = time.milliseconds(); ClientRequest request = client.newClientRequest("5", builder, now, false); client.send(request, now); @@ -251,8 +250,7 @@ public void testDisconnectDuringUserMetadataRequest() { // metadata request when the remote node disconnects with the request in-flight. awaitReady(client, node); - MetadataRequest.Builder builder = - new MetadataRequest.Builder(Collections.emptyList()); + MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.emptyList(), true); long now = time.milliseconds(); ClientRequest request = client.newClientRequest(node.idString(), builder, now, true); client.send(request, now); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java index ba7b528926f2e..6c1fd17e5b84a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java @@ -50,7 +50,7 @@ public MockKafkaAdminClientEnv(Cluster cluster, Map config) { this.adminClientConfig = new AdminClientConfig(config); this.cluster = cluster; this.metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG), - adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG)); + adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false); this.mockClient = new MockClient(Time.SYSTEM, this.metadata); this.client = KafkaAdminClient.createInternal(adminClientConfig, mockClient, metadata); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 1249896d4d82b..f918a34ec10f0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -366,7 +366,7 @@ public void verifyHeartbeatSent() throws Exception { Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -407,7 +407,7 @@ public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception { Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -448,7 +448,7 @@ public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() { Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -484,7 +484,7 @@ public void testCommitsFetchedDuringAssign() { Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -534,7 +534,7 @@ public void testAutoCommitSentBeforePositionUpdate() { Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -581,7 +581,7 @@ public void testRegexSubscription() { topicMetadata.put(unmatchedTopic, 1); Cluster cluster = TestUtils.clusterWith(1, topicMetadata); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); Node node = cluster.nodes().get(0); MockClient client = new MockClient(time, metadata); @@ -621,7 +621,7 @@ public void testChangingRegexSubscription() { topicMetadata.put(otherTopic, 1); Cluster cluster = TestUtils.clusterWith(1, topicMetadata); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); Node node = cluster.nodes().get(0); MockClient client = new MockClient(time, metadata); @@ -664,7 +664,7 @@ public void testWakeupWithFetchDataAvailable() throws Exception { Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -720,7 +720,7 @@ public void testPollThrowsInterruptExceptionIfInterrupted() throws Exception { Cluster cluster = TestUtils.singletonCluster(topic, 1); final Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); final MockClient client = new MockClient(time, metadata); @@ -760,7 +760,7 @@ public void fetchResponseWithUnexpectedPartitionIsIgnored() { Cluster cluster = TestUtils.singletonCluster(singletonMap(topic, 1)); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -808,7 +808,7 @@ public void testSubscriptionChangesWithAutoCommitEnabled() { Cluster cluster = TestUtils.singletonCluster(tpCounts); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -928,7 +928,7 @@ public void testSubscriptionChangesWithAutoCommitDisabled() { Cluster cluster = TestUtils.singletonCluster(tpCounts); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -996,7 +996,7 @@ public void testManualAssignmentChangeWithAutoCommitEnabled() { Cluster cluster = TestUtils.singletonCluster(tpCounts); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -1061,7 +1061,7 @@ public void testManualAssignmentChangeWithAutoCommitDisabled() { Cluster cluster = TestUtils.singletonCluster(tpCounts); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -1122,7 +1122,7 @@ public void testOffsetOfPausedPartitions() { Cluster cluster = TestUtils.singletonCluster(topic, 2); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -1268,7 +1268,7 @@ private void consumerCloseTest(final long closeTimeoutMs, Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -1372,6 +1372,10 @@ public void onPartitionsAssigned(Collection partitions) { }; } + private Metadata createMetadata() { + return new Metadata(0, Long.MAX_VALUE, true); + } + private Node prepareRebalance(MockClient client, Node node, final Set subscribedTopics, PartitionAssignor assignor, List partitions, Node coordinator) { if (coordinator == null) { // lookup coordinator diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 4779f43cf7943..8a934398928fa 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -73,7 +73,7 @@ public void setupCoordinator() { this.mockTime = new MockTime(); this.mockClient = new MockClient(mockTime); - Metadata metadata = new Metadata(); + Metadata metadata = new Metadata(100L, 60 * 60 * 1000L, true); this.consumerClient = new ConsumerNetworkClient(mockClient, metadata, mockTime, RETRY_BACKOFF_MS, REQUEST_TIMEOUT_MS); Metrics metrics = new Metrics(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 770d4f7f65252..7d22351369b99 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -119,7 +119,7 @@ public class ConsumerCoordinatorTest { public void setup() { this.time = new MockTime(); this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); - this.metadata = new Metadata(0, Long.MAX_VALUE); + this.metadata = new Metadata(0, Long.MAX_VALUE, true); this.metadata.update(cluster, Collections.emptySet(), time.milliseconds()); this.client = new MockClient(time, metadata); this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java index 2a6b2286c9d24..b46b65746b21c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java @@ -46,7 +46,7 @@ public class ConsumerNetworkClientTest { private MockClient client = new MockClient(time); private Cluster cluster = TestUtils.singletonCluster(topicName, 1); private Node node = cluster.nodes().get(0); - private Metadata metadata = new Metadata(0, Long.MAX_VALUE); + private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true); private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 7d4862372124b..f23a04ab6800b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -118,7 +118,7 @@ public class FetcherTest { private int fetchSize = 1000; private long retryBackoffMs = 100; private MockTime time = new MockTime(1); - private Metadata metadata = new Metadata(0, Long.MAX_VALUE); + private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true); private MockClient client = new MockClient(time, metadata); private Cluster cluster = TestUtils.singletonCluster(topicName, 2); private Node node = cluster.nodes().get(0); @@ -1068,16 +1068,15 @@ public void testGetAllTopicsUnauthorized() { public void testGetTopicMetadataInvalidTopic() { client.prepareResponse(newMetadataResponse(topicName, Errors.INVALID_TOPIC_EXCEPTION)); fetcher.getTopicMetadata( - new MetadataRequest.Builder(Collections.singletonList(topicName)), 5000L); + new MetadataRequest.Builder(Collections.singletonList(topicName), true), 5000L); } @Test public void testGetTopicMetadataUnknownTopic() { client.prepareResponse(newMetadataResponse(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION)); - Map> topicMetadata = - fetcher.getTopicMetadata( - new MetadataRequest.Builder(Collections.singletonList(topicName)), 5000L); + Map> topicMetadata = fetcher.getTopicMetadata( + new MetadataRequest.Builder(Collections.singletonList(topicName), true), 5000L); assertNull(topicMetadata.get(topicName)); } @@ -1086,9 +1085,8 @@ public void testGetTopicMetadataLeaderNotAvailable() { client.prepareResponse(newMetadataResponse(topicName, Errors.LEADER_NOT_AVAILABLE)); client.prepareResponse(newMetadataResponse(topicName, Errors.NONE)); - Map> topicMetadata = - fetcher.getTopicMetadata( - new MetadataRequest.Builder(Collections.singletonList(topicName)), 5000L); + Map> topicMetadata = fetcher.getTopicMetadata( + new MetadataRequest.Builder(Collections.singletonList(topicName), true), 5000L); assertTrue(topicMetadata.containsKey(topicName)); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 3a6426a13afe3..e2fe614b81dc4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -325,7 +325,8 @@ public void testTopicRefreshInMetadata() throws Exception { KafkaProducer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); long refreshBackoffMs = 500L; long metadataExpireMs = 60000L; - final Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, new ClusterResourceListeners()); + final Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, + true, new ClusterResourceListeners()); final Time time = new MockTime(); MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata); MemberModifier.field(KafkaProducer.class, "time").set(producer, time); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index faa6ea5229dea..c1c5a2e0ff242 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -87,7 +87,7 @@ public class SenderTest { private MockTime time = new MockTime(); private MockClient client = new MockClient(time); private int batchSize = 16 * 1024; - private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, new ClusterResourceListeners()); + private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, true, new ClusterResourceListeners()); private ApiVersions apiVersions = new ApiVersions(); private Cluster cluster = TestUtils.singletonCluster("test", 2); private Metrics metrics = null; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index ed7ec84d61d01..3e3f785c6edda 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -92,7 +92,7 @@ public class TransactionManagerTest { private MockTime time = new MockTime(); private MockClient client = new MockClient(time); - private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, new ClusterResourceListeners()); + private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, true, new ClusterResourceListeners()); private ApiVersions apiVersions = new ApiVersions(); private Cluster cluster = TestUtils.singletonCluster("test", 2); private RecordAccumulator accumulator = null; diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 56f0215668367..72b8b8b93b68e 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -703,7 +703,7 @@ private ListOffsetResponse createListOffsetResponse(int version) { } private MetadataRequest createMetadataRequest(int version, List topics) { - return new MetadataRequest.Builder(topics).build((short) version); + return new MetadataRequest.Builder(topics, true).build((short) version); } private MetadataResponse createMetadataResponse() { diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index 28402f0ca1534..631ae08b31b78 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -548,10 +548,10 @@ public void testDisallowedKafkaRequestsBeforeAuthentication() throws Exception { // Send metadata request before Kafka SASL handshake request String node1 = "invalid1"; createClientConnection(SecurityProtocol.PLAINTEXT, node1); - MetadataRequest metadataRequest1 = - new MetadataRequest.Builder(Collections.singletonList("sometopic")).build(); - RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id, - metadataRequest1.version(), "someclient", 1); + MetadataRequest metadataRequest1 = new MetadataRequest.Builder(Collections.singletonList("sometopic"), + true).build(); + RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id, metadataRequest1.version(), + "someclient", 1); selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1)); NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY); selector.close(); @@ -563,8 +563,7 @@ public void testDisallowedKafkaRequestsBeforeAuthentication() throws Exception { String node2 = "invalid2"; createClientConnection(SecurityProtocol.PLAINTEXT, node2); sendHandshakeRequestReceiveResponse(node2); - MetadataRequest metadataRequest2 = - new MetadataRequest.Builder(Collections.singletonList("sometopic")).build(); + MetadataRequest metadataRequest2 = new MetadataRequest.Builder(Collections.singletonList("sometopic"), true).build(); RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA.id, metadataRequest2.version(), "someclient", 2); selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2)); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index 24d321eec7b7d..62e2fc1b243cb 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -87,7 +87,7 @@ public WorkerGroupMember(DistributedConfig config, reporters.add(new JmxReporter(JMX_PREFIX)); this.metrics = new Metrics(metricConfig, reporters, time); this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG); - this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG)); + this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG), true); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), 0); String metricGrpPrefix = "connect"; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java index ab042dea47dea..edef7dcb3f326 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java @@ -90,7 +90,7 @@ public class WorkerCoordinatorTest { public void setup() { this.time = new MockTime(); this.client = new MockClient(time); - this.metadata = new Metadata(0, Long.MAX_VALUE); + this.metadata = new Metadata(0, Long.MAX_VALUE, true); this.metadata.update(cluster, Collections.emptySet(), time.milliseconds()); this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); this.metrics = new Metrics(time); diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 50198a763ede4..4410e94e033f7 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -213,7 +213,7 @@ class AdminClient(val time: Time, */ def deleteRecordsBefore(offsets: Map[TopicPartition, Long]): Future[Map[TopicPartition, DeleteRecordsResult]] = { - val metadataRequest = new MetadataRequest.Builder(offsets.keys.map(_.topic()).toSet.toList.asJava) + val metadataRequest = new MetadataRequest.Builder(offsets.keys.map(_.topic).toSet.toList.asJava, true) val response = sendAnyNode(ApiKeys.METADATA, metadataRequest).asInstanceOf[MetadataResponse] val errors = response.errors if (!errors.isEmpty) @@ -425,7 +425,7 @@ object AdminClient { def create(config: AdminConfig): AdminClient = { val time = Time.SYSTEM val metrics = new Metrics(time) - val metadata = new Metadata + val metadata = new Metadata(100L, 60 * 60 * 1000L, true) val channelBuilder = ClientUtils.createChannelBuilder(config) val requestTimeoutMs = config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG) val retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b780823c62a3f..eb0bf3b999c83 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -886,7 +886,8 @@ class KafkaApis(val requestChannel: RequestChannel, topicMetadata.headOption.getOrElse(createInternalTopic(topic)) } - private def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = { + private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: Set[String], listenerName: ListenerName, + errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = { val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, errorUnavailableEndpoints) if (topics.isEmpty || topicResponses.size == topics.size) { topicResponses @@ -899,7 +900,7 @@ class KafkaApis(val requestChannel: RequestChannel, new MetadataResponse.TopicMetadata(Errors.INVALID_REPLICATION_FACTOR, topic, true, java.util.Collections.emptyList()) else topicMetadata - } else if (config.autoCreateTopicsEnable) { + } else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) { createTopic(topic, config.numPartitions, config.defaultReplicationFactor) } else { new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList()) @@ -937,7 +938,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (authorizedTopics.nonEmpty) { val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) - if (config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) { + if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) { if (!authorize(request.session, Create, Resource.ClusterResource)) { authorizedTopics --= nonExistingTopics unauthorizedForCreateTopics ++= nonExistingTopics @@ -965,7 +966,8 @@ class KafkaApis(val requestChannel: RequestChannel, if (authorizedTopics.isEmpty) Seq.empty[MetadataResponse.TopicMetadata] else - getTopicMetadata(authorizedTopics, request.listenerName, errorUnavailableEndpoints) + getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.listenerName, + errorUnavailableEndpoints) val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 017c57f75cd2b..dce5da272a8a6 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -222,8 +222,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { super.tearDown() } - private def createMetadataRequest = { - new requests.MetadataRequest.Builder(List(topic).asJava).build() + private def createMetadataRequest(allowAutoTopicCreation: Boolean) = { + new requests.MetadataRequest.Builder(List(topic).asJava, allowAutoTopicCreation).build() } private def createProduceRequest = { @@ -328,7 +328,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testAuthorizationWithTopicExisting() { val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( - ApiKeys.METADATA -> createMetadataRequest, + ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true), ApiKeys.PRODUCE -> createProduceRequest, ApiKeys.FETCH -> createFetchRequest, ApiKeys.LIST_OFFSETS -> createListOffsetsRequest, @@ -381,6 +381,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { TestUtils.verifyTopicDeletion(zkUtils, deleteTopic, 1, servers) val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( + ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true), + ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = false), ApiKeys.PRODUCE -> createProduceRequest, ApiKeys.FETCH -> createFetchRequest, ApiKeys.LIST_OFFSETS -> createListOffsetsRequest, @@ -397,7 +399,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val resourceToAcls = requestKeysToAcls(key) resourceToAcls.get(topicResource).foreach { acls => val describeAcls = topicDescribeAcl(topicResource) - val isAuthorized = describeAcls == acls + val isAuthorized = describeAcls == acls addAndVerifyAcls(describeAcls, topicResource) sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true, topicExists = false) removeAllAcls() diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala index 065759ffe6072..0e21da7fd012d 100644 --- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala @@ -30,8 +30,7 @@ import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.common.KafkaFuture import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} import org.apache.kafka.common.config.ConfigResource -import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TopicExistsException} -import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException} +import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException, UnknownTopicOrPartitionException} import org.apache.kafka.common.protocol.ApiKeys import org.junit.{After, Before, Rule, Test} import org.apache.kafka.common.requests.MetadataResponse @@ -80,7 +79,7 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin def waitForTopics(client: AdminClient, expectedPresent: Seq[String], expectedMissing: Seq[String]): Unit = { TestUtils.waitUntilTrue(() => { - val topics = client.listTopics().names().get() + val topics = client.listTopics.names.get() expectedPresent.forall(topicName => topics.contains(topicName)) && expectedMissing.forall(topicName => !topics.contains(topicName)) }, "timed out waiting for topics") @@ -123,21 +122,39 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin val topics = Seq("mytopic", "mytopic2") val newTopics = topics.map(new NewTopic(_, 1, 1)) client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all.get() - waitForTopics(client, List(), List("mytopic", "mytopic2")) + waitForTopics(client, List(), topics) client.createTopics(newTopics.asJava).all.get() - waitForTopics(client, List("mytopic", "mytopic2"), List()) + waitForTopics(client, topics, List()) val results = client.createTopics(newTopics.asJava).results() assertTrue(results.containsKey("mytopic")) assertFutureExceptionTypeEquals(results.get("mytopic"), classOf[TopicExistsException]) assertTrue(results.containsKey("mytopic2")) assertFutureExceptionTypeEquals(results.get("mytopic2"), classOf[TopicExistsException]) - val topicsFromDescribe = client.describeTopics(Seq("mytopic", "mytopic2").asJava).all.get().asScala.keys + val topicsFromDescribe = client.describeTopics(topics.asJava).all.get().asScala.keys assertEquals(topics.toSet, topicsFromDescribe) client.deleteTopics(topics.asJava).all.get() - waitForTopics(client, List(), List("mytopic", "mytopic2")) + waitForTopics(client, List(), topics) + } + + /** + * describe should not auto create topics + */ + @Test + def testDescribeNonExistingTopic(): Unit = { + client = AdminClient.create(createConfig()) + + val existingTopic = "existing-topic" + client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1, 1)).asJava).all.get() + waitForTopics(client, Seq(existingTopic), List()) + + val nonExistingTopic = "non-existing" + val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).results + assertEquals(existingTopic, results.get(existingTopic).get.name) + intercept[ExecutionException](results.get(nonExistingTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException] + assertEquals(None, zkUtils.getTopicPartitionCount(nonExistingTopic)) } @Test diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala index 8e6b11d78b68c..0ef3405287d7c 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala @@ -43,7 +43,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest { def verifyMetadata(socketServer: SocketServer) = { val metadata = sendMetadataRequest( - new MetadataRequest.Builder(List(topic).asJava).build()).topicMetadata.asScala + new MetadataRequest.Builder(List(topic).asJava, true).build()).topicMetadata.asScala val metadataForTopic = metadata.filter(_.topic == topic).head val partitions = if (!details.replicasAssignments.isEmpty) @@ -127,7 +127,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest { protected def validateTopicExists(topic: String): Unit = { TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) val metadata = sendMetadataRequest( - new MetadataRequest.Builder(List(topic).asJava).build()).topicMetadata.asScala + new MetadataRequest.Builder(List(topic).asJava, true).build()).topicMetadata.asScala assertTrue("The topic should be created", metadata.exists(p => p.topic.equals(topic) && p.error == Errors.NONE)) } diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala index 9cd53d8e15888..881bf8ee790cc 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala @@ -106,7 +106,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest { private def validateTopicIsDeleted(topic: String): Unit = { val metadata = sendMetadataRequest(new MetadataRequest. - Builder(List(topic).asJava).build).topicMetadata.asScala + Builder(List(topic).asJava, true).build).topicMetadata.asScala TestUtils.waitUntilTrue (() => !metadata.exists(p => p.topic.equals(topic) && p.error == Errors.NONE), s"The topic $topic should not exist") } diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala index fdc9a95e57055..2bf8cd4247acb 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala @@ -108,11 +108,41 @@ class MetadataRequestTest extends BaseRequestTest { // v0, Doesn't support a "no topics" request // v1, Empty list represents "no topics" - val metadataResponse = sendMetadataRequest(new MetadataRequest(List[String]().asJava, 1.toShort)) + val metadataResponse = sendMetadataRequest(new MetadataRequest(List[String]().asJava, true, 1.toShort)) assertTrue("Response should have no errors", metadataResponse.errors.isEmpty) assertTrue("Response should have no topics", metadataResponse.topicMetadata.isEmpty) } + @Test + def testAutoTopicCreation(): Unit = { + def checkAutoCreatedTopic(existingTopic: String, autoCreatedTopic: String, response: MetadataResponse): Unit = { + assertNull(response.errors.get(existingTopic)) + assertEquals(Errors.LEADER_NOT_AVAILABLE, response.errors.get(autoCreatedTopic)) + assertEquals(Some(servers.head.config.numPartitions), zkUtils.getTopicPartitionCount(autoCreatedTopic)) + for (i <- 0 until servers.head.config.numPartitions) + TestUtils.waitUntilMetadataIsPropagated(servers, autoCreatedTopic, i) + } + + val topic1 = "t1" + val topic2 = "t2" + val topic3 = "t3" + val topic4 = "t4" + TestUtils.createTopic(zkUtils, topic1, 1, 1, servers) + + val response1 = sendMetadataRequest(new MetadataRequest(Seq(topic1, topic2).asJava, true, ApiKeys.METADATA.latestVersion)) + checkAutoCreatedTopic(topic1, topic2, response1) + + // V2 doesn't support a configurable allowAutoTopicCreation, so the fact that we set it to `false` has no effect + val response2 = sendMetadataRequest(new MetadataRequest(Seq(topic2, topic3).asJava, false, 2)) + checkAutoCreatedTopic(topic2, topic3, response2) + + // V3 and higher support a configurable allowAutoTopicCreation + val response3 = sendMetadataRequest(new MetadataRequest(Seq(topic3, topic4).asJava, false, 3)) + assertNull(response3.errors.get(topic3)) + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response3.errors.get(topic4)) + assertEquals(None, zkUtils.getTopicPartitionCount(topic4)) + } + @Test def testAllTopicsRequest() { // create some topics @@ -120,7 +150,7 @@ class MetadataRequestTest extends BaseRequestTest { TestUtils.createTopic(zkUtils, "t2", 3, 2, servers) // v0, Empty list represents all topics - val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(List[String]().asJava, 0.toShort)) + val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(List[String]().asJava, true, 0.toShort)) assertTrue("V0 Response should have no errors", metadataResponseV0.errors.isEmpty) assertEquals("V0 Response should have 2 (all) topics", 2, metadataResponseV0.topicMetadata.size()) @@ -139,7 +169,7 @@ class MetadataRequestTest extends BaseRequestTest { TestUtils.createTopic(zkUtils, replicaDownTopic, 1, replicaCount, servers) // Kill a replica node that is not the leader - val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, 1.toShort)) + val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort)) val partitionMetadata = metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head val downNode = servers.find { server => val serverId = server.apis.brokerId @@ -150,14 +180,14 @@ class MetadataRequestTest extends BaseRequestTest { downNode.shutdown() TestUtils.waitUntilTrue(() => { - val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, 1.toShort)) + val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort)) val metadata = response.topicMetadata.asScala.head.partitionMetadata.asScala.head val replica = metadata.replicas.asScala.find(_.id == downNode.apis.brokerId).get replica.host == "" & replica.port == -1 }, "Replica was not found down", 5000) // Validate version 0 still filters unavailable replicas and contains error - val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, 0.toShort)) + val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 0.toShort)) val v0BrokerIds = v0MetadataResponse.brokers().asScala.map(_.id).toSeq assertTrue("Response should have no errors", v0MetadataResponse.errors.isEmpty) assertFalse(s"The downed broker should not be in the brokers list", v0BrokerIds.contains(downNode)) @@ -167,7 +197,7 @@ class MetadataRequestTest extends BaseRequestTest { assertTrue(s"Response should have ${replicaCount - 1} replicas", v0PartitionMetadata.replicas.size == replicaCount - 1) // Validate version 1 returns unavailable replicas with no error - val v1MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, 1.toShort)) + val v1MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort)) val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq assertTrue("Response should have no errors", v1MetadataResponse.errors.isEmpty) assertFalse(s"The downed broker should not be in the brokers list", v1BrokerIds.contains(downNode)) diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index b261cb2ee427f..3b0e93c025e79 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -173,7 +173,7 @@ class RequestQuotaTest extends BaseRequestTest { FetchRequest.Builder.forConsumer(0, 0, partitionMap) case ApiKeys.METADATA => - new MetadataRequest.Builder(List(topic).asJava) + new MetadataRequest.Builder(List(topic).asJava, true) case ApiKeys.LIST_OFFSETS => ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java index 44f7900edffe2..b1c3f2b4c941a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java @@ -101,7 +101,8 @@ public StreamsKafkaClient(final Config streamsConfig) { final Metadata metadata = new Metadata(streamsConfig.getLong( StreamsConfig.RETRY_BACKOFF_MS_CONFIG), - streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG) + streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG), + false ); final List addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), time.milliseconds()); @@ -241,7 +242,8 @@ private String getControllerReadyBrokerId(final MetadataResponse metadata) { private String getAnyReadyBrokerId() { final Metadata metadata = new Metadata( streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG), - streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG)); + streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG), + false); final List addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), Time.SYSTEM.milliseconds()); @@ -289,7 +291,7 @@ public MetadataResponse fetchMetadata() { final ClientRequest clientRequest = kafkaClient.newClientRequest( getAnyReadyBrokerId(), - new MetadataRequest.Builder(null), + MetadataRequest.Builder.allTopics(), Time.SYSTEM.milliseconds(), true); final ClientResponse clientResponse = sendRequest(clientRequest); From 95a87a7e0462a4924f93456488670d2d7173ac99 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 25 May 2017 17:11:10 +0100 Subject: [PATCH 2/5] Fallback to retrieving metadata for all topics when talking to older brokers --- .../kafka/clients/admin/AdminClient.java | 5 ----- .../kafka/clients/admin/KafkaAdminClient.java | 18 ++++++++++++++++-- .../kafka/common/requests/MetadataRequest.java | 9 +++++---- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java index 8b5e8dc9ed49c..73ea75452abb2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java @@ -167,11 +167,6 @@ public DescribeTopicsResult describeTopics(Collection topicNames) { /** * Describe some topics in the cluster. * - * If the Kafka cluster is older than 0.11.0 and the broker config auto.create.topics.enable is true, - * describeTopics(topicName, ...) may create a topic named topicName. - * There are two workarounds that don't involve upgrading the Kafka cluster: disable auto.create.topics.enable on - * every broker or check if the topic is present via AdminClient#listTopics. - * * @param topicNames The names of the topics to describe. * @param options The options to use when describing the topic. * diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index a258e7bbf6221..199b07a755d18 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -1144,7 +1144,7 @@ public DescribeTopicsResult describeTopics(final Collection topicNames, final Map> topicFutures = new HashMap<>(topicNames.size()); final ArrayList topicNamesList = new ArrayList<>(); for (String topicName : topicNames) { - if (topicFutures.get(topicName) == null) { + if (!topicFutures.containsKey(topicName)) { topicFutures.put(topicName, new KafkaFutureImpl()); topicNamesList.add(topicName); } @@ -1153,9 +1153,14 @@ public DescribeTopicsResult describeTopics(final Collection topicNames, runnable.call(new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), new ControllerNodeProvider()) { + private boolean supportsDisablingTopicCreation = true; + @Override AbstractRequest.Builder createRequest(int timeoutMs) { - return new MetadataRequest.Builder(topicNamesList, false); + if (supportsDisablingTopicCreation) + return new MetadataRequest.Builder(topicNamesList, false); + else + return MetadataRequest.Builder.allTopics(); } @Override @@ -1189,6 +1194,15 @@ void handleResponse(AbstractResponse abstractResponse) { } } + @Override + boolean handleUnsupportedVersionException(UnsupportedVersionException exception) { + if (supportsDisablingTopicCreation) { + supportsDisablingTopicCreation = false; + return true; + } + return false; + } + @Override void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index 7d3a32ff1f2c3..ad2dd0dd9d561 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -60,10 +60,11 @@ public boolean isAllTopics() { @Override public MetadataRequest build(short version) { - if (version < 1) { - throw new UnsupportedVersionException("MetadataRequest " + - "versions older than 1 are not supported."); - } + if (version < 1) + throw new UnsupportedVersionException("MetadataRequest versions older than 1 are not supported."); + if (!allowAutoTopicCreation && version < 3 && topics != null && !topics.isEmpty()) + throw new UnsupportedVersionException("MetadataRequest versions older than 3 don't support the " + + "allowAutoTopicCreation field"); return new MetadataRequest(this.topics, allowAutoTopicCreation, version); } From be6cb0d91a870f9768361e223015de73b4384c94 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Wed, 31 May 2017 03:55:20 +0100 Subject: [PATCH 3/5] Change allTopics to set allowAutoTopicCreation to true --- .../org/apache/kafka/common/requests/MetadataRequest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index ad2dd0dd9d561..3993e6370bca2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -41,7 +41,9 @@ public static class Builder extends AbstractRequest.Builder { private final boolean allowAutoTopicCreation; public static Builder allTopics() { - return new Builder(ALL_TOPICS, false); + // This never causes auto-creation, but we set the boolean to true because that is the default value when + // deserializing V2 and older. This way, the value is consistent after serialization and deserialization. + return new Builder(ALL_TOPICS, true); } public Builder(List topics, boolean allowAutoTopicCreation) { @@ -62,7 +64,7 @@ public boolean isAllTopics() { public MetadataRequest build(short version) { if (version < 1) throw new UnsupportedVersionException("MetadataRequest versions older than 1 are not supported."); - if (!allowAutoTopicCreation && version < 3 && topics != null && !topics.isEmpty()) + if (!allowAutoTopicCreation && version < 3) throw new UnsupportedVersionException("MetadataRequest versions older than 3 don't support the " + "allowAutoTopicCreation field"); return new MetadataRequest(this.topics, allowAutoTopicCreation, version); From ef816cc5181650bc7964861bed12d75f7776ef98 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Wed, 31 May 2017 21:49:41 +0100 Subject: [PATCH 4/5] Bump the version of Metadata request --- .../apache/kafka/common/protocol/Protocol.java | 16 ++++++++++------ .../kafka/common/requests/MetadataRequest.java | 5 +++-- .../common/requests/RequestResponseTest.java | 4 ++++ 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index cb4f9ca81f445..383332b93f00f 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -65,11 +65,13 @@ public class Protocol { /* The v2 metadata request is the same as v1. An additional field for cluster id has been added to the v2 metadata response */ public static final Schema METADATA_REQUEST_V2 = METADATA_REQUEST_V1; - /* V3 additions: a field for allowing auto topic creation was added to the request and a field for throttle time has - * been added to the response */ - public static final Schema METADATA_REQUEST_V3 = new Schema(new Field("topics", + /* The v3 metadata request is the same as v1 and v2. An additional field for throttle time has been added to the v3 metadata response */ + public static final Schema METADATA_REQUEST_V3 = METADATA_REQUEST_V2; + + /* The v4 metadata request has an additional field for allowing auto topic creation. The response is the same as v3. */ + public static final Schema METADATA_REQUEST_V4 = new Schema(new Field("topics", ArrayOf.nullable(STRING), - "An array of topics to fetch metadata for. If the topics array is null fetch metadata for all topics."), + "An array of topics to fetch metadata for. If the topics array is null fetch metadata for all topics."), new Field("allow_auto_topic_creation", BOOLEAN, "If this and the broker config 'auto.create.topics.enable' are true, " + @@ -150,8 +152,10 @@ public class Protocol { "The broker id of the controller broker."), new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1))); - public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3}; - public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3}; + public static final Schema METADATA_RESPONSE_V4 = METADATA_RESPONSE_V3; + + public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3, METADATA_REQUEST_V4}; + public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3, METADATA_RESPONSE_V4}; /* Produce api */ diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index 3993e6370bca2..0493f3d5cf57f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -64,8 +64,8 @@ public boolean isAllTopics() { public MetadataRequest build(short version) { if (version < 1) throw new UnsupportedVersionException("MetadataRequest versions older than 1 are not supported."); - if (!allowAutoTopicCreation && version < 3) - throw new UnsupportedVersionException("MetadataRequest versions older than 3 don't support the " + + if (!allowAutoTopicCreation && version < 4) + throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " + "allowAutoTopicCreation field"); return new MetadataRequest(this.topics, allowAutoTopicCreation, version); } @@ -134,6 +134,7 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { case 2: return new MetadataResponse(Collections.emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas); case 3: + case 4: return new MetadataResponse(throttleTimeMs, Collections.emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 72b8b8b93b68e..a05b680f1805e 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -117,6 +117,10 @@ public void testSerialization() throws Exception { checkErrorResponse(createMetadataRequest(1, singletonList("topic1")), new UnknownServerException()); checkResponse(createMetadataResponse(), 2); checkErrorResponse(createMetadataRequest(2, singletonList("topic1")), new UnknownServerException()); + checkResponse(createMetadataResponse(), 3); + checkErrorResponse(createMetadataRequest(3, singletonList("topic1")), new UnknownServerException()); + checkResponse(createMetadataResponse(), 4); + checkErrorResponse(createMetadataRequest(4, singletonList("topic1")), new UnknownServerException()); checkRequest(createOffsetCommitRequest(2)); checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException()); checkResponse(createOffsetCommitResponse(), 0); From a0476e073631b0a3ee4ca1733780fee2c660ff35 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Wed, 31 May 2017 22:59:48 +0100 Subject: [PATCH 5/5] Update `testAutoTopicCreation` to use the new metadata request versions --- .../scala/unit/kafka/server/MetadataRequestTest.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala index 2bf8cd4247acb..177a9ee4fd412 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala @@ -132,12 +132,12 @@ class MetadataRequestTest extends BaseRequestTest { val response1 = sendMetadataRequest(new MetadataRequest(Seq(topic1, topic2).asJava, true, ApiKeys.METADATA.latestVersion)) checkAutoCreatedTopic(topic1, topic2, response1) - // V2 doesn't support a configurable allowAutoTopicCreation, so the fact that we set it to `false` has no effect - val response2 = sendMetadataRequest(new MetadataRequest(Seq(topic2, topic3).asJava, false, 2)) + // V3 doesn't support a configurable allowAutoTopicCreation, so the fact that we set it to `false` has no effect + val response2 = sendMetadataRequest(new MetadataRequest(Seq(topic2, topic3).asJava, false, 3)) checkAutoCreatedTopic(topic2, topic3, response2) - // V3 and higher support a configurable allowAutoTopicCreation - val response3 = sendMetadataRequest(new MetadataRequest(Seq(topic3, topic4).asJava, false, 3)) + // V4 and higher support a configurable allowAutoTopicCreation + val response3 = sendMetadataRequest(new MetadataRequest(Seq(topic3, topic4).asJava, false, 4)) assertNull(response3.errors.get(topic3)) assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response3.errors.get(topic4)) assertEquals(None, zkUtils.getTopicPartitionCount(topic4))