Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
Expand Down Expand Up @@ -229,13 +230,20 @@ public Set<String> createTopics(NewTopic... topics) {
newlyCreatedTopicNames.add(topic);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (e.getCause() instanceof TopicExistsException) {
if (cause instanceof TopicExistsException) {
log.debug("Found existing topic '{}' on the brokers at {}", topic, bootstrapServers);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're in here anyway, can we fix the if in the line above to use cause instead of e.getCause()?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

continue;
}
if (cause instanceof UnsupportedVersionException) {
log.debug("Unable to use Kafka admin client to create topic descriptions for '{}' using the brokers at {}," +
"falling back to assume topic(s) exist or will be auto-created by the broker", topicNameList, bootstrapServers);
log.debug("Unable to create topic(s) '{}' since the brokers at {} do not support the CreateTopics API.",
" Falling back to assume topic(s) exist or will be auto-created by the broker.",
topicNameList, bootstrapServers);
return Collections.emptySet();
}
if (cause instanceof ClusterAuthorizationException) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, a bit late to the party, but if Kafka Connect can't create the topic but can still read / write to it, it should also have the describe rights. In which case, we can check if the topic exist using a describe?

I feel that right now this might be introducing a bug. Say the Kafka Connect isn't authorized to create a topic and the topic doesn't exist, then it will still go on with the execution

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a fair point. I didn't think about it, but in fact if the user doesn't have permission to create the topic through the CreateTopic API, then they won't be able to use auto-creation either. I think it would be clearer to check topic existence prior to attempting creation. Then if topic creation fails due to a permission error, perhaps we should make the error fatal.

@rhauch what do you think?

log.debug("Not authorized to create topic(s) '{}'." +
" Falling back to assume topic(s) exist or will be auto-created by the broker.",
topicNameList, bootstrapServers);
return Collections.emptySet();
}
if (cause instanceof TimeoutException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ public void returnNullWithApiVersionMismatch() {
}
}

@Test
public void returnNullWithClusterAuthorizationFailure() {
final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
Cluster cluster = createCluster(1);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
env.kafkaClient().prepareResponse(createTopicResponseWithClusterAuthorizationException(newTopic));
TopicAdmin admin = new TopicAdmin(null, env.adminClient());
boolean created = admin.createTopic(newTopic);
assertFalse(created);
}
}

@Test
public void shouldNotCreateTopicWhenItAlreadyExists() {
NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
Expand Down Expand Up @@ -120,6 +133,10 @@ private CreateTopicsResponse createTopicResponseWithUnsupportedVersion(NewTopic.
return createTopicResponse(new ApiError(Errors.UNSUPPORTED_VERSION, "This version of the API is not supported"), topics);
}

private CreateTopicsResponse createTopicResponseWithClusterAuthorizationException(NewTopic... topics) {
return createTopicResponse(new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, "Not authorized to create topic(s)"), topics);
}

private CreateTopicsResponse createTopicResponse(ApiError error, NewTopic... topics) {
if (error == null) error = new ApiError(Errors.NONE, "");
Map<String, ApiError> topicResults = new HashMap<>();
Expand Down