From 04243c591a65674ffcc2cf251c4ca78de179d66a Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 24 Dec 2020 17:23:57 +0800 Subject: [PATCH 1/8] Add KafkaEntryFormatter --- kafka-impl/conf/kop.conf | 10 ++- kafka-impl/conf/kop_standalone.conf | 8 +++ .../kop/KafkaServiceConfiguration.java | 2 +- .../kop/format/EntryFormatterFactory.java | 5 +- .../kop/format/KafkaEntryFormatter.java | 70 +++++++++++++++++++ .../kop/format/KafkaEntryFormatterHeader.java | 55 +++++++++++++++ 6 files changed, 147 insertions(+), 3 deletions(-) create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatterHeader.java diff --git a/kafka-impl/conf/kop.conf b/kafka-impl/conf/kop.conf index a5bb1c4d04..92291a99fa 100755 --- a/kafka-impl/conf/kop.conf +++ b/kafka-impl/conf/kop.conf @@ -72,13 +72,21 @@ offsetsTopicNumPartitions=8 maxReadEntriesNum=5 # The format of an entry. The default value is pulsar. -# Optional values: [pulsar] +# Optional values: [pulsar, kafka] # # pulsar: # When KoP receives messages from kafka producer, it will serialize these messages to # the format so that pulsar consumer can read directly. # When KoP sends entries to kafka consumer, it will treat each entry as pulsar's # format and deserialize each entry to kafka's format. +# +# kafka: +# When KoP receives messages from kafka producer, add a header which is PulsarApi.Metadata +# before the messages' bytes, and then write to BK directly. +# When KoP sends entries to kafka consumer, it will treat each entry as kafka's format and +# just discard the pulsar header and send left bytes to Kafka consumer. +# This mode means that current pulsar clients cannot interact with kafka clients, but +# kafka producer works well with kafka consumer. entry.format=pulsar ### --- KoP SSL configs--- ### diff --git a/kafka-impl/conf/kop_standalone.conf b/kafka-impl/conf/kop_standalone.conf index 21e7cfb9d9..5a47bf0e62 100644 --- a/kafka-impl/conf/kop_standalone.conf +++ b/kafka-impl/conf/kop_standalone.conf @@ -79,6 +79,14 @@ maxReadEntriesNum=1 # the format so that pulsar consumer can read directly. # When KoP sends entries to kafka consumer, it will treat each entry as pulsar's # format and deserialize each entry to kafka's format. +# +# kafka: +# When KoP receives messages from kafka producer, add a header which is PulsarApi.Metadata +# before the messages' bytes, and then write to BK directly. +# When KoP sends entries to kafka consumer, it will treat each entry as kafka's format and +# just discard the pulsar header and send left bytes to Kafka consumer. +# This mode means that current pulsar clients cannot interact with kafka clients, but +# kafka producer works well with kafka consumer. entry.format=pulsar ### --- KoP SSL configs--- ### diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java index ec561fbfb0..6131b861df 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java @@ -251,7 +251,7 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { @FieldContext( category = CATEGORY_KOP, - doc = "The format of an entry. Default: pulsar. Optional: [pulsar]" + doc = "The format of an entry. Default: pulsar. Optional: [pulsar, kafka]" ) private String entryFormat = "pulsar"; } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java index 4dba4b7999..585d7796e9 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java @@ -21,7 +21,8 @@ public class EntryFormatterFactory { enum EntryFormat { - PULSAR + PULSAR, + KAFKA } public static EntryFormatter create(final String format) { @@ -30,6 +31,8 @@ public static EntryFormatter create(final String format) { switch (entryFormat) { case PULSAR: return new PulsarEntryFormatter(); + case KAFKA: + return new KafkaEntryFormatter(); default: throw new Exception("No EntryFormatter for " + entryFormat); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java new file mode 100644 index 0000000000..b71eb21d19 --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java @@ -0,0 +1,70 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.format; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.streamnative.pulsar.handlers.kop.utils.ByteBufUtils; +import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils; +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.TimestampType; +import org.apache.pulsar.common.protocol.Commands; + + +/** + * The entry formatter that uses Kafka's format. + */ +public class KafkaEntryFormatter implements EntryFormatter { + private final KafkaEntryFormatterHeader header = new KafkaEntryFormatterHeader(); + + @Override + public ByteBuf encode(MemoryRecords records, int numMessages) { + return Commands.serializeMetadataAndPayload( + Commands.ChecksumType.None, + header.getMessageMetadata(), + Unpooled.wrappedBuffer(records.buffer()) + ); + } + + @Override + public MemoryRecords decode(List entries, byte magic) { + int size = 0; + for (Entry entry : entries) { + size += entry.getLength(); + } + final MemoryRecordsBuilder builder = MemoryRecords.builder( + ByteBuffer.allocate(size), + magic, + CompressionType.NONE, + TimestampType.CREATE_TIME, + MessageIdUtils.getOffset(entries.get(0).getLedgerId(), entries.get(0).getEntryId())); + entries.forEach(entry -> { + final ByteBuf byteBuf = entry.getDataBuffer(); + Commands.skipMessageMetadata(byteBuf); + final MemoryRecords records = MemoryRecords.readableRecords(ByteBufUtils.getNioBuffer(byteBuf)); + long offset = MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId()); + for (Record record : records.records()) { + builder.appendWithOffset(offset, record); + offset++; + } + }); + return builder.build(); + } +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatterHeader.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatterHeader.java new file mode 100644 index 0000000000..aac9442726 --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatterHeader.java @@ -0,0 +1,55 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.format; + +import org.apache.pulsar.common.api.proto.PulsarApi; + + +/** + * The header of KafkaEntryFormatter. + */ +public class KafkaEntryFormatterHeader { + + private static volatile PulsarApi.MessageMetadata messageMetadata = null; + + public PulsarApi.MessageMetadata getMessageMetadata() { + if (messageMetadata == null) { + synchronized (KafkaEntryFormatterHeader.class) { + if (messageMetadata == null) { + messageMetadata = createMessageMetadata(); + } + } + } + return messageMetadata; + } + + private static PulsarApi.MessageMetadata createMessageMetadata() { + final PulsarApi.MessageMetadata.Builder builder = PulsarApi.MessageMetadata.newBuilder(); + + // TODO: Pulsar broker may add a field that represents entry.format to MessageMetadata in future. After that we + // should set that field instead of adding a key-value property. + builder.addProperties(PulsarApi.KeyValue.newBuilder() + .setKey("entry.format") + .setValue(EntryFormatterFactory.EntryFormat.KAFKA.name().toLowerCase()) + .build()); + + // Following fields are meaningless because the metadata is already contained in MemoryRecords. Here we set + // them just because they're required fields. + builder.setProducerName(""); + builder.setSequenceId(0L); + builder.setPublishTime(0L); + + return builder.build(); + } +} From c05d326e6b1e6349c2db5bceef011bbbd215e229 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 24 Dec 2020 17:32:06 +0800 Subject: [PATCH 2/8] Apply KafkaEntryFormatter to DifferentNamespaceTest --- .../handlers/kop/DifferentNamespaceTest.java | 15 +++++++++++++++ .../handlers/kop/KopProtocolHandlerTestBase.java | 9 +++++++++ 2 files changed, 24 insertions(+) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DifferentNamespaceTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DifferentNamespaceTest.java index 8642f2ea00..52d56dd50e 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DifferentNamespaceTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/DifferentNamespaceTest.java @@ -40,8 +40,10 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.PartitionInfo; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; import org.testng.annotations.Test; /** @@ -55,6 +57,17 @@ public class DifferentNamespaceTest extends KopProtocolHandlerTestBase { private static final String ANOTHER_TENANT = "my-tenant"; private static final String ANOTHER_NAMESPACE = "my-ns"; + public DifferentNamespaceTest(final String entryFormat) { + super(entryFormat); + } + + @Factory + public static Object[] instances() { + return new Object[] { + new DifferentNamespaceTest("pulsar"), + new DifferentNamespaceTest("kafka") + }; + } @DataProvider(name = "topics") public static Object[][] topics() { @@ -77,6 +90,8 @@ protected void setup() throws Exception { admin.namespaces().createNamespace(ANOTHER_TENANT + "/" + ANOTHER_NAMESPACE); } + @AfterClass + @Override protected void cleanup() throws Exception { super.internalCleanup(); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java index e62db21437..2aad5faf1d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java @@ -121,7 +121,15 @@ public abstract class KopProtocolHandlerTestBase { protected Server restServer; protected String restConnect; + private final String entryFormat; + public KopProtocolHandlerTestBase() { + this.entryFormat = "pulsar"; + resetConfig(); + } + + public KopProtocolHandlerTestBase(final String entryFormat) { + this.entryFormat = entryFormat; resetConfig(); } @@ -154,6 +162,7 @@ protected void resetConfig() { kafkaConfig.setListeners( PLAINTEXT_PREFIX + "localhost:" + kafkaBrokerPort + "," + SSL_PREFIX + "localhost:" + kafkaBrokerPortTls); + kafkaConfig.setEntryFormat(entryFormat); // set protocol related config URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar"); From 62e5d5af80b19bbed4a27e2e8621c81cee245ae7 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 24 Dec 2020 18:42:27 +0800 Subject: [PATCH 3/8] Apply KafkaEntryFormatter to Kafka Streams tests --- .../handlers/kop/streams/GlobalKTableTest.java | 13 +++++++++++++ .../kop/streams/KafkaStreamsTestBase.java | 15 ++++++++++----- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/GlobalKTableTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/GlobalKTableTest.java index af5869529f..ae28989543 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/GlobalKTableTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/GlobalKTableTest.java @@ -37,6 +37,7 @@ import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.Stores; +import org.testng.annotations.Factory; import org.testng.annotations.Test; @@ -55,6 +56,18 @@ public class GlobalKTableTest extends KafkaStreamsTestBase { private KStream stream; private ForeachAction foreachAction; + public GlobalKTableTest(final String entryFormat) { + super(entryFormat); + } + + @Factory + public static Object[] instances() { + return new Object[] { + new GlobalKTableTest("pulsar"), + new GlobalKTableTest("kafka") + }; + } + @Override protected void createTopics() throws Exception { streamTopic = "stream-" + getTestNo(); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java index d7dda7b723..db9fed1541 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java @@ -16,6 +16,7 @@ import io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase; import io.streamnative.pulsar.handlers.kop.utils.timer.MockTime; import java.util.Properties; +import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.NonNull; @@ -23,10 +24,10 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; -import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; -import org.testng.annotations.BeforeSuite; /** * Base test class for tests related to Kafka Streams. @@ -40,14 +41,18 @@ public abstract class KafkaStreamsTestBase extends KopProtocolHandlerTestBase { protected StreamsBuilder builder; // the builder to build `kafkaStreams` and other objects of Kafka Streams protected KafkaStreams kafkaStreams; - @BeforeSuite + public KafkaStreamsTestBase(final String entryFormat) { + super(entryFormat); + } + + @BeforeClass @Override protected void setup() throws Exception { super.internalSetup(); bootstrapServers = "localhost:" + getKafkaBrokerPort(); } - @AfterSuite + @AfterClass @Override protected void cleanup() throws Exception { super.internalCleanup(); @@ -79,7 +84,7 @@ protected void setupTestCase() throws Exception { @AfterMethod protected void cleanupTestCase() throws Exception { if (kafkaStreams != null) { - kafkaStreams.close(); + kafkaStreams.close(3, TimeUnit.SECONDS); TestUtils.purgeLocalStreamsState(streamsConfiguration); } } From 1027ab9c79f6759c7b755e1de664f4e7ab7cc171 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 24 Dec 2020 20:07:43 +0800 Subject: [PATCH 4/8] Apply KafkaEntryFormatter to other tests --- .../handlers/kop/KafkaIntegrationTest.java | 13 ++++ .../handlers/kop/KafkaMessageOrderTest.java | 69 ++++++++++------- .../handlers/kop/KafkaSSLChannelTest.java | 13 ++++ .../KafkaSSLChannelWithClientAuthTest.java | 13 ++++ .../kop/MessagePublishBufferThrottleTest.java | 14 ++++ .../pulsar/handlers/kop/MultiLedgerTest.java | 74 ++++++++++++------- .../handlers/kop/PulsarAuthEnabledTest.java | 13 ++++ .../handlers/kop/SchemaRegistryTest.java | 13 ++++ 8 files changed, 167 insertions(+), 55 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaIntegrationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaIntegrationTest.java index 666c4f2ff5..adb37bb508 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaIntegrationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaIntegrationTest.java @@ -51,6 +51,7 @@ import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; import org.testng.annotations.Test; /** @@ -77,6 +78,18 @@ @Slf4j public class KafkaIntegrationTest extends KopProtocolHandlerTestBase { + public KafkaIntegrationTest(final String entryFormat) { + super(entryFormat); + } + + @Factory + public static Object[] instances() { + return new Object[] { + new KafkaIntegrationTest("pulsar"), + new KafkaIntegrationTest("kafka") + }; + } + @DataProvider public static Object[][] integrations() { return new Object[][]{ diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderTest.java index 9f9d9ffc18..2afe4fcee7 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderTest.java @@ -49,9 +49,10 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; import org.testng.annotations.Test; /** @@ -60,6 +61,18 @@ @Slf4j public class KafkaMessageOrderTest extends KopProtocolHandlerTestBase { + public KafkaMessageOrderTest(final String entryFormat) { + super(entryFormat); + } + + @Factory + public static Object[] instances() { + return new Object[] { + new KafkaMessageOrderTest("pulsar"), + new KafkaMessageOrderTest("kafka") + }; + } + @DataProvider(name = "batchSizeList") public static Object[][] batchSizeList() { // For the messageStrPrefix in testKafkaProduceMessageOrder(), 100 messages will be split to 50, 34, 25, 20 @@ -67,7 +80,7 @@ public static Object[][] batchSizeList() { return new Object[][] { { 200 }, { 250 }, { 300 }, { 350 } }; } - @BeforeMethod + @BeforeClass @Override protected void setup() throws Exception { super.internalSetup(); @@ -103,7 +116,7 @@ protected void setup() throws Exception { } } - @AfterMethod + @AfterClass @Override protected void cleanup() throws Exception { super.internalCleanup(); @@ -111,7 +124,7 @@ protected void cleanup() throws Exception { @Test(timeOut = 20000, dataProvider = "batchSizeList") public void testKafkaProduceMessageOrder(int batchSize) throws Exception { - String topicName = "kopKafkaProducePulsarConsumeMessageOrder"; + String topicName = "kopKafkaProducePulsarConsumeMessageOrder-" + batchSize; String pulsarTopicName = "persistent://public/default/" + topicName; // create partitioned topic with 1 partition. @@ -150,32 +163,34 @@ public void testKafkaProduceMessageOrder(int batchSize) throws Exception { } // 2. Consume messages use Pulsar client Consumer. - Message msg = null; - for (int i = 0; i < totalMsgs; i++) { - msg = consumer.receive(1000, TimeUnit.MILLISECONDS); - assertNotNull(msg); - Integer key = kafkaIntDeserialize(Base64.getDecoder().decode(msg.getKey())); - assertEquals(messageStrPrefix + key.toString(), new String(msg.getValue())); + if (conf.getEntryFormat().equals("pulsar")) { + Message msg = null; + for (int i = 0; i < totalMsgs; i++) { + msg = consumer.receive(1000, TimeUnit.MILLISECONDS); + assertNotNull(msg); + Integer key = kafkaIntDeserialize(Base64.getDecoder().decode(msg.getKey())); + assertEquals(messageStrPrefix + key.toString(), new String(msg.getValue())); + + if (log.isDebugEnabled()) { + log.debug("Pulsar consumer get i: {} message: {}, key: {}", + i, + new String(msg.getData()), + kafkaIntDeserialize(Base64.getDecoder().decode(msg.getKey())).toString()); + } + assertEquals(i, key.intValue()); - if (log.isDebugEnabled()) { - log.debug("Pulsar consumer get i: {} message: {}, key: {}", - i, - new String(msg.getData()), - kafkaIntDeserialize(Base64.getDecoder().decode(msg.getKey())).toString()); + consumer.acknowledge(msg); } - assertEquals(i, key.intValue()); - consumer.acknowledge(msg); - } + // verify have received all messages + msg = consumer.receive(100, TimeUnit.MILLISECONDS); + assertNull(msg); - // verify have received all messages - msg = consumer.receive(100, TimeUnit.MILLISECONDS); - assertNull(msg); - - final AtomicInteger numEntries = new AtomicInteger(0); - ledgerToEntrySet.forEach((ledgerId, entrySet) -> numEntries.set(numEntries.get() + entrySet.size())); - log.info("Successfully write {} entries of {} messages to bookie", numEntries.get(), totalMsgs); - assertTrue(numEntries.get() > 1 && numEntries.get() < totalMsgs); + final AtomicInteger numEntries = new AtomicInteger(0); + ledgerToEntrySet.forEach((ledgerId, entrySet) -> numEntries.set(numEntries.get() + entrySet.size())); + log.info("Successfully write {} entries of {} messages to bookie", numEntries.get(), totalMsgs); + assertTrue(numEntries.get() > 1 && numEntries.get() < totalMsgs); + } // 3. Consume messages use Kafka consumer. @Cleanup diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaSSLChannelTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaSSLChannelTest.java index c7f51077b6..cc21436c05 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaSSLChannelTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaSSLChannelTest.java @@ -32,6 +32,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Factory; import org.testng.annotations.Test; /** @@ -59,6 +60,18 @@ public boolean verify(String hostname, javax.net.ssl.SSLSession sslSession) { javax.net.ssl.HttpsURLConnection.setDefaultHostnameVerifier(localhostAcceptedHostnameVerifier); } + public KafkaSSLChannelTest(final String entryFormat) { + super(entryFormat); + } + + @Factory + public static Object[] instances() { + return new Object[] { + new KafkaSSLChannelTest("pulsar"), + new KafkaSSLChannelTest("kafka") + }; + } + protected void sslSetUpForBroker() throws Exception { ((KafkaServiceConfiguration) conf).setKopSslKeystoreType("JKS"); ((KafkaServiceConfiguration) conf).setKopSslKeystoreLocation(kopSslKeystoreLocation); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaSSLChannelWithClientAuthTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaSSLChannelWithClientAuthTest.java index 26a3ce7005..b4a372ac46 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaSSLChannelWithClientAuthTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaSSLChannelWithClientAuthTest.java @@ -32,6 +32,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Factory; import org.testng.annotations.Test; /** @@ -60,6 +61,18 @@ public boolean verify(String hostname, javax.net.ssl.SSLSession sslSession) { javax.net.ssl.HttpsURLConnection.setDefaultHostnameVerifier(localhostAcceptedHostnameVerifier); } + public KafkaSSLChannelWithClientAuthTest(final String entryFormat) { + super(entryFormat); + } + + @Factory + public static Object[] instances() { + return new Object[] { + new KafkaSSLChannelWithClientAuthTest("pulsar"), + new KafkaSSLChannelWithClientAuthTest("kafka") + }; + } + protected void sslSetUpForBroker() throws Exception { ((KafkaServiceConfiguration) conf).setKopSslClientAuth("required"); ((KafkaServiceConfiguration) conf).setKopSslKeystoreType("JKS"); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MessagePublishBufferThrottleTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MessagePublishBufferThrottleTest.java index be3f7535a8..e6dbad7eda 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MessagePublishBufferThrottleTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MessagePublishBufferThrottleTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.pulsar.broker.service.Topic; import org.testng.Assert; +import org.testng.annotations.Factory; import org.testng.annotations.Test; /** @@ -29,6 +30,19 @@ * */ public class MessagePublishBufferThrottleTest extends KopProtocolHandlerTestBase{ + + public MessagePublishBufferThrottleTest(final String entryFormat) { + super(entryFormat); + } + + @Factory + public static Object[] instances() { + return new Object[] { + new MessagePublishBufferThrottleTest("pulsar"), + new MessagePublishBufferThrottleTest("kafka") + }; + } + @Test public void testMessagePublishBufferThrottleDisabled() throws Exception { conf.setMaxMessagePublishBufferSizeInMB(-1); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MultiLedgerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MultiLedgerTest.java index bfe1370532..4ee785fd7a 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MultiLedgerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/MultiLedgerTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.policies.data.ClusterData; @@ -41,6 +42,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Factory; import org.testng.annotations.Test; /** @@ -49,6 +51,18 @@ @Slf4j public class MultiLedgerTest extends KopProtocolHandlerTestBase { + public MultiLedgerTest(final String entryFormat) { + super(entryFormat); + } + + @Factory + public static Object[] instances() { + return new Object[] { + new MultiLedgerTest("pulsar"), + new MultiLedgerTest("kafka") + }; + } + @Override protected void resetConfig() { super.resetConfig(); @@ -107,12 +121,6 @@ public void testProduceConsumeMultiLedger() throws Exception { // create partitioned topic with 1 partition. admin.topics().createPartitionedTopic(kafkaTopicName, 1); - @Cleanup - Consumer consumer = pulsarClient.newConsumer() - .topic(pulsarTopicName) - .subscriptionName("test_produce_consume_multi_ledger_sub") - .subscribe(); - // 1. produce message with Kafka producer. @Cleanup KProducer kProducer = new KProducer(kafkaTopicName, false, getKafkaBrokerPort()); @@ -137,33 +145,43 @@ public void testProduceConsumeMultiLedger() throws Exception { // 2. Consume messages use Pulsar client Consumer. verify content and key is in order // also verify messages are in different ledgers. - Message msg = null; - for (int i = 0; i < totalMsgs; i++) { - msg = consumer.receive(100, TimeUnit.MILLISECONDS); - assertNotNull(msg); - Integer key = kafkaIntDeserialize(Base64.getDecoder().decode(msg.getKey())); - assertEquals(messageStrPrefix + key.toString(), new String(msg.getValue())); + if (conf.getEntryFormat().equals("pulsar")) { + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .topic(pulsarTopicName) + .subscriptionName("test_produce_consume_multi_ledger_sub") + .subscribe(); + + Message msg = null; + for (int i = 0; i < totalMsgs; i++) { + msg = consumer.receive(100, TimeUnit.MILLISECONDS); + assertNotNull(msg); + Integer key = kafkaIntDeserialize(Base64.getDecoder().decode(msg.getKey())); + assertEquals(messageStrPrefix + key.toString(), new String(msg.getValue())); + + MessageIdImpl messageId = + (MessageIdImpl) (((TopicMessageIdImpl) msg.getMessageId()).getInnerMessageId()); + if (log.isDebugEnabled()) { + log.info("Pulsar consumer get i: {} , messageId: {}, message: {}, key: {}", + i, + ((TopicMessageIdImpl) msg.getMessageId()).getInnerMessageId(), + new String(msg.getData()), + kafkaIntDeserialize(Base64.getDecoder().decode(msg.getKey())).toString()); + } + assertEquals(i, key.intValue()); - MessageIdImpl messageId = (MessageIdImpl) (((TopicMessageIdImpl) msg.getMessageId()).getInnerMessageId()); - if (log.isDebugEnabled()) { - log.info("Pulsar consumer get i: {} , messageId: {}, message: {}, key: {}", - i, - ((TopicMessageIdImpl) msg.getMessageId()).getInnerMessageId(), - new String(msg.getData()), - kafkaIntDeserialize(Base64.getDecoder().decode(msg.getKey())).toString()); - } - assertEquals(i, key.intValue()); + // each ledger should only have 5 entry. + assertTrue(messageId.getEntryId() / 5 == 0); - // each ledger should only have 5 entry. - assertTrue(messageId.getEntryId() / 5 == 0); + consumer.acknowledge(msg); + } - consumer.acknowledge(msg); + // verify have received all messages + msg = consumer.receive(100, TimeUnit.MILLISECONDS); + assertNull(msg); } - // verify have received all messages - msg = consumer.receive(100, TimeUnit.MILLISECONDS); - assertNull(msg); - // 3. use kafka consumer to consume. messages in order. @Cleanup KConsumer kConsumer = new KConsumer(kafkaTopicName, getKafkaBrokerPort()); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/PulsarAuthEnabledTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/PulsarAuthEnabledTest.java index ca865895bf..aac84277f5 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/PulsarAuthEnabledTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/PulsarAuthEnabledTest.java @@ -40,6 +40,7 @@ import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.TenantInfo; import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; import org.testng.annotations.Test; /** @@ -57,6 +58,18 @@ public class PulsarAuthEnabledTest extends KopProtocolHandlerTestBase { private static final String TOPIC = "persistent://" + TENANT + "/" + NAMESPACE + "/" + SHORT_TOPIC; private String adminToken; + public PulsarAuthEnabledTest(final String entryFormat) { + super(entryFormat); + } + + @Factory + public static Object[] instances() { + return new Object[] { + new PulsarAuthEnabledTest("pulsar"), + new PulsarAuthEnabledTest("kafka") + }; + } + @BeforeClass @Override protected void setup() throws Exception { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryTest.java index 1bad581927..0c99a9cef8 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SchemaRegistryTest.java @@ -37,6 +37,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Factory; import org.testng.annotations.Test; /** @@ -47,6 +48,18 @@ public class SchemaRegistryTest extends KopProtocolHandlerTestBase { private String bootstrapServers; + public SchemaRegistryTest(final String entryFormat) { + super(entryFormat); + } + + @Factory + public static Object[] instances() { + return new Object[] { + new SchemaRegistryTest("pulsar"), + new SchemaRegistryTest("kafka") + }; + } + @BeforeMethod @Override protected void setup() throws Exception { From eafd80b6115d24c463dbb64272a84088db723307 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 24 Dec 2020 22:05:28 +0800 Subject: [PATCH 5/8] Apply KafkaEntryFormatter to SaslPlainTest --- .../handlers/kop/SaslPlainKafkaTest.java | 24 +++++++++++++++++++ .../handlers/kop/SaslPlainPulsarTest.java | 24 +++++++++++++++++++ ...lPlainTest.java => SaslPlainTestBase.java} | 8 +++++-- 3 files changed, 54 insertions(+), 2 deletions(-) create mode 100644 tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainKafkaTest.java create mode 100644 tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainPulsarTest.java rename tests/src/test/java/io/streamnative/pulsar/handlers/kop/{SaslPlainTest.java => SaslPlainTestBase.java} (97%) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainKafkaTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainKafkaTest.java new file mode 100644 index 0000000000..4389f160a3 --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainKafkaTest.java @@ -0,0 +1,24 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop; + +/** + * Testing the SASL-PLAIN features on KoP with `entry.format=kafka`. + */ +public class SaslPlainKafkaTest extends SaslPlainTestBase { + + public SaslPlainKafkaTest() { + super("kafka"); + } +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainPulsarTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainPulsarTest.java new file mode 100644 index 0000000000..15ab454916 --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainPulsarTest.java @@ -0,0 +1,24 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop; + +/** + * Testing the SASL-PLAIN features on KoP with `entry.format=pulsar`. + */ +public class SaslPlainPulsarTest extends SaslPlainTestBase { + + public SaslPlainPulsarTest() { + super("pulsar"); + } +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainTestBase.java similarity index 97% rename from tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainTest.java rename to tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainTestBase.java index 9d73bcfb00..2c818ced9d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslPlainTestBase.java @@ -53,7 +53,7 @@ */ @Test @Slf4j -public class SaslPlainTest extends KopProtocolHandlerTestBase { +public abstract class SaslPlainTestBase extends KopProtocolHandlerTestBase { private static final String SIMPLE_USER = "muggle_user"; private static final String TENANT = "SaslPlainTest"; @@ -65,6 +65,10 @@ public class SaslPlainTest extends KopProtocolHandlerTestBase { private String userToken; private String anotherToken; + public SaslPlainTestBase(final String entryFormat) { + super(entryFormat); + } + @BeforeClass @Override protected void setup() throws Exception { @@ -201,7 +205,7 @@ void badNamespaceProvided() throws Exception { @Test(timeOut = 20000) void clientWithoutAuth() throws Exception { - final int metadataTimeoutMs = 8000; + final int metadataTimeoutMs = 3000; Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); From 43adb1b1df5cc8d54acf62b099e3bb8699760d5c Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 25 Dec 2020 13:53:02 +0800 Subject: [PATCH 6/8] Add performance test for different EntryFormatter --- .../kop/format/EncodePerformanceTest.java | 109 ++++++++++++++++++ .../format/NoHeaderKafkaEntryFormatter.java | 38 ++++++ kafka-impl/src/test/resources/log4j2.xml | 1 + 3 files changed, 148 insertions(+) create mode 100644 kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EncodePerformanceTest.java create mode 100644 kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/NoHeaderKafkaEntryFormatter.java diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EncodePerformanceTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EncodePerformanceTest.java new file mode 100644 index 0000000000..1c73aef36e --- /dev/null +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/EncodePerformanceTest.java @@ -0,0 +1,109 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.format; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Random; + +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; + + +/** + * The performance test for {@link EntryFormatter#encode(MemoryRecords, int)}. + */ +public class EncodePerformanceTest { + + private static final int NUM_MESSAGES = 2048; + private static final int MESSAGE_SIZE = 1024; + + public static void main(String[] args) { + // The first time to run PulsarEntryFormatter a warn log will be printed that could take a lot of time. + runSingleTest(prepareFixedRecords(), "fixed records", 1); + + runSingleTest(prepareFixedRecords(), "fixed records", 100); + runSingleTest(prepareRandomRecords(), "random records", 100); + + runSingleTest(prepareFixedRecords(), "fixed records", 1000); + runSingleTest(prepareRandomRecords(), "random records", 1000); + } + + private static void runSingleTest(final MemoryRecords records, final String description, final int repeatTimes) { + final EntryFormatter pulsarFormatter = EntryFormatterFactory.create("pulsar"); + final EntryFormatter kafkaFormatter = EntryFormatterFactory.create("kafka"); + // Here we also add a comparison with NoHeaderKafkaEntryFormatter to measure the overhead of adding a header + // and copy the ByteBuffer of MemoryRecords that are done by KafkaEntryFormatter. + final EntryFormatter noHeaderKafkaFormatter = new NoHeaderKafkaEntryFormatter(); + + System.out.println("--- " + description + " for " + repeatTimes + " times ---"); + + long t1 = System.currentTimeMillis(); + for (int i = 0; i < repeatTimes; i++) { + pulsarFormatter.encode(records, NUM_MESSAGES).release(); + } + long t2 = System.currentTimeMillis(); + System.out.println("PulsarEntryFormatter encode time: " + (t2 - t1) + " ms"); + + t1 = System.currentTimeMillis(); + for (int i = 0; i < repeatTimes; i++) { + kafkaFormatter.encode(records, NUM_MESSAGES).release(); + } + t2 = System.currentTimeMillis(); + System.out.println("KafkaEntryFormatter encode time: " + (t2 - t1) + " ms"); + + t1 = System.currentTimeMillis(); + for (int i = 0; i < repeatTimes; i++) { + noHeaderKafkaFormatter.encode(records, NUM_MESSAGES).release(); + } + t2 = System.currentTimeMillis(); + System.out.println("NoHeaderKafkaEntryFormatter encode time: " + (t2 - t1) + " ms"); + } + + private static MemoryRecordsBuilder newMemoryRecordsBuilder() { + return MemoryRecords.builder( + ByteBuffer.allocate(1024 * 1024 * 5), + RecordBatch.CURRENT_MAGIC_VALUE, + CompressionType.NONE, + TimestampType.CREATE_TIME, + 0L); + } + + private static MemoryRecords prepareFixedRecords() { + final MemoryRecordsBuilder builder = newMemoryRecordsBuilder(); + for (int i = 0; i < NUM_MESSAGES; i++) { + final byte[] value = new byte[MESSAGE_SIZE]; + Arrays.fill(value, (byte) 'a'); + builder.append(new SimpleRecord(System.currentTimeMillis(), "key".getBytes(), value)); + } + return builder.build(); + } + + private static MemoryRecords prepareRandomRecords() { + final MemoryRecordsBuilder builder = newMemoryRecordsBuilder(); + final Random random = new Random(); + for (int i = 0; i < NUM_MESSAGES; i++) { + final ByteBuffer buffer = ByteBuffer.allocate(MESSAGE_SIZE); + for (int j = 0; j < MESSAGE_SIZE / 4; j++) { + buffer.putInt(random.nextInt()); + } + builder.append(new SimpleRecord(System.currentTimeMillis(), "key".getBytes(), buffer.array())); + } + return builder.build(); + } +} diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/NoHeaderKafkaEntryFormatter.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/NoHeaderKafkaEntryFormatter.java new file mode 100644 index 0000000000..8e3733eed4 --- /dev/null +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/format/NoHeaderKafkaEntryFormatter.java @@ -0,0 +1,38 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.format; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.util.List; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.kafka.common.record.MemoryRecords; + +/** + * The entry formatter that uses Kafka's format but has no header. + */ +public class NoHeaderKafkaEntryFormatter implements EntryFormatter { + + @Override + public ByteBuf encode(MemoryRecords records, int numMessages) { + // The difference from KafkaEntryFormatter is here we don't add the header + return Unpooled.wrappedBuffer(records.buffer()); + } + + @Override + public MemoryRecords decode(List entries, byte magic) { + // Do nothing + return null; + } +} diff --git a/kafka-impl/src/test/resources/log4j2.xml b/kafka-impl/src/test/resources/log4j2.xml index d4e681da61..6cd1058fd0 100644 --- a/kafka-impl/src/test/resources/log4j2.xml +++ b/kafka-impl/src/test/resources/log4j2.xml @@ -26,6 +26,7 @@ + From 06d27a7fee6f012be44d9ad71695ed0f82b56940 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 25 Dec 2020 16:40:12 +0800 Subject: [PATCH 7/8] Fix doc error --- kafka-impl/conf/kop.conf | 2 +- kafka-impl/conf/kop_standalone.conf | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka-impl/conf/kop.conf b/kafka-impl/conf/kop.conf index 92291a99fa..2aafe0de90 100755 --- a/kafka-impl/conf/kop.conf +++ b/kafka-impl/conf/kop.conf @@ -87,7 +87,7 @@ maxReadEntriesNum=5 # just discard the pulsar header and send left bytes to Kafka consumer. # This mode means that current pulsar clients cannot interact with kafka clients, but # kafka producer works well with kafka consumer. -entry.format=pulsar +entryFormat=pulsar ### --- KoP SSL configs--- ### diff --git a/kafka-impl/conf/kop_standalone.conf b/kafka-impl/conf/kop_standalone.conf index 5a47bf0e62..92a027c55a 100644 --- a/kafka-impl/conf/kop_standalone.conf +++ b/kafka-impl/conf/kop_standalone.conf @@ -87,7 +87,7 @@ maxReadEntriesNum=1 # just discard the pulsar header and send left bytes to Kafka consumer. # This mode means that current pulsar clients cannot interact with kafka clients, but # kafka producer works well with kafka consumer. -entry.format=pulsar +entryFormat=pulsar ### --- KoP SSL configs--- ### From 30eee7d0bd2c4eb81136073e345d146dc7956084 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 26 Dec 2020 02:19:36 +0800 Subject: [PATCH 8/8] Fix entries not released --- .../streamnative/pulsar/handlers/kop/format/EntryFormatter.java | 1 + .../pulsar/handlers/kop/format/KafkaEntryFormatter.java | 1 + 2 files changed, 2 insertions(+) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java index 76b4afda7e..6003f0690c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java @@ -37,6 +37,7 @@ public interface EntryFormatter { /** * Decode a stream of entries to Kafka records. + * It should be noted that this method is responsible for releasing the entries. * * @param entries the list of entries * @param magic the Kafka record batch's magic value diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java index b71eb21d19..19e9d91ab9 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/KafkaEntryFormatter.java @@ -64,6 +64,7 @@ public MemoryRecords decode(List entries, byte magic) { builder.appendWithOffset(offset, record); offset++; } + entry.release(); }); return builder.build(); }