diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 9c408cd0e4..3977463507 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -195,6 +195,7 @@ public class KafkaRequestHandler extends KafkaCommandDecoder { private final Map pendingProduceQueueMap = new ConcurrentHashMap<>(); private final StatsLogger statsLogger; private final RequestStats requestStats; + private final Set groupIds = new HashSet<>(); public KafkaRequestHandler(PulsarService pulsarService, KafkaServiceConfiguration kafkaConfig, @@ -254,6 +255,7 @@ protected void close() { if (isActive.getAndSet(false)) { log.info("close channel {}", ctx.channel()); writeAndFlushWhenInactiveChannel(ctx.channel()); + groupCoordinator.getOffsetAcker().close(groupIds); ctx.close(); topicManager.close(); String clientHost = ctx.channel().remoteAddress().toString(); @@ -1179,6 +1181,7 @@ protected void handleSyncGroupRequest(KafkaHeaderAndRequest syncGroup, checkArgument(syncGroup.getRequest() instanceof SyncGroupRequest); SyncGroupRequest request = (SyncGroupRequest) syncGroup.getRequest(); + groupIds.add(request.groupId()); groupCoordinator.handleSyncGroup( request.groupId(), request.generationId(), diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java index b3ea3a37e9..d898de0f3c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java @@ -124,26 +124,22 @@ public void ackOffsets(String groupId, Map of } public void close(Set groupIds) { - groupIds.forEach(groupId -> { - // consumers cache is empty if the broker restart. - if (!consumers.containsKey(groupId)) { - return; - } - consumers.get(groupId).values().forEach(consumerFuture -> { - consumerFuture.whenComplete((consumer, throwable) -> { - if (throwable != null) { - log.warn("Error when get consumer for consumer group close:", throwable); - return; - } - try { - consumer.close(); - } catch (Exception e) { - log.warn("Error when close consumer topic: {}, sub: {}.", - consumer.getTopic(), consumer.getSubscription(), e); + for (String groupId : groupIds) { + consumers.remove(groupId).forEach((topicPartition, consumerFuture) -> { + if (!consumerFuture.isDone()) { + log.warn("Consumer of [group={}] [topic={}] is not done while being closed", + groupId, topicPartition); + consumerFuture.complete(null); + } + final Consumer consumer = consumerFuture.getNow(null); + if (consumer != null) { + if (log.isDebugEnabled()) { + log.debug("Try to close consumer of [group={}] [topic={}]", groupId, topicPartition.toString()); } - }); + consumer.closeAsync(); + } }); - }); + } } @Override diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java index 743468b455..1040305c3f 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndKafkaTest.java @@ -14,12 +14,15 @@ package io.streamnative.pulsar.handlers.kop; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import java.util.Arrays; +import java.util.Collections; import java.util.List; import lombok.Cleanup; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.testng.annotations.Test; /** @@ -47,4 +50,41 @@ public void testNullValueMessages() throws Exception { List kafkaReceives = receiveMessages(kafkaConsumer, expectedMessages.size()); assertEquals(kafkaReceives, expectedMessages); } + + @Test(timeOut = 20000) + public void testDeleteClosedTopics() throws Exception { + final String topic = "test-delete-closed-topics"; + final List expectedMessages = Collections.singletonList("msg"); + + final KafkaProducer kafkaProducer = newKafkaProducer(); + sendSingleMessages(kafkaProducer, topic, expectedMessages); + + try { + admin.topics().deletePartitionedTopic(topic); + } catch (PulsarAdminException e) { + assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); + } + + final KafkaConsumer kafkaConsumer1 = newKafkaConsumer(topic, "sub-1"); + assertEquals(receiveMessages(kafkaConsumer1, expectedMessages.size()), expectedMessages); + try { + admin.topics().deletePartitionedTopic(topic); + } catch (PulsarAdminException e) { + assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); + } + + final KafkaConsumer kafkaConsumer2 = newKafkaConsumer(topic, "sub-2"); + assertEquals(receiveMessages(kafkaConsumer2, expectedMessages.size()), expectedMessages); + + kafkaProducer.close(); + kafkaConsumer1.close(); + try { + admin.topics().deletePartitionedTopic(topic); + } catch (PulsarAdminException e) { + assertTrue(e.getMessage().contains("Topic has active producers/subscriptions")); + } + + kafkaConsumer2.close(); + admin.topics().deletePartitionedTopic(topic); + } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndTestBase.java index 6639bd3a9d..571919fbb7 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/BasicEndToEndTestBase.java @@ -49,6 +49,8 @@ @Slf4j public class BasicEndToEndTestBase extends KopProtocolHandlerTestBase { + protected static final String GROUP_ID = "my-group"; + public BasicEndToEndTestBase(final String entryFormat) { super(entryFormat); } @@ -78,10 +80,14 @@ protected KafkaProducer newKafkaProducer() { } protected KafkaConsumer newKafkaConsumer(final String topic) { + return newKafkaConsumer(topic, null); + } + + protected KafkaConsumer newKafkaConsumer(final String topic, final String group) { final Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, (group == null) ? GROUP_ID : group); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaIntegrationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaIntegrationTest.java index 6ac5b4b5c7..f1c48181d1 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaIntegrationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaIntegrationTest.java @@ -49,7 +49,6 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; -import org.testng.annotations.Factory; import org.testng.annotations.Test; /** @@ -77,15 +76,7 @@ public class KafkaIntegrationTest extends KopProtocolHandlerTestBase { public KafkaIntegrationTest(final String entryFormat) { - super(entryFormat); - } - - @Factory - public static Object[] instances() { - return new Object[] { - new KafkaIntegrationTest("pulsar"), - new KafkaIntegrationTest("kafka") - }; + super("kafka"); } @DataProvider diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderKafkaTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderKafkaTest.java new file mode 100644 index 0000000000..e3fb37fea4 --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderKafkaTest.java @@ -0,0 +1,24 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop; + +/** + * Unit test for Different kafka produce messages with `entryFormat=kafka`. + */ +public class KafkaMessageOrderKafkaTest extends KafkaMessageOrderTestBase { + + public KafkaMessageOrderKafkaTest() { + super("kafka"); + } +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderPulsarTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderPulsarTest.java new file mode 100644 index 0000000000..6ed3ee2539 --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderPulsarTest.java @@ -0,0 +1,24 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop; + +/** + * Unit test for Different kafka produce messages with `entryFormat=pulsar`. + */ +public class KafkaMessageOrderPulsarTest extends KafkaMessageOrderTestBase { + + public KafkaMessageOrderPulsarTest() { + super("pulsar"); + } +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderTestBase.java similarity index 95% rename from tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderTest.java rename to tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderTestBase.java index f8eaaeb7ab..0005fbf72e 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderTestBase.java @@ -46,27 +46,18 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; -import org.testng.annotations.Factory; import org.testng.annotations.Test; /** * Unit test for Different kafka produce messages. */ @Slf4j -public class KafkaMessageOrderTest extends KopProtocolHandlerTestBase { +public abstract class KafkaMessageOrderTestBase extends KopProtocolHandlerTestBase { - public KafkaMessageOrderTest(final String entryFormat) { + public KafkaMessageOrderTestBase(final String entryFormat) { super(entryFormat); } - @Factory - public static Object[] instances() { - return new Object[] { - new KafkaMessageOrderTest("pulsar"), - new KafkaMessageOrderTest("kafka") - }; - } - @DataProvider(name = "batchSizeList") public static Object[][] batchSizeList() { // For the messageStrPrefix in testKafkaProduceMessageOrder(), 100 messages will be split to 50, 34, 25, 20