From c1093870f72a99d920c7fe5ad43c12709219bfcd Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 3 Dec 2020 17:06:23 +0800 Subject: [PATCH 1/5] Handle DescribeConfig request --- .../pulsar/handlers/kop/AdminManager.java | 36 +++++++++++ .../handlers/kop/KafkaCommandDecoder.java | 6 ++ .../pulsar/handlers/kop/KafkaLogConfig.java | 62 +++++++++++++++++++ .../handlers/kop/KafkaRequestHandler.java | 18 ++++++ 4 files changed, 122 insertions(+) create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaLogConfig.java diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java index 9d31724d8a..f8f8bd67ef 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java @@ -21,16 +21,23 @@ import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory; import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.pulsar.client.admin.PulsarAdmin; @Slf4j @@ -111,4 +118,33 @@ CompletableFuture> createTopicsAsync(Map> describeConfigsAsync( + Map>> resourceToConfigNames) { + // Since Kafka's storage and policies are much different from Pulsar, here we just return a default config + // to avoid some Kafka based systems need to send DescribeConfigs request, like confluent schema registry + Map configMap = resourceToConfigNames.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> { + ConfigResource resource = entry.getKey(); + List entries = new ArrayList<>(); + try { + switch (resource.type()) { + case TOPIC: + KafkaLogConfig.getEntries().forEach((key, value) -> + entries.add(new DescribeConfigsResponse.ConfigEntry(key, value, + DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, + false, false, Collections.emptyList()))); + break; + case BROKER: + throw new RuntimeException("KoP doesn't support resource type: " + resource.type()); + default: + throw new InvalidRequestException("Unsupported resource type: " + resource.type()); + } + return new DescribeConfigsResponse.Config(ApiError.NONE, entries); + } catch (Exception e) { + return new DescribeConfigsResponse.Config(ApiError.fromThrowable(e), Collections.emptyList()); + } + })); + return CompletableFuture.completedFuture(configMap); + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index e6ea1f9ba0..b9abf5df6f 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -232,6 +232,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception case CREATE_TOPICS: handleCreateTopics(kafkaHeaderAndRequest, responseFuture); break; + case DESCRIBE_CONFIGS: + handleDescribeConfigs(kafkaHeaderAndRequest, responseFuture); + break; default: handleError(kafkaHeaderAndRequest, responseFuture); } @@ -366,6 +369,9 @@ protected void writeAndFlushWhenInactiveChannel(Channel channel) { protected abstract void handleCreateTopics(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + protected abstract void + handleDescribeConfigs(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + static class KafkaHeaderAndRequest implements Closeable { private static final String DEFAULT_CLIENT_HOST = ""; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaLogConfig.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaLogConfig.java new file mode 100644 index 0000000000..5cf382aab1 --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaLogConfig.java @@ -0,0 +1,62 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.record.Records; + +/** + * KafkaLogConfig is ported from kafka.log.LogConfig. + */ +public class KafkaLogConfig { + private static final Map entries = defaultEntries(); + + public static Map getEntries() { + return entries; + } + + private static Map defaultEntries() { + return Collections.unmodifiableMap(new HashMap(){{ + put(TopicConfig.SEGMENT_BYTES_CONFIG, Integer.toString(1024 * 1024 * 1024)); + put(TopicConfig.SEGMENT_MS_CONFIG, Long.toString(24 * 7 * 7 * 60 * 1000L)); + put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, "0"); + put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, Integer.toString(10 * 1024 * 1024)); + put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, Long.toString(Long.MAX_VALUE)); + put(TopicConfig.FLUSH_MS_CONFIG, Long.toString(Long.MAX_VALUE)); + put(TopicConfig.RETENTION_BYTES_CONFIG, Long.toString(-1L)); + put(TopicConfig.RETENTION_MS_CONFIG, Long.toString(24 * 7 * 60 * 60 * 60 * 1000L)); + put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, Integer.toString(1000000 + Records.LOG_OVERHEAD)); + put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "4096"); + put(TopicConfig.DELETE_RETENTION_MS_CONFIG, Long.toString(24 * 60 * 60 * 1000L)); + put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, Long.toString(0L)); + put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "60000"); + put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.5"); + // Kafka's default value of cleanup.policy is "delete", but here we set it to "compact" because confluent + // schema registry needs this config value. + put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact"); + put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1"); + put(TopicConfig.COMPRESSION_TYPE_CONFIG, "producer"); + put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false"); + put(TopicConfig.PREALLOCATE_CONFIG, "false"); + put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "2.0"); + put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"); + put(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.toString(Long.MAX_VALUE)); + put(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, "true"); + }}); + } +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 95016e53cc..791b680db0 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -88,6 +88,8 @@ import org.apache.kafka.common.requests.CreateTopicsResponse; import org.apache.kafka.common.requests.DeleteGroupsRequest; import org.apache.kafka.common.requests.DeleteGroupsResponse; +import org.apache.kafka.common.requests.DescribeConfigsRequest; +import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.kafka.common.requests.DescribeGroupsRequest; import org.apache.kafka.common.requests.DescribeGroupsResponse; import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMember; @@ -1286,6 +1288,22 @@ protected void handleCreateTopics(KafkaHeaderAndRequest createTopics, } } + protected void handleDescribeConfigs(KafkaHeaderAndRequest describeConfigs, + CompletableFuture resultFuture) { + checkArgument(describeConfigs.getRequest() instanceof DescribeConfigsRequest); + DescribeConfigsRequest request = (DescribeConfigsRequest) describeConfigs.getRequest(); + + adminManager.describeConfigsAsync(new ArrayList<>(request.resources()).stream() + .collect(Collectors.toMap( + resource -> resource, + resource -> Optional.of(new HashSet<>(request.configNames(resource))) + )) + ).thenApply(configResourceConfigMap -> { + resultFuture.complete(new DescribeConfigsResponse(0, configResourceConfigMap)); + return null; + }); + } + private SaslHandshakeResponse checkSaslMechanism(String mechanism) { if (getKafkaConfig().getSaslAllowedMechanisms().contains(mechanism)) { return new SaslHandshakeResponse(Errors.NONE, getKafkaConfig().getSaslAllowedMechanisms()); From 8db405f9c36d9e788dea8f78800fed758621a6cf Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 3 Dec 2020 17:43:14 +0800 Subject: [PATCH 2/5] Add check for topic existence --- .../pulsar/handlers/kop/AdminManager.java | 45 +++++++++++++------ 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java index f8f8bd67ef..9adab691b0 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java @@ -21,7 +21,6 @@ import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory; import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -121,30 +120,50 @@ CompletableFuture> createTopicsAsync(Map> describeConfigsAsync( Map>> resourceToConfigNames) { - // Since Kafka's storage and policies are much different from Pulsar, here we just return a default config - // to avoid some Kafka based systems need to send DescribeConfigs request, like confluent schema registry - Map configMap = resourceToConfigNames.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> { + // Since Kafka's storage and policies are much different from Pulsar, here we just return a default config to + // avoid some Kafka based systems need to send DescribeConfigs request, like confluent schema registry. + final DescribeConfigsResponse.Config defaultTopicConfig = new DescribeConfigsResponse.Config(ApiError.NONE, + KafkaLogConfig.getEntries().entrySet().stream().map(entry -> + new DescribeConfigsResponse.ConfigEntry(entry.getKey(), entry.getValue(), + DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, false, false, + Collections.emptyList()) + ).collect(Collectors.toList())); + + Map> futureMap = + resourceToConfigNames.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> { ConfigResource resource = entry.getKey(); - List entries = new ArrayList<>(); try { + CompletableFuture future = new CompletableFuture<>(); switch (resource.type()) { case TOPIC: - KafkaLogConfig.getEntries().forEach((key, value) -> - entries.add(new DescribeConfigsResponse.ConfigEntry(key, value, - DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, - false, false, Collections.emptyList()))); + KopTopic kopTopic = new KopTopic(resource.name()); + admin.topics().getPartitionedTopicMetadataAsync(kopTopic.getFullName()) + .whenComplete((ignored, e) -> { + if (e == null) { + future.complete(defaultTopicConfig); + } else { + future.complete(new DescribeConfigsResponse.Config( + ApiError.fromThrowable(e), Collections.emptyList())); + } + }); break; case BROKER: throw new RuntimeException("KoP doesn't support resource type: " + resource.type()); default: throw new InvalidRequestException("Unsupported resource type: " + resource.type()); } - return new DescribeConfigsResponse.Config(ApiError.NONE, entries); + return future; } catch (Exception e) { - return new DescribeConfigsResponse.Config(ApiError.fromThrowable(e), Collections.emptyList()); + return CompletableFuture.completedFuture( + new DescribeConfigsResponse.Config(ApiError.fromThrowable(e), Collections.emptyList())); } })); - return CompletableFuture.completedFuture(configMap); + CompletableFuture> resultFuture = new CompletableFuture<>(); + CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0])).whenComplete((ignored, e) -> { + resultFuture.complete(futureMap.entrySet().stream().collect( + Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getNow(null)) + )); + }); + return resultFuture; } } From 801420b170a85764b1a17e49068d7bbe1e0f5ad7 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 3 Dec 2020 18:14:39 +0800 Subject: [PATCH 3/5] Fix NPE when DescribeConfigsRequest has no config names --- .../streamnative/pulsar/handlers/kop/KafkaRequestHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 791b680db0..57aea1e22c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -1296,7 +1296,7 @@ protected void handleDescribeConfigs(KafkaHeaderAndRequest describeConfigs, adminManager.describeConfigsAsync(new ArrayList<>(request.resources()).stream() .collect(Collectors.toMap( resource -> resource, - resource -> Optional.of(new HashSet<>(request.configNames(resource))) + resource -> Optional.ofNullable(request.configNames(resource)).map(HashSet::new) )) ).thenApply(configResourceConfigMap -> { resultFuture.complete(new DescribeConfigsResponse(0, configResourceConfigMap)); From d9811be98429af59f321d346618a806aedb27159 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 3 Dec 2020 18:32:38 +0800 Subject: [PATCH 4/5] Check for non-existed topic --- .../pulsar/handlers/kop/AdminManager.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java index 9adab691b0..253a458b04 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java @@ -138,12 +138,17 @@ CompletableFuture> describeC case TOPIC: KopTopic kopTopic = new KopTopic(resource.name()); admin.topics().getPartitionedTopicMetadataAsync(kopTopic.getFullName()) - .whenComplete((ignored, e) -> { - if (e == null) { + .whenComplete((metadata, e) -> { + if (e != null) { + future.complete(new DescribeConfigsResponse.Config( + ApiError.fromThrowable(e), Collections.emptyList())); + } else if (metadata.partitions > 0) { future.complete(defaultTopicConfig); } else { + final ApiError error = new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, + "Topic " + kopTopic.getOriginalName() + " doesn't exist"); future.complete(new DescribeConfigsResponse.Config( - ApiError.fromThrowable(e), Collections.emptyList())); + error, Collections.emptyList())); } }); break; From 8c0080409f75c46ec1522507dc37c87c2f3b5047 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 3 Dec 2020 18:48:25 +0800 Subject: [PATCH 5/5] Add unit test for DescribeConfigsRequest --- .../handlers/kop/KafkaRequestHandlerTest.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index 190733ea67..bb0216efc7 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -34,6 +34,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -54,7 +55,9 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Node; +import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiVersionsRequest; @@ -362,6 +365,34 @@ public void testCreateInvalidTopics() { } } + @Test(timeOut = 10000) + public void testDescribeConfigs() throws Exception { + final String topic = "testDescribeConfigs"; + admin.topics().createPartitionedTopic(topic, 1); + + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); + + @Cleanup + AdminClient kafkaAdmin = AdminClient.create(props); + final Map entries = KafkaLogConfig.getEntries(); + + kafkaAdmin.describeConfigs(Collections.singletonList(new ConfigResource(ConfigResource.Type.TOPIC, topic))) + .all().get().forEach((resource, config) -> { + assertEquals(resource.name(), topic); + config.entries().forEach(entry -> assertEquals(entry.value(), entries.get(entry.name()))); + }); + + final String invalidTopic = "invalid-topic"; + try { + kafkaAdmin.describeConfigs(Collections.singletonList( + new ConfigResource(ConfigResource.Type.TOPIC, invalidTopic))).all().get(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof UnknownTopicOrPartitionException); + assertTrue(e.getMessage().contains("Topic " + invalidTopic + " doesn't exist")); + } + } + @Test(timeOut = 10000) public void testProduceCallback() throws Exception { final String topic = "test-produce-callback";