From f67ee8663a35f3f9622b512fab3b6683631adea0 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 26 Jan 2021 21:13:50 +0800 Subject: [PATCH 1/3] Fix offset in send callback may be wrong --- .../handlers/kop/MessagePublishContext.java | 22 +++++++++--- .../handlers/kop/KafkaRequestHandlerTest.java | 35 +++++++++++++++---- 2 files changed, 46 insertions(+), 11 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessagePublishContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessagePublishContext.java index 15b0d953ad..0dfaaf6fb4 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessagePublishContext.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessagePublishContext.java @@ -13,6 +13,7 @@ */ package io.streamnative.pulsar.handlers.kop; +import io.netty.buffer.ByteBuf; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils; @@ -22,6 +23,8 @@ import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.Topic.PublishContext; +import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; +import org.apache.pulsar.common.protocol.Commands; /** * Implementation for PublishContext. @@ -34,6 +37,7 @@ public final class MessagePublishContext implements PublishContext { private long startTimeNs; private long numberOfMessages; private ManagedLedger managedLedger; + private long offset = -1L; /** * Executed from managed ledger thread when the message is persisted. @@ -54,16 +58,26 @@ public void completed(Exception exception, long ledgerId, long entryId) { topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.MICROSECONDS); - // In asynchronously send mode, there's a possibility that some messages' offsets are larger than the actual - // offsets. For example, two messages are sent concurrently, if the 1st message is completed while the 2nd - // message is already persisted, `MessageIdUtils.getCurrentOffset` will return the 2nd message's offset. 
- final long baseOffset = MessageIdUtils.getCurrentOffset(managedLedger) - (numberOfMessages - 1); + if (offset < 0) { + log.error("offset is {} (< 0) but no exception was thrown, use the offset from managed ledger", offset); + offset = MessageIdUtils.getCurrentOffset(managedLedger); + } + + final long baseOffset = offset - (numberOfMessages - 1); offsetFuture.complete(baseOffset); } recycle(); } + @Override + public void setMetadataFromEntryData(ByteBuf entryData) { + final BrokerEntryMetadata metadata = Commands.peekBrokerEntryMetadataIfExist(entryData); + if (metadata != null && metadata.hasIndex()) { + offset = metadata.getIndex(); + } + } + // recycler public static MessagePublishContext get(CompletableFuture offsetFuture, Topic topic, diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index d689a52d1c..ab7d71910c 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -18,6 +18,7 @@ import static io.streamnative.pulsar.handlers.kop.utils.TopicNameUtils.getPartitionedTopicNameWithoutPartitions; import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -44,7 +45,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -56,7 +57,6 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigResource; @@ -76,6 +76,11 @@ import org.apache.kafka.common.utils.Time; import org.apache.pulsar.broker.protocol.ProtocolHandler; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; @@ -448,6 +453,7 @@ public void testProduceCallback() throws Exception { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, "100"); // avoid all messages being in the same batch @Cleanup KafkaProducer producer = new KafkaProducer<>(props); @@ -456,7 +462,7 @@ public void testProduceCallback() throws Exception { final List offsets = new ArrayList<>(); for (int i = 0; i < numMessages; i++) { final int index = i; - Future future = 
producer.send(new ProducerRecord<>(topic, i, messagePrefix + i), + producer.send(new ProducerRecord<>(topic, i, messagePrefix + i), (recordMetadata, e) -> { if (e != null) { log.error("Failed to send {}: {}", index, e); @@ -466,15 +472,30 @@ public void testProduceCallback() throws Exception { } latch.countDown(); }); - // The first half messages are sent in batch, the second half messages are sent synchronously. - if (i >= numMessages / 2) { - future.get(); - } } latch.await(); final List expectedOffsets = LongStream.range(0, numMessages).boxed().collect(Collectors.toList()); log.info("Actual offsets: {}", offsets); assertEquals(offsets, expectedOffsets); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("testProducerCallback") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + int numBatches = 0; + for (int i = 0; i < numMessages; i++) { + Message msg = consumer.receive(1, TimeUnit.SECONDS); + assertNotNull(msg); + consumer.acknowledge(msg); + BatchMessageIdImpl id = (BatchMessageIdImpl) ((TopicMessageIdImpl) msg.getMessageId()).getInnerMessageId(); + if (id.getBatchIndex() == 0) { + numBatches++; + } + } + log.info("Receive {} batches", numBatches); + assertTrue(numBatches > 1 && numBatches < numMessages); } @Test(timeOut = 10000) From da7889f0ff02e738633327896147b19218cd62bc Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 27 Jan 2021 11:12:46 +0800 Subject: [PATCH 2/3] Fix NPE caused by null entry data --- .../pulsar/handlers/kop/MessagePublishContext.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessagePublishContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessagePublishContext.java index 0dfaaf6fb4..fda7e6014a 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessagePublishContext.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessagePublishContext.java @@ -59,7 +59,9 @@ public void completed(Exception exception, long ledgerId, long entryId) { topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.MICROSECONDS); if (offset < 0) { - log.error("offset is {} (< 0) but no exception was thrown, use the offset from managed ledger", offset); + if (offset != -1L) { + log.error("offset was changed to {} (< 0) but no exception was thrown", offset); + } offset = MessageIdUtils.getCurrentOffset(managedLedger); } @@ -72,6 +74,10 @@ public void completed(Exception exception, long ledgerId, long entryId) { @Override public void setMetadataFromEntryData(ByteBuf entryData) { + if (entryData == null) { + // When the managed ledger was closed, the `entryData` is null. 
+            return;
+        }
         final BrokerEntryMetadata metadata = Commands.peekBrokerEntryMetadataIfExist(entryData);
         if (metadata != null && metadata.hasIndex()) {
             offset = metadata.getIndex();

From 35d46a8fcfe9a2b0b51d38a615e9ea34abda6ea9 Mon Sep 17 00:00:00 2001
From: Yunze Xu
Date: Thu, 28 Jan 2021 09:48:49 +0800
Subject: [PATCH 3/3] Add logs for the case that BrokerEntryMetadata is invalid

---
 .../streamnative/pulsar/handlers/kop/MessagePublishContext.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessagePublishContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessagePublishContext.java
index fda7e6014a..afbf60008e 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessagePublishContext.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessagePublishContext.java
@@ -61,6 +61,8 @@ public void completed(Exception exception, long ledgerId, long entryId) {
         if (offset < 0) {
             if (offset != -1L) {
                 log.error("offset was changed to {} (< 0) but no exception was thrown", offset);
+            } else {
+                log.error("There's no valid BrokerEntryMetadata in entry ({}, {})", ledgerId, entryId);
             }
             offset = MessageIdUtils.getCurrentOffset(managedLedger);
         }
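
For context, a minimal sketch of the offset arithmetic that `MessagePublishContext.completed()` performs after these patches. It is not part of the patch series: only the formula `baseOffset = offset - (numberOfMessages - 1)` and the fallback to `MessageIdUtils.getCurrentOffset(managedLedger)` come from the patches; the class and method names (`BaseOffsetSketch`, `baseOffset`) are invented for the example.

// Hypothetical illustration of the offset logic introduced by PATCH 1/3 and 2/3.
// The index recorded in BrokerEntryMetadata refers to the last message of the
// published entry, so the first (base) offset reported to the Kafka send
// callback is index - (numberOfMessages - 1). When no valid index was captured
// (offset stays at -1), the code falls back to the managed ledger's current offset.
public final class BaseOffsetSketch {

    static long baseOffset(long offsetFromEntryMetadata, long numberOfMessages, long currentLedgerOffset) {
        long offset = offsetFromEntryMetadata;
        if (offset < 0) {
            // Fallback path: no BrokerEntryMetadata index was set for this entry.
            offset = currentLedgerOffset;
        }
        return offset - (numberOfMessages - 1);
    }

    public static void main(String[] args) {
        // A batch of 5 messages whose last message got index 9 reports base offset 5.
        System.out.println(baseOffset(9, 5, -1));  // prints 5
        // Fallback: metadata missing (-1), ledger's current offset is 10, so base offset is 6.
        System.out.println(baseOffset(-1, 5, 10)); // prints 6
    }
}

The reason for preferring the per-entry index from `setMetadataFromEntryData` over `MessageIdUtils.getCurrentOffset(managedLedger)` is the race described in the comment removed by PATCH 1/3: with concurrent sends, the ledger's current offset may already belong to a later entry by the time an earlier entry's callback runs, so the callback would report an offset that is too large.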