diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java index 83c3794415..b1ba940220 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java @@ -16,6 +16,7 @@ import static io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.TopicKey; import static org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails; +import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory; @@ -32,8 +33,10 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.requests.CreateTopicsRequest; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -48,9 +51,11 @@ class AdminManager { .build(); private final PulsarAdmin admin; + private final int defaultNumPartitions; - AdminManager(PulsarAdmin admin) { + public AdminManager(PulsarAdmin admin, KafkaServiceConfiguration conf) { this.admin = admin; + this.defaultNumPartitions = conf.getDefaultNumPartitions(); } public void shutdown() { @@ -84,11 +89,26 @@ CompletableFuture> createTopicsAsync(Map { if (e == null) { if (log.isDebugEnabled()) { @@ -97,13 +117,15 @@ CompletableFuture> createTopicsAsync(Map topicToNumPartitions = new HashMap(){{ - put("xxx/testCreateInvalidTopics-0", 1); - }}; + Map topicToNumPartitions = Collections.singletonMap("xxx/testCreateInvalidTopics-0", 1); + try { + createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); + fail("create a invalid topic should fail"); + } catch (Exception e) { + log.info("Failed to create topics: {} caused by {}", topicToNumPartitions, e.getCause()); + assertTrue(e.getCause() instanceof UnknownServerException); + } + topicToNumPartitions = Collections.singletonMap("testCreateInvalidTopics-1", -1234); try { createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); fail("create a invalid topic should fail"); } catch (Exception e) { log.info("Failed to create topics: {} caused by {}", topicToNumPartitions, e.getCause()); - final Throwable cause = e.getCause(); - assertTrue(cause instanceof TimeoutException || cause instanceof UnknownServerException); + assertTrue(e.getCause() instanceof InvalidRequestException); + } + } + + @Test(timeOut = 10000) + public void testCreateExistedTopic() { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); + + @Cleanup + AdminClient kafkaAdmin = AdminClient.create(props); + final Map topicToNumPartitions = Collections.singletonMap("testCreatedExistedTopic", 1); + try { + createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); + } catch (ExecutionException | InterruptedException e) { + fail(e.getMessage()); + } + try { + createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); + fail("Create the existed topic should fail"); + } catch (ExecutionException e) { + log.info("Failed to create existed topic: {}", e.getMessage()); + assertTrue(e.getCause() instanceof TopicExistsException); + } catch (InterruptedException e) { + fail(e.getMessage()); } } + @Test(timeOut = 10000) + public void testCreateTopicWithDefaultPartitions() throws Exception { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); + + final String topic = "testCreatedTopicWithDefaultPartitions"; + + @Cleanup + AdminClient kafkaAdmin = AdminClient.create(props); + final Map topicToNumPartitions = Collections.singletonMap( + topic, + CreateTopicsRequest.NO_NUM_PARTITIONS); + createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); + assertEquals(admin.topics().getPartitionedTopicMetadata(topic).partitions, conf.getDefaultNumPartitions()); + } + @Test(timeOut = 10000) public void testDeleteNotExistedTopics() throws Exception { Properties props = new Properties(); @@ -464,6 +514,8 @@ public void testProduceCallback() throws Exception { final int numMessages = 10; final String messagePrefix = "msg-"; + admin.topics().createPartitionedTopic(topic, 1); + final Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java index 4a7b9a6c1c..ad151b70c2 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java @@ -68,7 +68,7 @@ protected void setup() throws Exception { GroupCoordinator groupCoordinator = ((KafkaProtocolHandler) handler).getGroupCoordinator(); TransactionCoordinator transactionCoordinator = ((KafkaProtocolHandler) handler).getTransactionCoordinator(); - adminManager = new AdminManager(pulsar.getAdminClient()); + adminManager = new AdminManager(pulsar.getAdminClient(), conf); kafkaRequestHandler = new KafkaRequestHandler( pulsar, (KafkaServiceConfiguration) conf,