diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java index 9d31724d8a..253a458b04 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java @@ -21,16 +21,22 @@ import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory; import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.pulsar.client.admin.PulsarAdmin; @Slf4j @@ -111,4 +117,58 @@ CompletableFuture> createTopicsAsync(Map> describeConfigsAsync( + Map>> resourceToConfigNames) { + // Since Kafka's storage and policies are much different from Pulsar, here we just return a default config to + // avoid some Kafka based systems need to send DescribeConfigs request, like confluent schema registry. + final DescribeConfigsResponse.Config defaultTopicConfig = new DescribeConfigsResponse.Config(ApiError.NONE, + KafkaLogConfig.getEntries().entrySet().stream().map(entry -> + new DescribeConfigsResponse.ConfigEntry(entry.getKey(), entry.getValue(), + DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, false, false, + Collections.emptyList()) + ).collect(Collectors.toList())); + + Map> futureMap = + resourceToConfigNames.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> { + ConfigResource resource = entry.getKey(); + try { + CompletableFuture future = new CompletableFuture<>(); + switch (resource.type()) { + case TOPIC: + KopTopic kopTopic = new KopTopic(resource.name()); + admin.topics().getPartitionedTopicMetadataAsync(kopTopic.getFullName()) + .whenComplete((metadata, e) -> { + if (e != null) { + future.complete(new DescribeConfigsResponse.Config( + ApiError.fromThrowable(e), Collections.emptyList())); + } else if (metadata.partitions > 0) { + future.complete(defaultTopicConfig); + } else { + final ApiError error = new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, + "Topic " + kopTopic.getOriginalName() + " doesn't exist"); + future.complete(new DescribeConfigsResponse.Config( + error, Collections.emptyList())); + } + }); + break; + case BROKER: + throw new RuntimeException("KoP doesn't support resource type: " + resource.type()); + default: + throw new InvalidRequestException("Unsupported resource type: " + resource.type()); + } + return future; + } catch (Exception e) { + return CompletableFuture.completedFuture( + new DescribeConfigsResponse.Config(ApiError.fromThrowable(e), Collections.emptyList())); + } + })); + CompletableFuture> resultFuture = new CompletableFuture<>(); + CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0])).whenComplete((ignored, e) -> { + resultFuture.complete(futureMap.entrySet().stream().collect( + Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getNow(null)) + )); + }); + return resultFuture; + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index e6ea1f9ba0..b9abf5df6f 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -232,6 +232,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception case CREATE_TOPICS: handleCreateTopics(kafkaHeaderAndRequest, responseFuture); break; + case DESCRIBE_CONFIGS: + handleDescribeConfigs(kafkaHeaderAndRequest, responseFuture); + break; default: handleError(kafkaHeaderAndRequest, responseFuture); } @@ -366,6 +369,9 @@ protected void writeAndFlushWhenInactiveChannel(Channel channel) { protected abstract void handleCreateTopics(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + protected abstract void + handleDescribeConfigs(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + static class KafkaHeaderAndRequest implements Closeable { private static final String DEFAULT_CLIENT_HOST = ""; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaLogConfig.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaLogConfig.java new file mode 100644 index 0000000000..5cf382aab1 --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaLogConfig.java @@ -0,0 +1,62 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.record.Records; + +/** + * KafkaLogConfig is ported from kafka.log.LogConfig. + */ +public class KafkaLogConfig { + private static final Map entries = defaultEntries(); + + public static Map getEntries() { + return entries; + } + + private static Map defaultEntries() { + return Collections.unmodifiableMap(new HashMap(){{ + put(TopicConfig.SEGMENT_BYTES_CONFIG, Integer.toString(1024 * 1024 * 1024)); + put(TopicConfig.SEGMENT_MS_CONFIG, Long.toString(24 * 7 * 7 * 60 * 1000L)); + put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, "0"); + put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, Integer.toString(10 * 1024 * 1024)); + put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, Long.toString(Long.MAX_VALUE)); + put(TopicConfig.FLUSH_MS_CONFIG, Long.toString(Long.MAX_VALUE)); + put(TopicConfig.RETENTION_BYTES_CONFIG, Long.toString(-1L)); + put(TopicConfig.RETENTION_MS_CONFIG, Long.toString(24 * 7 * 60 * 60 * 60 * 1000L)); + put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, Integer.toString(1000000 + Records.LOG_OVERHEAD)); + put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "4096"); + put(TopicConfig.DELETE_RETENTION_MS_CONFIG, Long.toString(24 * 60 * 60 * 1000L)); + put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, Long.toString(0L)); + put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "60000"); + put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.5"); + // Kafka's default value of cleanup.policy is "delete", but here we set it to "compact" because confluent + // schema registry needs this config value. + put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact"); + put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1"); + put(TopicConfig.COMPRESSION_TYPE_CONFIG, "producer"); + put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false"); + put(TopicConfig.PREALLOCATE_CONFIG, "false"); + put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "2.0"); + put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"); + put(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.toString(Long.MAX_VALUE)); + put(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, "true"); + }}); + } +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 95016e53cc..57aea1e22c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -88,6 +88,8 @@ import org.apache.kafka.common.requests.CreateTopicsResponse; import org.apache.kafka.common.requests.DeleteGroupsRequest; import org.apache.kafka.common.requests.DeleteGroupsResponse; +import org.apache.kafka.common.requests.DescribeConfigsRequest; +import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.kafka.common.requests.DescribeGroupsRequest; import org.apache.kafka.common.requests.DescribeGroupsResponse; import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMember; @@ -1286,6 +1288,22 @@ protected void handleCreateTopics(KafkaHeaderAndRequest createTopics, } } + protected void handleDescribeConfigs(KafkaHeaderAndRequest describeConfigs, + CompletableFuture resultFuture) { + checkArgument(describeConfigs.getRequest() instanceof DescribeConfigsRequest); + DescribeConfigsRequest request = (DescribeConfigsRequest) describeConfigs.getRequest(); + + adminManager.describeConfigsAsync(new ArrayList<>(request.resources()).stream() + .collect(Collectors.toMap( + resource -> resource, + resource -> Optional.ofNullable(request.configNames(resource)).map(HashSet::new) + )) + ).thenApply(configResourceConfigMap -> { + resultFuture.complete(new DescribeConfigsResponse(0, configResourceConfigMap)); + return null; + }); + } + private SaslHandshakeResponse checkSaslMechanism(String mechanism) { if (getKafkaConfig().getSaslAllowedMechanisms().contains(mechanism)) { return new SaslHandshakeResponse(Errors.NONE, getKafkaConfig().getSaslAllowedMechanisms()); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index 190733ea67..bb0216efc7 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -34,6 +34,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -54,7 +55,9 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Node; +import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiVersionsRequest; @@ -362,6 +365,34 @@ public void testCreateInvalidTopics() { } } + @Test(timeOut = 10000) + public void testDescribeConfigs() throws Exception { + final String topic = "testDescribeConfigs"; + admin.topics().createPartitionedTopic(topic, 1); + + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); + + @Cleanup + AdminClient kafkaAdmin = AdminClient.create(props); + final Map entries = KafkaLogConfig.getEntries(); + + kafkaAdmin.describeConfigs(Collections.singletonList(new ConfigResource(ConfigResource.Type.TOPIC, topic))) + .all().get().forEach((resource, config) -> { + assertEquals(resource.name(), topic); + config.entries().forEach(entry -> assertEquals(entry.value(), entries.get(entry.name()))); + }); + + final String invalidTopic = "invalid-topic"; + try { + kafkaAdmin.describeConfigs(Collections.singletonList( + new ConfigResource(ConfigResource.Type.TOPIC, invalidTopic))).all().get(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof UnknownTopicOrPartitionException); + assertTrue(e.getMessage().contains("Topic " + invalidTopic + " doesn't exist")); + } + } + @Test(timeOut = 10000) public void testProduceCallback() throws Exception { final String topic = "test-produce-callback";