diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java new file mode 100644 index 0000000000..9d31724d8a --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java @@ -0,0 +1,114 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop; + +import static io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.TopicKey; +import static org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails; + +import io.streamnative.pulsar.handlers.kop.utils.KopTopic; +import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation; +import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory; +import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; +import org.apache.pulsar.client.admin.PulsarAdmin; + +@Slf4j +class AdminManager { + + private final DelayedOperationPurgatory topicPurgatory = + DelayedOperationPurgatory.builder() + .purgatoryName("topic") + .timeoutTimer(SystemTimer.builder().executorName("topic").build()) + .build(); + + private final PulsarAdmin admin; + + AdminManager(PulsarAdmin admin) { + this.admin = admin; + } + + CompletableFuture> createTopicsAsync(Map createInfo, int timeoutMs) { + final Map> futureMap = new ConcurrentHashMap<>(); + final AtomicInteger numTopics = new AtomicInteger(createInfo.size()); + final CompletableFuture> resultFuture = new CompletableFuture<>(); + + Runnable complete = () -> { + // prevent `futureMap` from being modified by createPartitionedTopicAsync()'s callback + numTopics.set(0); + // complete the pending futures with timeout error + futureMap.values().forEach(future -> { + if (!future.isDone()) { + future.complete(new ApiError(Errors.REQUEST_TIMED_OUT, null)); + } + }); + resultFuture.complete(futureMap.entrySet().stream().collect(Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().getNow(ApiError.NONE) + ))); + }; + + createInfo.forEach((topic, detail) -> { + final CompletableFuture errorFuture = new CompletableFuture<>(); + futureMap.put(topic, errorFuture); + + KopTopic kopTopic; + try { + kopTopic = new KopTopic(topic); + } catch (RuntimeException e) { + errorFuture.complete(ApiError.fromThrowable(e)); + return; + } + admin.topics().createPartitionedTopicAsync(kopTopic.getFullName(), detail.numPartitions) + .whenComplete((ignored, e) -> { + if (e == null) { + if (log.isDebugEnabled()) { + log.debug("Successfully create topic '{}'", topic); + } + } else { + log.error("Failed to create topic '{}': {}", topic, e); + } + + int restNumTopics = numTopics.decrementAndGet(); + if (restNumTopics < 0) { + return; + } + errorFuture.complete((e == null) ? ApiError.NONE : ApiError.fromThrowable(e)); + if (restNumTopics == 0) { + complete.run(); + } + }); + }); + + if (timeoutMs <= 0) { + complete.run(); + } else { + List delayedCreateKeys = + createInfo.keySet().stream().map(TopicKey::new).collect(Collectors.toList()); + DelayedCreateTopics delayedCreate = new DelayedCreateTopics(timeoutMs, numTopics, complete); + topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys); + } + + return resultFuture; + } +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedCreateTopics.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedCreateTopics.java new file mode 100644 index 0000000000..ede2f426cb --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedCreateTopics.java @@ -0,0 +1,54 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop; + +import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A delayed create topic operation that is stored in the topic purgatory. + */ +class DelayedCreateTopics extends DelayedOperation { + + private final AtomicInteger numTopics; + private final Runnable callback; + + DelayedCreateTopics(long delayMs, AtomicInteger numTopics, Runnable callback) { + super(delayMs, Optional.empty()); + this.numTopics = numTopics; + this.callback = callback; + } + + @Override + public void onExpiration() { + callback.run(); + } + + @Override + public void onComplete() { + callback.run(); + } + + @Override + public boolean tryComplete() { + if (numTopics.get() <= 0) { + forceComplete(); + return true; + } else { + return false; + } + } +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index 10086c91d2..e6ea1f9ba0 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -229,6 +229,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception case SASL_AUTHENTICATE: handleSaslAuthenticate(kafkaHeaderAndRequest, responseFuture); break; + case CREATE_TOPICS: + handleCreateTopics(kafkaHeaderAndRequest, responseFuture); + break; default: handleError(kafkaHeaderAndRequest, responseFuture); } @@ -360,6 +363,9 @@ protected void writeAndFlushWhenInactiveChannel(Channel channel) { protected abstract void handleSaslHandshake(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + protected abstract void + handleCreateTopics(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + static class KafkaHeaderAndRequest implements Closeable { private static final String DEFAULT_CLIENT_HOST = ""; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 9a4350dcc4..95016e53cc 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -23,6 +23,7 @@ import static io.streamnative.pulsar.handlers.kop.utils.MessageRecordUtils.recordsToByteBuf; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; +import static org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -51,6 +52,7 @@ import java.util.Map; import java.util.Optional; import java.util.Queue; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; @@ -80,7 +82,10 @@ import org.apache.kafka.common.record.Records; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.CreateTopicsRequest; +import org.apache.kafka.common.requests.CreateTopicsResponse; import org.apache.kafka.common.requests.DeleteGroupsRequest; import org.apache.kafka.common.requests.DeleteGroupsResponse; import org.apache.kafka.common.requests.DescribeGroupsRequest; @@ -149,6 +154,8 @@ public class KafkaRequestHandler extends KafkaCommandDecoder { private final ScheduledExecutorService executor; private final PulsarAdmin admin; private final SaslAuthenticator authenticator; + private final AdminManager adminManager; + private final Boolean tlsEnabled; private final String localListeners; private final int plaintextPort; @@ -172,6 +179,7 @@ public KafkaRequestHandler(PulsarService pulsarService, this.authenticator = authenticationEnabled ? new SaslAuthenticator(pulsarService, kafkaConfig.getSaslAllowedMechanisms()) : null; + this.adminManager = new AdminManager(admin); this.tlsEnabled = tlsEnabled; this.localListeners = KafkaProtocolHandler.getListenersFromConfig(kafkaConfig); this.plaintextPort = getListenerPort(localListeners, PLAINTEXT); @@ -1246,6 +1254,38 @@ protected void handleSaslHandshake(KafkaHeaderAndRequest saslHandshake, resultFuture.complete(new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, Collections.emptySet())); } + @Override + protected void handleCreateTopics(KafkaHeaderAndRequest createTopics, + CompletableFuture resultFuture) { + checkArgument(createTopics.getRequest() instanceof CreateTopicsRequest); + CreateTopicsRequest request = (CreateTopicsRequest) createTopics.getRequest(); + + final Map result = new HashMap<>(); + final Map validTopics = new HashMap<>(); + final Set duplicateTopics = request.duplicateTopics(); + + request.topics().forEach((topic, details) -> { + if (!duplicateTopics.contains(topic)) { + validTopics.put(topic, details); + } else { + final String errorMessage = "Create topics request from client `" + createTopics.getHeader().clientId() + + "` contains multiple entries for the following topics: " + duplicateTopics; + result.put(topic, new ApiError(Errors.INVALID_REQUEST, errorMessage)); + } + }); + + if (validTopics.isEmpty()) { + resultFuture.complete(new CreateTopicsResponse(result)); + } else { + // TODO: handle request.validateOnly() + adminManager.createTopicsAsync(validTopics, request.timeout()).thenApply(validResult -> { + result.putAll(validResult); + resultFuture.complete(new CreateTopicsResponse(result)); + return null; + }); + } + } + private SaslHandshakeResponse checkSaslMechanism(String mechanism) { if (getKafkaConfig().getSaslAllowedMechanisms().contains(mechanism)) { return new SaslHandshakeResponse(Errors.NONE, getKafkaConfig().getSaslAllowedMechanisms()); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index 3ccca66481..190733ea67 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -34,7 +34,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; @@ -43,6 +43,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -53,6 +54,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiVersionsRequest; @@ -63,6 +65,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.pulsar.broker.protocol.ProtocolHandler; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.SubscriptionInitialPosition; @@ -119,6 +122,10 @@ protected void setup() throws Exception { new RetentionPolicies(-1, -1)); } + admin.tenants().createTenant("my-tenant", + new TenantInfo(Sets.newHashSet(), Sets.newHashSet(super.configClusterName))); + admin.namespaces().createNamespace("my-tenant/my-ns"); + log.info("created namespaces, init handler"); ProtocolHandler handler1 = pulsar.getProtocolHandlers().protocol("kafka"); @@ -299,23 +306,59 @@ public void testGetKafkaTopicNameFromPulsarTopicName() { assertEquals(localName, getKafkaTopicNameFromPulsarTopicname(topicNamePartition)); } + void createTopicsByKafkaAdmin(AdminClient admin, Map topicToNumPartitions) + throws ExecutionException, InterruptedException { + final short replicationFactor = 1; // replication factor will be ignored + admin.createTopics(topicToNumPartitions.entrySet().stream().map(entry -> { + final String topic = entry.getKey(); + final int numPartitions = entry.getValue(); + return new NewTopic(topic, numPartitions, replicationFactor); + }).collect(Collectors.toList())).all().get(); + } + + void verifyTopicsByPulsarAdmin(Map topicToNumPartitions) + throws PulsarAdminException { + for (Map.Entry entry : topicToNumPartitions.entrySet()) { + final String topic = entry.getKey(); + final int numPartitions = entry.getValue(); + assertEquals(this.admin.topics().getPartitionedTopicMetadata(topic).partitions, numPartitions); + } + } + + @Test(timeOut = 10000) + public void testCreateTopics() throws Exception { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); + + @Cleanup + AdminClient kafkaAdmin = AdminClient.create(props); + Map topicToNumPartitions = new HashMap(){{ + put("testCreateTopics-0", 1); + put("testCreateTopics-1", 3); + put("my-tenant/my-ns/testCreateTopics-2", 1); + put("persistent://my-tenant/my-ns/testCreateTopics-3", 5); + }}; + createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); + verifyTopicsByPulsarAdmin(topicToNumPartitions); + } + @Test(timeOut = 10000) - public void testCreateTopics() throws InterruptedException { + public void testCreateInvalidTopics() { Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); @Cleanup AdminClient kafkaAdmin = AdminClient.create(props); - final String topic = "testCreateTopic-0"; - final int numPartitions = 1; - final short replicationFactor = 1; + Map topicToNumPartitions = new HashMap(){{ + put("xxx/testCreateInvalidTopics-0", 1); + }}; try { - kafkaAdmin.createTopics(Collections.singleton(new NewTopic(topic, numPartitions, replicationFactor))).all() - .get(); - } catch (ExecutionException e) { - // TODO: it should fail after CreateTopics was supported, see https://github.com/streamnative/kop/issues/241 - log.info("Failed to create topic '{}': {}", topic, e); - assertTrue(e.getMessage().contains("Not supported by kop server.")); + createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); + fail("create a invalid topic should fail"); + } catch (Exception e) { + log.info("Failed to create topics: {}", topicToNumPartitions); + assertTrue(e.getCause() instanceof TimeoutException); } }