From e1f4efb44150108a5885991d578d5e56a83d8587 Mon Sep 17 00:00:00 2001 From: dockerzhang Date: Wed, 27 Jan 2021 09:03:52 +0800 Subject: [PATCH 1/2] support delete topics using the kafka client (#348) fixes https://github.com/streamnative/kop/issues/347 --- .../pulsar/handlers/kop/AdminManager.java | 17 +++++++ .../handlers/kop/KafkaCommandDecoder.java | 6 +++ .../handlers/kop/KafkaRequestHandler.java | 11 +++++ .../handlers/kop/KafkaRequestHandlerTest.java | 44 +++++++++++++++++-- 4 files changed, 74 insertions(+), 4 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java index 8a8abf5e3d..174d84d1b3 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java @@ -38,6 +38,7 @@ import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; @Slf4j class AdminManager { @@ -175,4 +176,20 @@ CompletableFuture> describeC }); return resultFuture; } + + public Map deleteTopics(Set topicsToDelete) { + Map result = new ConcurrentHashMap<>(); + topicsToDelete.forEach(topic -> { + try { + String topicFullName = new KopTopic(topic).getFullName(); + admin.topics().deletePartitionedTopic(topicFullName); + result.put(topic, Errors.NONE); + log.info("delete topic {} successfully.", topicFullName); + } catch (PulsarAdminException e) { + log.error("delete topic {} failed, exception: ", topic, e); + result.put(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION); + } + }); + return result; + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index b9abf5df6f..d0d0b8d583 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -235,6 +235,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception case DESCRIBE_CONFIGS: handleDescribeConfigs(kafkaHeaderAndRequest, responseFuture); break; + case DELETE_TOPICS: + handleDeleteTopics(kafkaHeaderAndRequest, responseFuture); + break; default: handleError(kafkaHeaderAndRequest, responseFuture); } @@ -372,6 +375,9 @@ protected void writeAndFlushWhenInactiveChannel(Channel channel) { protected abstract void handleDescribeConfigs(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + protected abstract void + handleDeleteTopics(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + static class KafkaHeaderAndRequest implements Closeable { private static final String DEFAULT_CLIENT_HOST = ""; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index d7bad66c24..e8d0a43739 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -77,6 +77,8 @@ import org.apache.kafka.common.requests.CreateTopicsResponse; import org.apache.kafka.common.requests.DeleteGroupsRequest; import org.apache.kafka.common.requests.DeleteGroupsResponse; +import org.apache.kafka.common.requests.DeleteTopicsRequest; +import org.apache.kafka.common.requests.DeleteTopicsResponse; import org.apache.kafka.common.requests.DescribeConfigsRequest; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.kafka.common.requests.DescribeGroupsRequest; @@ -1207,6 +1209,15 @@ protected void handleDescribeConfigs(KafkaHeaderAndRequest describeConfigs, }); } + @Override + protected void handleDeleteTopics(KafkaHeaderAndRequest deleteTopics, + CompletableFuture resultFuture) { + checkArgument(deleteTopics.getRequest() instanceof DeleteTopicsRequest); + DeleteTopicsRequest request = (DeleteTopicsRequest) deleteTopics.getRequest(); + Set topicsToDelete = request.topics(); + resultFuture.complete(new DeleteTopicsResponse(adminManager.deleteTopics(topicsToDelete))); + } + private SaslHandshakeResponse checkSaslMechanism(String mechanism) { if (getKafkaConfig().getSaslAllowedMechanisms().contains(mechanism)) { return new SaslHandshakeResponse(Errors.NONE, getKafkaConfig().getSaslAllowedMechanisms()); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index 323ade2d77..6cfef12327 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -39,6 +39,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -316,7 +317,7 @@ public void testGetKafkaTopicNameFromPulsarTopicName() { assertEquals(localName, getKafkaTopicNameFromPulsarTopicname(topicNamePartition)); } - void createTopicsByKafkaAdmin(AdminClient admin, Map topicToNumPartitions) + private void createTopicsByKafkaAdmin(AdminClient admin, Map topicToNumPartitions) throws ExecutionException, InterruptedException { final short replicationFactor = 1; // replication factor will be ignored admin.createTopics(topicToNumPartitions.entrySet().stream().map(entry -> { @@ -326,7 +327,7 @@ void createTopicsByKafkaAdmin(AdminClient admin, Map topicToNum }).collect(Collectors.toList())).all().get(); } - void verifyTopicsByPulsarAdmin(Map topicToNumPartitions) + private void verifyTopicsCreatedByPulsarAdmin(Map topicToNumPartitions) throws PulsarAdminException { for (Map.Entry entry : topicToNumPartitions.entrySet()) { final String topic = entry.getKey(); @@ -335,8 +336,22 @@ void verifyTopicsByPulsarAdmin(Map topicToNumPartitions) } } + private void verifyTopicsDeletedByPulsarAdmin(Map topicToNumPartitions) + throws PulsarAdminException { + for (Map.Entry entry : topicToNumPartitions.entrySet()) { + final String topic = entry.getKey(); + assertEquals(this.admin.topics().getPartitionedTopicMetadata(topic).partitions, 0); + } + } + + private void deleteTopicsByKafkaAdmin(AdminClient admin, Set topicsToDelete) + throws ExecutionException, InterruptedException { + admin.deleteTopics(topicsToDelete).all().get(); + } + + @Test(timeOut = 10000) - public void testCreateTopics() throws Exception { + public void testCreateAndDeleteTopics() throws Exception { Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); @@ -348,8 +363,12 @@ public void testCreateTopics() throws Exception { put("my-tenant/my-ns/testCreateTopics-2", 1); put("persistent://my-tenant/my-ns/testCreateTopics-3", 5); }}; + // create createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); - verifyTopicsByPulsarAdmin(topicToNumPartitions); + verifyTopicsCreatedByPulsarAdmin(topicToNumPartitions); + // delete + deleteTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions.keySet()); + verifyTopicsDeletedByPulsarAdmin(topicToNumPartitions); } @Test(timeOut = 10000) @@ -373,6 +392,23 @@ public void testCreateInvalidTopics() { } } + @Test(timeOut = 10000) + public void testDeleteNotExistedTopics() throws Exception { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); + + @Cleanup + AdminClient kafkaAdmin = AdminClient.create(props); + Set topics = new HashSet<>(); + topics.add("testDeleteNotExistedTopics"); + try { + deleteTopicsByKafkaAdmin(kafkaAdmin, topics); + fail(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof UnknownTopicOrPartitionException); + } + } + @Test(timeOut = 10000) public void testDescribeTopics() throws Exception { Properties props = new Properties(); From c3119ab956cc118db81afcb2b3666fd4da1b6d86 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 28 Jun 2021 11:01:53 +0800 Subject: [PATCH 2/2] Fix some corner cases not handled well for CreateTopics request (#592) Fixes #591 Kafka Connect use `Admin#createTopics` and check the `TopicExistsException` exception for existed topics. See https://github.com/apache/kafka/blob/ef3cd68c852daf21d8e207fa8e21c8770f4041d7/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L394 for details. However, currently `Admin#createTopics` will return an `UnknownServerException` for any failure when create topics. In addition, the handler for CreateTopics request has other problems. 1. The default partition number (-1) is not handled, which may throw exception from `createPartitionedTopicAsync`. 2. If topic name is invalid, the future won't be completed until timeout. See https://github.com/streamnative/kop/pull/361, TimeoutException will be thrown in this case. 1. When the topic to create already exists, return a `TopicExistsException`. 2. Add a `tryComplete` method to complete a future of topic creation that may also complete the future of response. 3. Add related tests. ===== * Fix some bugs for create topic request * Fix KStreamAggregationTest * Catch KoPTopicException instead of RuntimeException --- .../pulsar/handlers/kop/AdminManager.java | 37 ++++++++--- .../handlers/kop/KafkaProtocolHandler.java | 2 +- .../pulsar/handlers/kop/KafkaApisTest.java | 2 +- .../handlers/kop/KafkaRequestHandlerTest.java | 66 ++++++++++++++++--- .../kop/KafkaTopicConsumerManagerTest.java | 2 +- 5 files changed, 90 insertions(+), 19 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java index 174d84d1b3..a6f3370f0f 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java @@ -34,8 +34,10 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.requests.CreateTopicsRequest; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -50,9 +52,11 @@ class AdminManager { .build(); private final PulsarAdmin admin; + private final int defaultNumPartitions; - AdminManager(PulsarAdmin admin) { + public AdminManager(PulsarAdmin admin, KafkaServiceConfiguration conf) { this.admin = admin; + this.defaultNumPartitions = conf.getDefaultNumPartitions(); } public void shutdown() { @@ -88,9 +92,24 @@ CompletableFuture> createTopicsAsync(Map { if (e == null) { if (log.isDebugEnabled()) { @@ -99,13 +118,15 @@ CompletableFuture> createTopicsAsync(Map topicToNumPartitions = new HashMap(){{ - put("xxx/testCreateInvalidTopics-0", 1); - }}; + Map topicToNumPartitions = Collections.singletonMap("xxx/testCreateInvalidTopics-0", 1); + try { + createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); + fail("create a invalid topic should fail"); + } catch (Exception e) { + log.info("Failed to create topics: {} caused by {}", topicToNumPartitions, e.getCause()); + assertTrue(e.getCause() instanceof UnknownServerException); + } + topicToNumPartitions = Collections.singletonMap("testCreateInvalidTopics-1", -1234); try { createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); fail("create a invalid topic should fail"); } catch (Exception e) { log.info("Failed to create topics: {} caused by {}", topicToNumPartitions, e.getCause()); - final Throwable cause = e.getCause(); - assertTrue(cause instanceof TimeoutException || cause instanceof UnknownServerException); + assertTrue(e.getCause() instanceof InvalidRequestException); + } + } + + @Test(timeOut = 10000) + public void testCreateExistedTopic() { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); + + @Cleanup + AdminClient kafkaAdmin = AdminClient.create(props); + final Map topicToNumPartitions = Collections.singletonMap("testCreatedExistedTopic", 1); + try { + createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); + } catch (ExecutionException | InterruptedException e) { + fail(e.getMessage()); } + try { + createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); + fail("Create the existed topic should fail"); + } catch (ExecutionException e) { + log.info("Failed to create existed topic: {}", e.getMessage()); + assertTrue(e.getCause() instanceof TopicExistsException); + } catch (InterruptedException e) { + fail(e.getMessage()); + } + } + + @Test(timeOut = 10000) + public void testCreateTopicWithDefaultPartitions() throws Exception { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); + + final String topic = "testCreatedTopicWithDefaultPartitions"; + + @Cleanup + AdminClient kafkaAdmin = AdminClient.create(props); + final Map topicToNumPartitions = Collections.singletonMap( + topic, + CreateTopicsRequest.NO_NUM_PARTITIONS); + createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); + assertEquals(admin.topics().getPartitionedTopicMetadata(topic).partitions, conf.getDefaultNumPartitions()); } @Test(timeOut = 10000) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java index f5d9734126..3fa8deb18c 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java @@ -60,7 +60,7 @@ protected void setup() throws Exception { ProtocolHandler handler = pulsar.getProtocolHandlers().protocol("kafka"); GroupCoordinator groupCoordinator = ((KafkaProtocolHandler) handler).getGroupCoordinator(); - adminManager = new AdminManager(pulsar.getAdminClient()); + adminManager = new AdminManager(pulsar.getAdminClient(), conf); kafkaRequestHandler = new KafkaRequestHandler( pulsar, (KafkaServiceConfiguration) conf,