Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop;

import static io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.TopicKey;
import static org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails;

import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.pulsar.client.admin.PulsarAdmin;

@Slf4j
class AdminManager {

private final DelayedOperationPurgatory<DelayedOperation> topicPurgatory =
DelayedOperationPurgatory.<DelayedOperation>builder()
.purgatoryName("topic")
.timeoutTimer(SystemTimer.builder().executorName("topic").build())
.build();

private final PulsarAdmin admin;

AdminManager(PulsarAdmin admin) {
this.admin = admin;
}

CompletableFuture<Map<String, ApiError>> createTopicsAsync(Map<String, TopicDetails> createInfo, int timeoutMs) {
final Map<String, CompletableFuture<ApiError>> futureMap = new ConcurrentHashMap<>();
final AtomicInteger numTopics = new AtomicInteger(createInfo.size());
final CompletableFuture<Map<String, ApiError>> resultFuture = new CompletableFuture<>();

Runnable complete = () -> {
// prevent `futureMap` from being modified by createPartitionedTopicAsync()'s callback
numTopics.set(0);
// complete the pending futures with timeout error
futureMap.values().forEach(future -> {
if (!future.isDone()) {
future.complete(new ApiError(Errors.REQUEST_TIMED_OUT, null));
}
});
resultFuture.complete(futureMap.entrySet().stream().collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().getNow(ApiError.NONE)
)));
};

createInfo.forEach((topic, detail) -> {
final CompletableFuture<ApiError> errorFuture = new CompletableFuture<>();
futureMap.put(topic, errorFuture);

KopTopic kopTopic;
try {
kopTopic = new KopTopic(topic);
} catch (RuntimeException e) {
errorFuture.complete(ApiError.fromThrowable(e));
return;
}
admin.topics().createPartitionedTopicAsync(kopTopic.getFullName(), detail.numPartitions)
.whenComplete((ignored, e) -> {
if (e == null) {
if (log.isDebugEnabled()) {
log.debug("Successfully create topic '{}'", topic);
}
} else {
log.error("Failed to create topic '{}': {}", topic, e);
}

int restNumTopics = numTopics.decrementAndGet();
if (restNumTopics < 0) {
return;
}
errorFuture.complete((e == null) ? ApiError.NONE : ApiError.fromThrowable(e));
if (restNumTopics == 0) {
complete.run();
}
});
});

if (timeoutMs <= 0) {
complete.run();
} else {
List<Object> delayedCreateKeys =
createInfo.keySet().stream().map(TopicKey::new).collect(Collectors.toList());
DelayedCreateTopics delayedCreate = new DelayedCreateTopics(timeoutMs, numTopics, complete);
topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys);
}

return resultFuture;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop;

import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

