diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java index 8a8abf5e3d..a6f3370f0f 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java @@ -34,10 +34,13 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.requests.CreateTopicsRequest; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; @Slf4j class AdminManager { @@ -49,9 +52,11 @@ class AdminManager { .build(); private final PulsarAdmin admin; + private final int defaultNumPartitions; - AdminManager(PulsarAdmin admin) { + public AdminManager(PulsarAdmin admin, KafkaServiceConfiguration conf) { this.admin = admin; + this.defaultNumPartitions = conf.getDefaultNumPartitions(); } public void shutdown() { @@ -87,9 +92,24 @@ CompletableFuture> createTopicsAsync(Map { if (e == null) { if (log.isDebugEnabled()) { @@ -98,13 +118,15 @@ CompletableFuture> createTopicsAsync(Map> describeC }); return resultFuture; } + + public Map deleteTopics(Set topicsToDelete) { + Map result = new ConcurrentHashMap<>(); + topicsToDelete.forEach(topic -> { + try { + String topicFullName = new KopTopic(topic).getFullName(); + admin.topics().deletePartitionedTopic(topicFullName); + result.put(topic, Errors.NONE); + log.info("delete topic {} successfully.", topicFullName); + } catch (PulsarAdminException e) { + log.error("delete topic {} failed, exception: ", topic, e); + result.put(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION); + } + }); + return result; + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index b9abf5df6f..d0d0b8d583 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -235,6 +235,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception case DESCRIBE_CONFIGS: handleDescribeConfigs(kafkaHeaderAndRequest, responseFuture); break; + case DELETE_TOPICS: + handleDeleteTopics(kafkaHeaderAndRequest, responseFuture); + break; default: handleError(kafkaHeaderAndRequest, responseFuture); } @@ -372,6 +375,9 @@ protected void writeAndFlushWhenInactiveChannel(Channel channel) { protected abstract void handleDescribeConfigs(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + protected abstract void + handleDeleteTopics(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + static class KafkaHeaderAndRequest implements Closeable { private static final String DEFAULT_CLIENT_HOST = ""; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java index 44b5e43970..e6b3d1a3f1 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java @@ -241,7 +241,7 @@ public void start(BrokerService service) { KopVersion.getBuildTime()); try { - adminManager = new AdminManager(brokerService.getPulsar().getAdminClient()); + adminManager = new AdminManager(brokerService.getPulsar().getAdminClient(), kafkaConfig); } catch (PulsarServerException e) { log.error("Failed to create PulsarAdmin: {}", e.getMessage()); throw new IllegalStateException(e); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index d7bad66c24..e8d0a43739 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -77,6 +77,8 @@ import org.apache.kafka.common.requests.CreateTopicsResponse; import org.apache.kafka.common.requests.DeleteGroupsRequest; import org.apache.kafka.common.requests.DeleteGroupsResponse; +import org.apache.kafka.common.requests.DeleteTopicsRequest; +import org.apache.kafka.common.requests.DeleteTopicsResponse; import org.apache.kafka.common.requests.DescribeConfigsRequest; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.kafka.common.requests.DescribeGroupsRequest; @@ -1207,6 +1209,15 @@ protected void handleDescribeConfigs(KafkaHeaderAndRequest describeConfigs, }); } + @Override + protected void handleDeleteTopics(KafkaHeaderAndRequest deleteTopics, + CompletableFuture resultFuture) { + checkArgument(deleteTopics.getRequest() instanceof DeleteTopicsRequest); + DeleteTopicsRequest request = (DeleteTopicsRequest) deleteTopics.getRequest(); + Set topicsToDelete = request.topics(); + resultFuture.complete(new DeleteTopicsResponse(adminManager.deleteTopics(topicsToDelete))); + } + private SaslHandshakeResponse checkSaslMechanism(String mechanism) { if (getKafkaConfig().getSaslAllowedMechanisms().contains(mechanism)) { return new SaslHandshakeResponse(Errors.NONE, getKafkaConfig().getSaslAllowedMechanisms()); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java index 6b73ed3cfd..9943da07af 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java @@ -141,7 +141,7 @@ protected void setup() throws Exception { ProtocolHandler handler = pulsar.getProtocolHandlers().protocol("kafka"); GroupCoordinator groupCoordinator = ((KafkaProtocolHandler) handler).getGroupCoordinator(); - adminManager = new AdminManager(pulsar.getAdminClient()); + adminManager = new AdminManager(pulsar.getAdminClient(), conf); kafkaRequestHandler = new KafkaRequestHandler( pulsar, (KafkaServiceConfiguration) conf, diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index 323ade2d77..73463943e6 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -39,6 +39,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -57,13 +58,15 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Node; import org.apache.kafka.common.config.ConfigResource; -import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiVersionsRequest; import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.CreateTopicsRequest; import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.ResponseHeader; @@ -96,6 +99,7 @@ public class KafkaRequestHandlerTest extends KopProtocolHandlerTestBase { @BeforeMethod @Override protected void setup() throws Exception { + conf.setDefaultNumPartitions(2); super.internalSetup(); log.info("success internal setup"); @@ -137,7 +141,7 @@ protected void setup() throws Exception { ProtocolHandler handler1 = pulsar.getProtocolHandlers().protocol("kafka"); GroupCoordinator groupCoordinator = ((KafkaProtocolHandler) handler1).getGroupCoordinator(); - adminManager = new AdminManager(pulsar.getAdminClient()); + adminManager = new AdminManager(pulsar.getAdminClient(), conf); handler = new KafkaRequestHandler( pulsar, (KafkaServiceConfiguration) conf, @@ -316,7 +320,7 @@ public void testGetKafkaTopicNameFromPulsarTopicName() { assertEquals(localName, getKafkaTopicNameFromPulsarTopicname(topicNamePartition)); } - void createTopicsByKafkaAdmin(AdminClient admin, Map topicToNumPartitions) + private void createTopicsByKafkaAdmin(AdminClient admin, Map topicToNumPartitions) throws ExecutionException, InterruptedException { final short replicationFactor = 1; // replication factor will be ignored admin.createTopics(topicToNumPartitions.entrySet().stream().map(entry -> { @@ -326,7 +330,7 @@ void createTopicsByKafkaAdmin(AdminClient admin, Map topicToNum }).collect(Collectors.toList())).all().get(); } - void verifyTopicsByPulsarAdmin(Map topicToNumPartitions) + private void verifyTopicsCreatedByPulsarAdmin(Map topicToNumPartitions) throws PulsarAdminException { for (Map.Entry entry : topicToNumPartitions.entrySet()) { final String topic = entry.getKey(); @@ -335,8 +339,22 @@ void verifyTopicsByPulsarAdmin(Map topicToNumPartitions) } } + private void verifyTopicsDeletedByPulsarAdmin(Map topicToNumPartitions) + throws PulsarAdminException { + for (Map.Entry entry : topicToNumPartitions.entrySet()) { + final String topic = entry.getKey(); + assertEquals(this.admin.topics().getPartitionedTopicMetadata(topic).partitions, 0); + } + } + + private void deleteTopicsByKafkaAdmin(AdminClient admin, Set topicsToDelete) + throws ExecutionException, InterruptedException { + admin.deleteTopics(topicsToDelete).all().get(); + } + + @Test(timeOut = 10000) - public void testCreateTopics() throws Exception { + public void testCreateAndDeleteTopics() throws Exception { Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); @@ -348,11 +366,15 @@ public void testCreateTopics() throws Exception { put("my-tenant/my-ns/testCreateTopics-2", 1); put("persistent://my-tenant/my-ns/testCreateTopics-3", 5); }}; + // create createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); - verifyTopicsByPulsarAdmin(topicToNumPartitions); + verifyTopicsCreatedByPulsarAdmin(topicToNumPartitions); + // delete + deleteTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions.keySet()); + verifyTopicsDeletedByPulsarAdmin(topicToNumPartitions); } - @Test(timeOut = 10000) + @Test(timeOut = 20000) public void testCreateInvalidTopics() { Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); @@ -360,16 +382,80 @@ public void testCreateInvalidTopics() { @Cleanup AdminClient kafkaAdmin = AdminClient.create(props); - Map topicToNumPartitions = new HashMap(){{ - put("xxx/testCreateInvalidTopics-0", 1); - }}; + Map topicToNumPartitions = Collections.singletonMap("xxx/testCreateInvalidTopics-0", 1); try { createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); fail("create a invalid topic should fail"); } catch (Exception e) { log.info("Failed to create topics: {} caused by {}", topicToNumPartitions, e.getCause()); - final Throwable cause = e.getCause(); - assertTrue(cause instanceof TimeoutException || cause instanceof UnknownServerException); + assertTrue(e.getCause() instanceof UnknownServerException); + } + topicToNumPartitions = Collections.singletonMap("testCreateInvalidTopics-1", -1234); + try { + createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); + fail("create a invalid topic should fail"); + } catch (Exception e) { + log.info("Failed to create topics: {} caused by {}", topicToNumPartitions, e.getCause()); + assertTrue(e.getCause() instanceof InvalidRequestException); + } + } + + @Test(timeOut = 10000) + public void testCreateExistedTopic() { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); + + @Cleanup + AdminClient kafkaAdmin = AdminClient.create(props); + final Map topicToNumPartitions = Collections.singletonMap("testCreatedExistedTopic", 1); + try { + createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); + } catch (ExecutionException | InterruptedException e) { + fail(e.getMessage()); + } + try { + createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); + fail("Create the existed topic should fail"); + } catch (ExecutionException e) { + log.info("Failed to create existed topic: {}", e.getMessage()); + assertTrue(e.getCause() instanceof TopicExistsException); + } catch (InterruptedException e) { + fail(e.getMessage()); + } + } + + @Test(timeOut = 10000) + public void testCreateTopicWithDefaultPartitions() throws Exception { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); + + final String topic = "testCreatedTopicWithDefaultPartitions"; + + @Cleanup + AdminClient kafkaAdmin = AdminClient.create(props); + final Map topicToNumPartitions = Collections.singletonMap( + topic, + CreateTopicsRequest.NO_NUM_PARTITIONS); + createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); + assertEquals(admin.topics().getPartitionedTopicMetadata(topic).partitions, conf.getDefaultNumPartitions()); + } + + @Test(timeOut = 10000) + public void testDeleteNotExistedTopics() throws Exception { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); + + @Cleanup + AdminClient kafkaAdmin = AdminClient.create(props); + Set topics = new HashSet<>(); + topics.add("testDeleteNotExistedTopics"); + try { + deleteTopicsByKafkaAdmin(kafkaAdmin, topics); + fail(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof UnknownTopicOrPartitionException); } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java index f5d9734126..3fa8deb18c 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java @@ -60,7 +60,7 @@ protected void setup() throws Exception { ProtocolHandler handler = pulsar.getProtocolHandlers().protocol("kafka"); GroupCoordinator groupCoordinator = ((KafkaProtocolHandler) handler).getGroupCoordinator(); - adminManager = new AdminManager(pulsar.getAdminClient()); + adminManager = new AdminManager(pulsar.getAdminClient(), conf); kafkaRequestHandler = new KafkaRequestHandler( pulsar, (KafkaServiceConfiguration) conf,