diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index 5da4f2d00d0db..ad21561baf259 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.CreateTopicsOptions; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -229,13 +230,20 @@ public Set createTopics(NewTopic... topics) { newlyCreatedTopicNames.add(topic); } catch (ExecutionException e) { Throwable cause = e.getCause(); - if (e.getCause() instanceof TopicExistsException) { + if (cause instanceof TopicExistsException) { log.debug("Found existing topic '{}' on the brokers at {}", topic, bootstrapServers); continue; } if (cause instanceof UnsupportedVersionException) { - log.debug("Unable to use Kafka admin client to create topic descriptions for '{}' using the brokers at {}," + - "falling back to assume topic(s) exist or will be auto-created by the broker", topicNameList, bootstrapServers); + log.debug("Unable to create topic(s) '{}' since the brokers at {} do not support the CreateTopics API.", + " Falling back to assume topic(s) exist or will be auto-created by the broker.", + topicNameList, bootstrapServers); + return Collections.emptySet(); + } + if (cause instanceof ClusterAuthorizationException) { + log.debug("Not authorized to create topic(s) '{}'." + + " Falling back to assume topic(s) exist or will be auto-created by the broker.", + topicNameList, bootstrapServers); return Collections.emptySet(); } if (cause instanceof TimeoutException) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java index c58d6741f580f..cda68795689bf 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java @@ -60,6 +60,19 @@ public void returnNullWithApiVersionMismatch() { } } + @Test + public void returnNullWithClusterAuthorizationFailure() { + final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); + Cluster cluster = createCluster(1); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { + env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.emptySet()); + env.kafkaClient().prepareResponse(createTopicResponseWithClusterAuthorizationException(newTopic)); + TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + boolean created = admin.createTopic(newTopic); + assertFalse(created); + } + } + @Test public void shouldNotCreateTopicWhenItAlreadyExists() { NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); @@ -120,6 +133,10 @@ private CreateTopicsResponse createTopicResponseWithUnsupportedVersion(NewTopic. return createTopicResponse(new ApiError(Errors.UNSUPPORTED_VERSION, "This version of the API is not supported"), topics); } + private CreateTopicsResponse createTopicResponseWithClusterAuthorizationException(NewTopic... topics) { + return createTopicResponse(new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, "Not authorized to create topic(s)"), topics); + } + private CreateTopicsResponse createTopicResponse(ApiError error, NewTopic... topics) { if (error == null) error = new ApiError(Errors.NONE, ""); Map topicResults = new HashMap<>();