/**
* A delayed create topic operation that is stored in the topic purgatory.
*/
class DelayedCreateTopics extends DelayedOperation {

private final AtomicInteger numTopics;
private final Runnable callback;

DelayedCreateTopics(long delayMs, AtomicInteger numTopics, Runnable callback) {
super(delayMs, Optional.empty());
this.numTopics = numTopics;
this.callback = callback;
}

@Override
public void onExpiration() {
callback.run();
}

@Override
public void onComplete() {
callback.run();
}

@Override
public boolean tryComplete() {
if (numTopics.get() <= 0) {
forceComplete();
return true;
} else {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
case SASL_AUTHENTICATE:
handleSaslAuthenticate(kafkaHeaderAndRequest, responseFuture);
break;
case CREATE_TOPICS:
handleCreateTopics(kafkaHeaderAndRequest, responseFuture);
break;
default:
handleError(kafkaHeaderAndRequest, responseFuture);
}
Expand Down Expand Up @@ -360,6 +363,9 @@ protected void writeAndFlushWhenInactiveChannel(Channel channel) {
protected abstract void
handleSaslHandshake(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

protected abstract void
handleCreateTopics(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

static class KafkaHeaderAndRequest implements Closeable {

private static final String DEFAULT_CLIENT_HOST = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static io.streamnative.pulsar.handlers.kop.utils.MessageRecordUtils.recordsToByteBuf;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
import static org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -51,6 +52,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -80,7 +82,10 @@
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
Expand Down Expand Up @@ -149,6 +154,8 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
private final ScheduledExecutorService executor;
private final PulsarAdmin admin;
private final SaslAuthenticator authenticator;
private final AdminManager adminManager;

private final Boolean tlsEnabled;
private final String localListeners;
private final int plaintextPort;
Expand All @@ -172,6 +179,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
this.authenticator = authenticationEnabled
? new SaslAuthenticator(pulsarService, kafkaConfig.getSaslAllowedMechanisms())
: null;
this.adminManager = new AdminManager(admin);
this.tlsEnabled = tlsEnabled;
this.localListeners = KafkaProtocolHandler.getListenersFromConfig(kafkaConfig);
this.plaintextPort = getListenerPort(localListeners, PLAINTEXT);
Expand Down Expand Up @@ -1246,6 +1254,38 @@ protected void handleSaslHandshake(KafkaHeaderAndRequest saslHandshake,
resultFuture.complete(new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, Collections.emptySet()));
}

@Override
protected void handleCreateTopics(KafkaHeaderAndRequest createTopics,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(createTopics.getRequest() instanceof CreateTopicsRequest);
CreateTopicsRequest request = (CreateTopicsRequest) createTopics.getRequest();

final Map<String, ApiError> result = new HashMap<>();
final Map<String, TopicDetails> validTopics = new HashMap<>();
final Set<String> duplicateTopics = request.duplicateTopics();

request.topics().forEach((topic, details) -> {
if (!duplicateTopics.contains(topic)) {
validTopics.put(topic, details);
} else {
final String errorMessage = "Create topics request from client `" + createTopics.getHeader().clientId()
+ "` contains multiple entries for the following topics: " + duplicateTopics;
result.put(topic, new ApiError(Errors.INVALID_REQUEST, errorMessage));
}
});

if (validTopics.isEmpty()) {
resultFuture.complete(new CreateTopicsResponse(result));
} else {
// TODO: handle request.validateOnly()
adminManager.createTopicsAsync(validTopics, request.timeout()).thenApply(validResult -> {
result.putAll(validResult);
resultFuture.complete(new CreateTopicsResponse(result));
return null;
});
}
}

private SaslHandshakeResponse checkSaslMechanism(String mechanism) {
if (getKafkaConfig().getSaslAllowedMechanisms().contains(mechanism)) {
return new SaslHandshakeResponse(Errors.NONE, getKafkaConfig().getSaslAllowedMechanisms());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -43,6 +43,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -53,6 +54,7 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiVersionsRequest;
Expand All @@ -63,6 +65,7 @@
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.broker.protocol.ProtocolHandler;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
Expand Down Expand Up @@ -119,6 +122,10 @@ protected void setup() throws Exception {
new RetentionPolicies(-1, -1));
}

admin.tenants().createTenant("my-tenant",
new TenantInfo(Sets.newHashSet(), Sets.newHashSet(super.configClusterName)));
admin.namespaces().createNamespace("my-tenant/my-ns");

log.info("created namespaces, init handler");

ProtocolHandler handler1 = pulsar.getProtocolHandlers().protocol("kafka");
Expand Down Expand Up @@ -299,23 +306,59 @@ public void testGetKafkaTopicNameFromPulsarTopicName() {
assertEquals(localName, getKafkaTopicNameFromPulsarTopicname(topicNamePartition));
}

void createTopicsByKafkaAdmin(AdminClient admin, Map<String, Integer> topicToNumPartitions)
throws ExecutionException, InterruptedException {
final short replicationFactor = 1; // replication factor will be ignored
admin.createTopics(topicToNumPartitions.entrySet().stream().map(entry -> {
final String topic = entry.getKey();
final int numPartitions = entry.getValue();
return new NewTopic(topic, numPartitions, replicationFactor);
}).collect(Collectors.toList())).all().get();
}

void verifyTopicsByPulsarAdmin(Map<String, Integer> topicToNumPartitions)
throws PulsarAdminException {
for (Map.Entry<String, Integer> entry : topicToNumPartitions.entrySet()) {
final String topic = entry.getKey();
final int numPartitions = entry.getValue();
assertEquals(this.admin.topics().getPartitionedTopicMetadata(topic).partitions, numPartitions);
}
}

@Test(timeOut = 10000)
public void testCreateTopics() throws Exception {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort());

@Cleanup
AdminClient kafkaAdmin = AdminClient.create(props);
Map<String, Integer> topicToNumPartitions = new HashMap<String, Integer>(){{
put("testCreateTopics-0", 1);
put("testCreateTopics-1", 3);
put("my-tenant/my-ns/testCreateTopics-2", 1);
put("persistent://my-tenant/my-ns/testCreateTopics-3", 5);
}};
createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions);
verifyTopicsByPulsarAdmin(topicToNumPartitions);
}

@Test(timeOut = 10000)
public void testCreateTopics() throws InterruptedException {
public void testCreateInvalidTopics() {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort());
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);

@Cleanup
AdminClient kafkaAdmin = AdminClient.create(props);
final String topic = "testCreateTopic-0";
final int numPartitions = 1;
final short replicationFactor = 1;
Map<String, Integer> topicToNumPartitions = new HashMap<String, Integer>(){{
put("xxx/testCreateInvalidTopics-0", 1);
}};
try {
kafkaAdmin.createTopics(Collections.singleton(new NewTopic(topic, numPartitions, replicationFactor))).all()
.get();
} catch (ExecutionException e) {
// TODO: it should fail after CreateTopics was supported, see https://github.com/streamnative/kop/issues/241
log.info("Failed to create topic '{}': {}", topic, e);
assertTrue(e.getMessage().contains("Not supported by kop server."));
createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions);
fail("create a invalid topic should fail");
} catch (Exception e) {
log.info("Failed to create topics: {}", topicToNumPartitions);
assertTrue(e.getCause() instanceof TimeoutException);
}
}

Expand Down