From 940012dd70df992c10e9af6fc41feecbf4af062c Mon Sep 17 00:00:00 2001 From: Gavrie Philipson Date: Wed, 22 Nov 2017 08:56:28 +0200 Subject: [PATCH] KAFKA-6250: Use existing Kafka Connect internal topics without requiring ACL When using Kafka Connect with a cluster that doesn't allow the user to create topics (due to ACL configuration), Connect fails when trying to create its internal topics even if these topics already exist. This is incorrect behavior according to the documentation, which mentions that R/W access should be enough. This happens specifically when using Aiven Kafka, which does not permit creation of topics via the Kafka Admin Client API. The patch ignores the returned error, similar to the behavior for older brokers that don't support the API. --- .../apache/kafka/connect/util/TopicAdmin.java | 14 +++++++++++--- .../kafka/connect/util/TopicAdminTest.java | 17 +++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index 5da4f2d00d0db..ad21561baf259 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.CreateTopicsOptions; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -229,13 +230,20 @@ public Set createTopics(NewTopic... topics) { newlyCreatedTopicNames.add(topic); } catch (ExecutionException e) { Throwable cause = e.getCause(); - if (e.getCause() instanceof TopicExistsException) { + if (cause instanceof TopicExistsException) { log.debug("Found existing topic '{}' on the brokers at {}", topic, bootstrapServers); continue; } if (cause instanceof UnsupportedVersionException) { - log.debug("Unable to use Kafka admin client to create topic descriptions for '{}' using the brokers at {}," + - "falling back to assume topic(s) exist or will be auto-created by the broker", topicNameList, bootstrapServers); + log.debug("Unable to create topic(s) '{}' since the brokers at {} do not support the CreateTopics API.", + " Falling back to assume topic(s) exist or will be auto-created by the broker.", + topicNameList, bootstrapServers); + return Collections.emptySet(); + } + if (cause instanceof ClusterAuthorizationException) { + log.debug("Not authorized to create topic(s) '{}'." + + " Falling back to assume topic(s) exist or will be auto-created by the broker.", + topicNameList, bootstrapServers); return Collections.emptySet(); } if (cause instanceof TimeoutException) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java index c58d6741f580f..cda68795689bf 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java @@ -60,6 +60,19 @@ public void returnNullWithApiVersionMismatch() { } } + @Test + public void returnNullWithClusterAuthorizationFailure() { + final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); + Cluster cluster = createCluster(1); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.emptySet()); + env.kafkaClient().prepareResponse(createTopicResponseWithClusterAuthorizationException(newTopic)); + TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + boolean created = admin.createTopic(newTopic); + assertFalse(created); + } + } + @Test public void shouldNotCreateTopicWhenItAlreadyExists() { NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); @@ -120,6 +133,10 @@ private CreateTopicsResponse createTopicResponseWithUnsupportedVersion(NewTopic. return createTopicResponse(new ApiError(Errors.UNSUPPORTED_VERSION, "This version of the API is not supported"), topics); } + private CreateTopicsResponse createTopicResponseWithClusterAuthorizationException(NewTopic... topics) { + return createTopicResponse(new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, "Not authorized to create topic(s)"), topics); + } + private CreateTopicsResponse createTopicResponse(ApiError error, NewTopic... topics) { if (error == null) error = new ApiError(Errors.NONE, ""); Map topicResults = new HashMap<>();