From e5fc3adc9556478fda66e8f03a4657861f1b00dc Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 24 Jun 2021 22:15:22 +0800 Subject: [PATCH 1/3] Fix some bugs for create topic request --- .../pulsar/handlers/kop/AdminManager.java | 41 +++++++++-- .../handlers/kop/KafkaProtocolHandler.java | 2 +- .../handlers/kop/EntryPublishTimeTest.java | 2 +- .../pulsar/handlers/kop/KafkaApisTest.java | 2 +- .../handlers/kop/KafkaRequestHandlerTest.java | 68 ++++++++++++++++--- .../kop/KafkaTopicConsumerManagerTest.java | 2 +- 6 files changed, 99 insertions(+), 18 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java index 83c3794415..7d50ab4f99 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java @@ -28,12 +28,15 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.requests.CreateTopicsRequest; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -48,9 +51,11 @@ class AdminManager { .build(); private final PulsarAdmin admin; + private final int defaultNumPartitions; - AdminManager(PulsarAdmin admin) { + public AdminManager(PulsarAdmin admin, KafkaServiceConfiguration conf) { this.admin = admin; + this.defaultNumPartitions = conf.getDefaultNumPartitions(); } public void shutdown() { @@ -81,14 +86,34 @@ CompletableFuture> createTopicsAsync(Map errorFuture = new CompletableFuture<>(); futureMap.put(topic, errorFuture); + Consumer tryComplete = error -> { + errorFuture.complete(error); + int restNumTopics = numTopics.decrementAndGet(); + if (restNumTopics < 0) { + return; + } + if (restNumTopics == 0) { + complete.run(); + } + }; + KopTopic kopTopic; try { kopTopic = new KopTopic(topic); } catch (RuntimeException e) { - errorFuture.complete(ApiError.fromThrowable(e)); + tryComplete.accept(ApiError.fromThrowable(e)); + return; + } + int numPartitions = detail.numPartitions; + if (numPartitions == CreateTopicsRequest.NO_NUM_PARTITIONS) { + numPartitions = defaultNumPartitions; + } + if (numPartitions < 0) { + tryComplete.accept(ApiError.fromThrowable( + new InvalidRequestException("The partition '" + numPartitions + "' is negative"))); return; } - admin.topics().createPartitionedTopicAsync(kopTopic.getFullName(), detail.numPartitions) + admin.topics().createPartitionedTopicAsync(kopTopic.getFullName(), numPartitions) .whenComplete((ignored, e) -> { if (e == null) { if (log.isDebugEnabled()) { @@ -102,9 +127,13 @@ CompletableFuture> createTopicsAsync(Map topicToNumPartitions = new HashMap(){{ - put("xxx/testCreateInvalidTopics-0", 1); - }}; + Map topicToNumPartitions = Collections.singletonMap("xxx/testCreateInvalidTopics-0", 1); + try { + createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); + fail("create a invalid topic should fail"); + } catch (Exception e) { + log.info("Failed to create topics: {} caused by {}", topicToNumPartitions, e.getCause()); + assertTrue(e.getCause() instanceof UnknownServerException); + } + topicToNumPartitions = Collections.singletonMap("testCreateInvalidTopics-1", -1234); try { createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); fail("create a invalid topic should fail"); } catch (Exception e) { log.info("Failed to create topics: {} caused by {}", topicToNumPartitions, e.getCause()); - final Throwable cause = e.getCause(); - assertTrue(cause instanceof TimeoutException || cause instanceof UnknownServerException); + assertTrue(e.getCause() instanceof InvalidRequestException); + } + } + + @Test(timeOut = 10000) + public void testCreateExistedTopic() { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); + + @Cleanup + AdminClient kafkaAdmin = AdminClient.create(props); + final Map topicToNumPartitions = Collections.singletonMap("testCreatedExistedTopic", 1); + try { + createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); + } catch (ExecutionException | InterruptedException e) { + fail(e.getMessage()); + } + try { + createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); + fail("Create the existed topic should fail"); + } catch (ExecutionException e) { + log.info("Failed to create existed topic: {}", e.getMessage()); + assertTrue(e.getCause() instanceof TopicExistsException); + } catch (InterruptedException e) { + fail(e.getMessage()); } } + @Test(timeOut = 10000) + public void testCreateTopicWithDefaultPartitions() throws Exception { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); + + final String topic = "testCreatedTopicWithDefaultPartitions"; + + @Cleanup + AdminClient kafkaAdmin = AdminClient.create(props); + final Map topicToNumPartitions = Collections.singletonMap( + topic, + CreateTopicsRequest.NO_NUM_PARTITIONS); + createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); + assertEquals(admin.topics().getPartitionedTopicMetadata(topic).partitions, conf.getDefaultNumPartitions()); + } + @Test(timeOut = 10000) public void testDeleteNotExistedTopics() throws Exception { Properties props = new Properties(); @@ -464,6 +514,8 @@ public void testProduceCallback() throws Exception { final int numMessages = 10; final String messagePrefix = "msg-"; + admin.topics().createPartitionedTopic(topic, 1); + final Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java index 4a7b9a6c1c..ad151b70c2 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java @@ -68,7 +68,7 @@ protected void setup() throws Exception { GroupCoordinator groupCoordinator = ((KafkaProtocolHandler) handler).getGroupCoordinator(); TransactionCoordinator transactionCoordinator = ((KafkaProtocolHandler) handler).getTransactionCoordinator(); - adminManager = new AdminManager(pulsar.getAdminClient()); + adminManager = new AdminManager(pulsar.getAdminClient(), conf); kafkaRequestHandler = new KafkaRequestHandler( pulsar, (KafkaServiceConfiguration) conf, From 9341f2708b0b69880ddfc6b1b382336e70ea2d5f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 25 Jun 2021 12:14:38 +0800 Subject: [PATCH 2/3] Fix KStreamAggregationTest --- .../pulsar/handlers/kop/AdminManager.java | 36 ++++++++----------- 1 file changed, 14 insertions(+), 22 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java index 7d50ab4f99..0d132983e1 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java @@ -28,7 +28,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.config.ConfigResource; @@ -86,22 +85,14 @@ CompletableFuture> createTopicsAsync(Map errorFuture = new CompletableFuture<>(); futureMap.put(topic, errorFuture); - Consumer tryComplete = error -> { - errorFuture.complete(error); - int restNumTopics = numTopics.decrementAndGet(); - if (restNumTopics < 0) { - return; - } - if (restNumTopics == 0) { - complete.run(); - } - }; - KopTopic kopTopic; try { kopTopic = new KopTopic(topic); } catch (RuntimeException e) { - tryComplete.accept(ApiError.fromThrowable(e)); + errorFuture.complete(ApiError.fromThrowable(e)); + if (numTopics.decrementAndGet() == 0) { + complete.run(); + } return; } int numPartitions = detail.numPartitions; @@ -109,8 +100,11 @@ CompletableFuture> createTopicsAsync(Map> createTopicsAsync(Map Date: Fri, 25 Jun 2021 12:17:53 +0800 Subject: [PATCH 3/3] Catch KoPTopicException instead of RuntimeException --- .../java/io/streamnative/pulsar/handlers/kop/AdminManager.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java index 0d132983e1..b1ba940220 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java @@ -16,6 +16,7 @@ import static io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.TopicKey; import static org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails; +import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory; @@ -88,7 +89,7 @@ CompletableFuture> createTopicsAsync(Map