From 9e306ab393d38587f35eca3763e82aaeac75d513 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 21 Dec 2020 22:03:04 +0800 Subject: [PATCH 1/7] Add entry.format config --- kafka-impl/conf/kop.conf | 10 + kafka-impl/conf/kop_standalone.conf | 10 + .../handlers/kop/KafkaRequestHandler.java | 9 +- .../kop/KafkaServiceConfiguration.java | 5 + .../handlers/kop/MessageFetchContext.java | 5 +- .../pulsar/handlers/kop/PendingProduce.java | 8 +- .../handlers/kop/format/EntryFormatter.java | 43 ++ .../kop/format/EntryFormatterFactory.java | 25 ++ .../PulsarEntryFormatter.java} | 377 ++++++------------ .../handlers/kop/format/package-info.java | 14 + 10 files changed, 237 insertions(+), 269 deletions(-) create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java rename kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/{utils/MessageRecordUtils.java => format/PulsarEntryFormatter.java} (52%) create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/package-info.java diff --git a/kafka-impl/conf/kop.conf b/kafka-impl/conf/kop.conf index a8ea392735..a5bb1c4d04 100755 --- a/kafka-impl/conf/kop.conf +++ b/kafka-impl/conf/kop.conf @@ -71,6 +71,16 @@ offsetsTopicNumPartitions=8 # Maximum number of entries that are read from cursor once per time maxReadEntriesNum=5 +# The format of an entry. The default value is pulsar. +# Optional values: [pulsar] +# +# pulsar: +# When KoP receives messages from kafka producer, it will serialize these messages to +# the format so that pulsar consumer can read directly. +# When KoP sends entries to kafka consumer, it will treat each entry as pulsar's +# format and deserialize each entry to kafka's format. +entry.format=pulsar + ### --- KoP SSL configs--- ### # Kafka ssl configuration map with: SSL_PROTOCOL_CONFIG = ssl.protocol diff --git a/kafka-impl/conf/kop_standalone.conf b/kafka-impl/conf/kop_standalone.conf index b9b58ca777..21e7cfb9d9 100644 --- a/kafka-impl/conf/kop_standalone.conf +++ b/kafka-impl/conf/kop_standalone.conf @@ -71,6 +71,16 @@ offsetsTopicNumPartitions=8 # Maximum number of entries that are read from cursor once per time maxReadEntriesNum=1 +# The format of an entry. The default value is pulsar. +# Optional values: [pulsar] +# +# pulsar: +# When KoP receives messages from kafka producer, it will serialize these messages to +# the format so that pulsar consumer can read directly. +# When KoP sends entries to kafka consumer, it will treat each entry as pulsar's +# format and deserialize each entry to kafka's format. +entry.format=pulsar + ### --- KoP SSL configs--- ### # Kafka ssl configuration map with: SSL_PROTOCOL_CONFIG = ssl.protocol diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 2c40bc457f..2368322bad 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -29,6 +29,8 @@ import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupOverview; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary; +import io.streamnative.pulsar.handlers.kop.format.EntryFormatter; +import io.streamnative.pulsar.handlers.kop.format.EntryFormatterFactory; import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata; import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator; import io.streamnative.pulsar.handlers.kop.utils.CoreUtils; @@ -157,6 +159,8 @@ public class KafkaRequestHandler extends KafkaCommandDecoder { private final int sslPort; private final int defaultNumPartitions; public final int maxReadEntriesNum; + @Getter + private final EntryFormatter entryFormatter; private final Map pendingProduceQueueMap = new ConcurrentHashMap<>(); @@ -184,6 +188,7 @@ public KafkaRequestHandler(PulsarService pulsarService, this.topicManager = new KafkaTopicManager(this); this.defaultNumPartitions = kafkaConfig.getDefaultNumPartitions(); this.maxReadEntriesNum = kafkaConfig.getMaxReadEntriesNum(); + this.entryFormatter = EntryFormatterFactory.create(kafkaConfig.getEntryFormat()); } @Override @@ -600,8 +605,8 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar, MemoryRecords records = (MemoryRecords) entry.getValue(); String fullPartitionName = KopTopic.toString(topicPartition); - PendingProduce pendingProduce = - new PendingProduce(partitionResponse, topicManager, fullPartitionName, records, executor); + PendingProduce pendingProduce = new PendingProduce(partitionResponse, topicManager, fullPartitionName, + entryFormatter, records, executor); PendingProduceQueue queue = pendingProduceQueueMap.computeIfAbsent(topicPartition, ignored -> new PendingProduceQueue()); queue.add(pendingProduce); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java index 40bfbffe0f..ec561fbfb0 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java @@ -249,4 +249,9 @@ public class KafkaServiceConfiguration extends ServiceConfiguration { ) private int maxReadEntriesNum = 5; + @FieldContext( + category = CATEGORY_KOP, + doc = "The format of an entry. Default: pulsar. Optional: [pulsar]" + ) + private String entryFormat = "pulsar"; } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java index 9ba41c5892..f65d8d8757 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java @@ -13,7 +13,6 @@ */ package io.streamnative.pulsar.handlers.kop; -import static io.streamnative.pulsar.handlers.kop.utils.MessageRecordUtils.entriesToRecords; import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; import com.google.common.collect.Lists; @@ -322,9 +321,7 @@ private void readMessagesInternal(KafkaHeaderAndRequest fetch, } else if (apiVersion <= 3) { magic = RecordBatch.MAGIC_VALUE_V1; } - MemoryRecords records; - // by default kafka is produced message in batched mode. - records = entriesToRecords(entries, magic); + final MemoryRecords records = requestHandler.getEntryFormatter().decode(entries, magic); partitionData = new FetchResponse.PartitionData( Errors.NONE, diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java index 068de28dfa..a5b777c69b 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java @@ -14,7 +14,7 @@ package io.streamnative.pulsar.handlers.kop; import io.netty.buffer.ByteBuf; -import io.streamnative.pulsar.handlers.kop.utils.MessageRecordUtils; +import io.streamnative.pulsar.handlers.kop.format.EntryFormatter; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -42,6 +42,7 @@ public class PendingProduce { public PendingProduce(CompletableFuture responseFuture, KafkaTopicManager topicManager, String partitionName, + EntryFormatter entryFormatter, MemoryRecords memoryRecords, ExecutorService executor) { this.responseFuture = responseFuture; @@ -58,10 +59,7 @@ public PendingProduce(CompletableFuture responseFuture, log.error("Failed to compute ByteBuf for partition '{}': {}", partitionName, e); return null; }); - executor.execute(() -> { - ByteBuf byteBuf = MessageRecordUtils.recordsToByteBuf(memoryRecords, this.numMessages); - this.byteBufFuture.complete(byteBuf); - }); + executor.execute(() -> this.byteBufFuture.complete(entryFormatter.encode(memoryRecords))); this.offsetFuture = new CompletableFuture<>(); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java new file mode 100644 index 0000000000..4999b6c471 --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java @@ -0,0 +1,43 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.format; + +import io.netty.buffer.ByteBuf; +import java.util.List; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.kafka.common.record.MemoryRecords; + + +/** + * The formatter for conversion between Kafka records and Bookie entries. + */ +public interface EntryFormatter { + + /** + * Encode Kafka records to a ByteBuf. + * + * @param records messages with Kafka's format + * @return the ByteBuf of an entry that is to be written to Bookie + */ + ByteBuf encode(MemoryRecords records); + + /** + * Decode a stream of entries to Kafka records. + * + * @param entries the list of entries + * @param magic the Kafka record batch's magic value + * @return the Kafka records + */ + MemoryRecords decode(List entries, byte magic); +} \ No newline at end of file diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java new file mode 100644 index 0000000000..6181189561 --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java @@ -0,0 +1,25 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.format; + +public class EntryFormatterFactory { + + public static EntryFormatter create(final String format) { + if (format.equalsIgnoreCase("pulsar")) { + return new PulsarEntryFormatter(); + } else { + throw new IllegalArgumentException("Unsupported entry.format: " + format); + } + } +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageRecordUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java similarity index 52% rename from kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageRecordUtils.java rename to kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java index cd4ec237ad..0385c53d4b 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageRecordUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java @@ -11,25 +11,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.streamnative.pulsar.handlers.kop.utils; +package io.streamnative.pulsar.handlers.kop.format; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static java.nio.charset.StandardCharsets.UTF_8; import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; +import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.ByteBuffer; -import java.time.Clock; import java.util.Base64; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.stream.IntStream; import java.util.stream.StreamSupport; -import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Entry; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.record.MemoryRecords; @@ -38,120 +36,29 @@ import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.ByteBufferOutputStream; -import org.apache.pulsar.client.api.CompressionType; -import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.PulsarApi; -import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; -import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; -import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.protocol.Commands; -import org.apache.pulsar.common.protocol.Commands.ChecksumType; /** - * Pulsar Message and Kafka Record utils. + * The entry formatter that uses Pulsar's format. */ -@UtilityClass @Slf4j -public final class MessageRecordUtils { +public class PulsarEntryFormatter implements EntryFormatter { private static final int DEFAULT_FETCH_BUFFER_SIZE = 1024 * 1024; private static final int MAX_RECORDS_BUFFER_SIZE = 100 * 1024 * 1024; - private static final String FAKE_KOP_PRODUCER_NAME = "fake_kop_producer_name"; - - private static final Clock clock = Clock.systemDefaultZone(); - - // convert kafka Record to Pulsar Message. - // called when publish received Kafka Record into Pulsar. - public static MessageImpl recordToEntry(Record record) { - @SuppressWarnings("unchecked") - TypedMessageBuilderImpl builder = new TypedMessageBuilderImpl(null, Schema.BYTES); - - // key - if (record.hasKey()) { - byte[] key = new byte[record.keySize()]; - record.key().get(key); - builder.keyBytes(key); - // reuse ordering key to avoid converting string < > bytes - builder.orderingKey(key); - } - - // value - if (record.hasValue()) { - byte[] value = new byte[record.valueSize()]; - record.value().get(value); - builder.value(value); - } else { - builder.value(new byte[0]); - } - - // sequence - if (record.sequence() >= 0) { - builder.sequenceId(record.sequence()); - } - - // timestamp - if (record.timestamp() >= 0) { - builder.eventTime(record.timestamp()); - builder.getMetadataBuilder().setPublishTime(record.timestamp()); - } else { - builder.getMetadataBuilder().setPublishTime(System.currentTimeMillis()); - } - - // header - for (Header h : record.headers()) { - builder.property(h.key(), - new String(h.value(), UTF_8)); - } - - return (MessageImpl) builder.getMessage(); - } - - // convert message to ByteBuf payload for ledger.addEntry. - // parameter message is converted from passed in Kafka record. - // called when publish received Kafka Record into Pulsar. - public static ByteBuf messageToByteBuf(Message message) { - checkArgument(message instanceof MessageImpl); - - MessageImpl msg = (MessageImpl) message; - MessageMetadata.Builder msgMetadataBuilder = msg.getMessageBuilder(); - ByteBuf payload = msg.getDataBuffer(); - - // filled in required fields - if (!msgMetadataBuilder.hasSequenceId()) { - msgMetadataBuilder.setSequenceId(-1); - } - if (!msgMetadataBuilder.hasPublishTime()) { - msgMetadataBuilder.setPublishTime(clock.millis()); - } - if (!msgMetadataBuilder.hasProducerName()) { - msgMetadataBuilder.setProducerName(FAKE_KOP_PRODUCER_NAME); - } - - msgMetadataBuilder.setCompression( - CompressionCodecProvider.convertToWireProtocol(CompressionType.NONE)); - msgMetadataBuilder.setUncompressedSize(payload.readableBytes()); - MessageMetadata msgMetadata = msgMetadataBuilder.build(); - - ByteBuf buf = Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, msgMetadata, payload); - - msgMetadataBuilder.recycle(); - msgMetadata.recycle(); - - return buf; - } - //// for Batch messages - protected static final int INITIAL_BATCH_BUFFER_SIZE = 1024; - protected static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024; + private static final int INITIAL_BATCH_BUFFER_SIZE = 1024; + private static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024; - // If records stored in a batched way, turn MemoryRecords into a pulsar batched message. - public static ByteBuf recordsToByteBuf(MemoryRecords records, int size) { + @Override + public ByteBuf encode(MemoryRecords records) { long currentBatchSizeBytes = 0; int numMessagesInBatch = 0; @@ -160,9 +67,10 @@ public static ByteBuf recordsToByteBuf(MemoryRecords records, int size) { PulsarApi.CompressionType compressionType = PulsarApi.CompressionType.NONE; ByteBuf batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT - .buffer(Math.min(INITIAL_BATCH_BUFFER_SIZE, MAX_MESSAGE_BATCH_SIZE_BYTES)); + .buffer(Math.min(INITIAL_BATCH_BUFFER_SIZE, MAX_MESSAGE_BATCH_SIZE_BYTES)); + final int size = getMemoryRecordsCount(records); List> messages = Lists.newArrayListWithExpectedSize(size); - MessageMetadata.Builder messageMetaBuilder = MessageMetadata.newBuilder(); + PulsarApi.MessageMetadata.Builder messageMetaBuilder = PulsarApi.MessageMetadata.newBuilder(); StreamSupport.stream(records.records().spliterator(), true).forEachOrdered(record -> { MessageImpl message = recordToEntry(record); @@ -188,106 +96,6 @@ public static ByteBuf recordsToByteBuf(MemoryRecords records, int size) { msgBuilder.recycle(); } -// Iterator iterator = records.records().iterator(); -// while (iterator.hasNext()) { -// MessageImpl message = recordToEntry(iterator.next()); -// if (++numMessagesInBatch == 1) { -// sequenceId = Commands.initBatchMessageMetadata(messageMetaBuilder, message.getMessageBuilder()); -// } -// messages.add(message); -// currentBatchSizeBytes += message.getDataBuffer().readableBytes(); -// -// if (log.isDebugEnabled()) { -// log.debug("recordsToByteBuf , sequenceId: {}, numMessagesInBatch: {}, currentBatchSizeBytes: {} ", -// sequenceId, numMessagesInBatch, currentBatchSizeBytes); -// } -// } - -// for (MessageImpl msg : messages) { -// PulsarApi.MessageMetadata.Builder msgBuilder = msg.getMessageBuilder(); -// batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder, -// msg.getDataBuffer(), batchedMessageMetadataAndPayload); -// msgBuilder.recycle(); -// } - int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes(); - - if (PulsarApi.CompressionType.NONE != compressionType) { - messageMetaBuilder.setCompression(compressionType); - messageMetaBuilder.setUncompressedSize(uncompressedSize); - } - - messageMetaBuilder.setNumMessagesInBatch(numMessagesInBatch); - - MessageMetadata msgMetadata = messageMetaBuilder.build(); - - ByteBuf buf = Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, - msgMetadata, - batchedMessageMetadataAndPayload); - - messageMetaBuilder.recycle(); - msgMetadata.recycle(); - batchedMessageMetadataAndPayload.release(); - - return buf; - } - - public static void recordsToByteBuf(MemoryRecords records, int size, - CompletableFuture transFuture) { - long currentBatchSizeBytes = 0; - int numMessagesInBatch = 0; - - long sequenceId = -1; - - // TODO: handle different compression type - PulsarApi.CompressionType compressionType = PulsarApi.CompressionType.NONE; - - ByteBuf batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT - .buffer(Math.min(INITIAL_BATCH_BUFFER_SIZE, MAX_MESSAGE_BATCH_SIZE_BYTES)); - List> messages = Lists.newArrayListWithExpectedSize(size); - MessageMetadata.Builder messageMetaBuilder = MessageMetadata.newBuilder(); - - StreamSupport.stream(records.records().spliterator(), true).forEachOrdered(record -> { - MessageImpl message = recordToEntry(record); - messages.add(message); - }); - - for (MessageImpl message : messages) { - if (++numMessagesInBatch == 1) { - sequenceId = Commands.initBatchMessageMetadata(messageMetaBuilder, message.getMessageBuilder()); - } - currentBatchSizeBytes += message.getDataBuffer().readableBytes(); - if (log.isDebugEnabled()) { - log.debug("recordsToByteBuf , sequenceId: {}, numMessagesInBatch: {}, currentBatchSizeBytes: {} ", - sequenceId, numMessagesInBatch, currentBatchSizeBytes); - } - - PulsarApi.MessageMetadata.Builder msgBuilder = message.getMessageBuilder(); - batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder, - message.getDataBuffer(), batchedMessageMetadataAndPayload); - msgBuilder.recycle(); - } - -// Iterator iterator = records.records().iterator(); -// while (iterator.hasNext()) { -// MessageImpl message = recordToEntry(iterator.next()); -// if (++numMessagesInBatch == 1) { -// sequenceId = Commands.initBatchMessageMetadata(messageMetaBuilder, message.getMessageBuilder()); -// } -// messages.add(message); -// currentBatchSizeBytes += message.getDataBuffer().readableBytes(); -// -// if (log.isDebugEnabled()) { -// log.debug("recordsToByteBuf , sequenceId: {}, numMessagesInBatch: {}, currentBatchSizeBytes: {} ", -// sequenceId, numMessagesInBatch, currentBatchSizeBytes); -// } -// } - -// for (MessageImpl msg : messages) { -// PulsarApi.MessageMetadata.Builder msgBuilder = msg.getMessageBuilder(); -// batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder, -// msg.getDataBuffer(), batchedMessageMetadataAndPayload); -// msgBuilder.recycle(); -// } int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes(); if (PulsarApi.CompressionType.NONE != compressionType) { @@ -297,9 +105,9 @@ public static void recordsToByteBuf(MemoryRecords records, int size, messageMetaBuilder.setNumMessagesInBatch(numMessagesInBatch); - MessageMetadata msgMetadata = messageMetaBuilder.build(); + PulsarApi.MessageMetadata msgMetadata = messageMetaBuilder.build(); - ByteBuf buf = Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, + ByteBuf buf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, msgMetadata, batchedMessageMetadataAndPayload); @@ -307,54 +115,31 @@ public static void recordsToByteBuf(MemoryRecords records, int size, msgMetadata.recycle(); batchedMessageMetadataAndPayload.release(); - transFuture.complete(buf); - } - - private static Header[] getHeadersFromMetadata(List properties) { - Header[] headers = new Header[properties.size()]; - - if (log.isDebugEnabled()) { - log.debug("getHeadersFromMetadata. Header size: {}", - properties.size()); - } - - int index = 0; - for (KeyValue kv: properties) { - headers[index] = new RecordHeader(kv.getKey(), kv.getValue().getBytes(UTF_8)); - - if (log.isDebugEnabled()) { - log.debug("index: {} kv.getKey: {}. kv.getValue: {}", - index, kv.getKey(), kv.getValue()); - } - index++; - } - - return headers; + return buf; } - // Convert entries read from BookKeeper into Kafka Records - // Entries can be batched messages, may need un-batch. - public static MemoryRecords entriesToRecords(List entries, byte magic) { + @Override + public MemoryRecords decode(List entries, byte magic) { try (ByteBufferOutputStream outputStream = new ByteBufferOutputStream(DEFAULT_FETCH_BUFFER_SIZE)) { MemoryRecordsBuilder builder = new MemoryRecordsBuilder(outputStream, magic, - org.apache.kafka.common.record.CompressionType.NONE, - TimestampType.CREATE_TIME, - // using the first entry, index 0 as base offset - MessageIdUtils.getOffset(entries.get(0).getLedgerId(), entries.get(0).getEntryId(), 0), - RecordBatch.NO_TIMESTAMP, - RecordBatch.NO_PRODUCER_ID, - RecordBatch.NO_PRODUCER_EPOCH, - RecordBatch.NO_SEQUENCE, - false, false, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - MAX_RECORDS_BUFFER_SIZE); + org.apache.kafka.common.record.CompressionType.NONE, + TimestampType.CREATE_TIME, + // using the first entry, index 0 as base offset + MessageIdUtils.getOffset(entries.get(0).getLedgerId(), entries.get(0).getEntryId(), 0), + RecordBatch.NO_TIMESTAMP, + RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, + RecordBatch.NO_SEQUENCE, + false, false, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + MAX_RECORDS_BUFFER_SIZE); entries.parallelStream().forEachOrdered(entry -> { // each entry is a batched message ByteBuf metadataAndPayload = entry.getDataBuffer(); // Uncompress the payload if necessary - MessageMetadata msgMetadata = Commands.parseMessageMetadata(metadataAndPayload); + PulsarApi.MessageMetadata msgMetadata = Commands.parseMessageMetadata(metadataAndPayload); CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(msgMetadata.getCompression()); int uncompressedSize = msgMetadata.getUncompressedSize(); ByteBuf payload; @@ -369,9 +154,9 @@ public static MemoryRecords entriesToRecords(List 0 ? msgMetadata.getEventTime() : msgMetadata.getPublishTime(), - getKeyByteBuffer(msgMetadata), - getNioBuffer(payload), - headers); + MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId()), + msgMetadata.getEventTime() > 0 ? msgMetadata.getEventTime() : msgMetadata.getPublishTime(), + getKeyByteBuffer(msgMetadata), + getNioBuffer(payload), + headers); } msgMetadata.recycle(); @@ -430,7 +215,84 @@ public static MemoryRecords entriesToRecords(List recordToEntry(Record record) { + @SuppressWarnings("unchecked") + TypedMessageBuilderImpl builder = new TypedMessageBuilderImpl(null, Schema.BYTES); + + // key + if (record.hasKey()) { + byte[] key = new byte[record.keySize()]; + record.key().get(key); + builder.keyBytes(key); + // reuse ordering key to avoid converting string < > bytes + builder.orderingKey(key); + } + + // value + if (record.hasValue()) { + byte[] value = new byte[record.valueSize()]; + record.value().get(value); + builder.value(value); + } else { + builder.value(new byte[0]); + } + + // sequence + if (record.sequence() >= 0) { + builder.sequenceId(record.sequence()); + } + + // timestamp + if (record.timestamp() >= 0) { + builder.eventTime(record.timestamp()); + builder.getMetadataBuilder().setPublishTime(record.timestamp()); + } else { + builder.getMetadataBuilder().setPublishTime(System.currentTimeMillis()); + } + + // header + for (Header h : record.headers()) { + builder.property(h.key(), + new String(h.value(), UTF_8)); + } + + return (MessageImpl) builder.getMessage(); + } + + private Header[] getHeadersFromMetadata(List properties) { + Header[] headers = new Header[properties.size()]; + + if (log.isDebugEnabled()) { + log.debug("getHeadersFromMetadata. Header size: {}", + properties.size()); + } + + int index = 0; + for (PulsarApi.KeyValue kv: properties) { + headers[index] = new RecordHeader(kv.getKey(), kv.getValue().getBytes(UTF_8)); + + if (log.isDebugEnabled()) { + log.debug("index: {} kv.getKey: {}. kv.getValue: {}", + index, kv.getKey(), kv.getValue()); + } + index++; + } + + return headers; + } + + private static ByteBuffer getKeyByteBuffer(PulsarApi.SingleMessageMetadata messageMetadata) { if (messageMetadata.hasOrderingKey()) { return messageMetadata.getOrderingKey().asReadOnlyByteBuffer(); } @@ -444,7 +306,7 @@ private static ByteBuffer getKeyByteBuffer(MessageMetadata messageMetadata) { } } - private static ByteBuffer getKeyByteBuffer(SingleMessageMetadata messageMetadata) { + private ByteBuffer getKeyByteBuffer(PulsarApi.MessageMetadata messageMetadata) { if (messageMetadata.hasOrderingKey()) { return messageMetadata.getOrderingKey().asReadOnlyByteBuffer(); } @@ -458,7 +320,7 @@ private static ByteBuffer getKeyByteBuffer(SingleMessageMetadata messageMetadata } } - public static ByteBuffer getNioBuffer(ByteBuf buffer) { + private ByteBuffer getNioBuffer(ByteBuf buffer) { if (buffer.isDirect()) { return buffer.nioBuffer(); } @@ -466,5 +328,4 @@ public static ByteBuffer getNioBuffer(ByteBuf buffer) { buffer.getBytes(buffer.readerIndex(), bytes); return ByteBuffer.wrap(bytes); } - } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/package-info.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/package-info.java new file mode 100644 index 0000000000..1f1285b48a --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/package-info.java @@ -0,0 +1,14 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.format; \ No newline at end of file From 41528a2d0161f51584d205ae301f3b32e09cd361 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 22 Dec 2020 00:29:48 +0800 Subject: [PATCH 2/7] Fix checkstyle --- .../handlers/kop/format/EntryFormatter.java | 2 +- .../kop/format/EntryFormatterFactory.java | 5 ++++ .../kop/format/PulsarEntryFormatter.java | 30 +++++++++++-------- .../handlers/kop/format/package-info.java | 2 +- 4 files changed, 24 insertions(+), 15 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java index 4999b6c471..71d78094eb 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java @@ -40,4 +40,4 @@ public interface EntryFormatter { * @return the Kafka records */ MemoryRecords decode(List entries, byte magic); -} \ No newline at end of file +} diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java index 6181189561..04ee614ad4 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java @@ -13,6 +13,11 @@ */ package io.streamnative.pulsar.handlers.kop.format; +/** + * Factory of EntryFormatter. + * + * @see EntryFormatter + */ public class EntryFormatterFactory { public static EntryFormatter create(final String format) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java index 0385c53d4b..3b53cfb4e9 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java @@ -41,9 +41,13 @@ import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; +import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.Commands.ChecksumType; /** @@ -70,7 +74,7 @@ public ByteBuf encode(MemoryRecords records) { .buffer(Math.min(INITIAL_BATCH_BUFFER_SIZE, MAX_MESSAGE_BATCH_SIZE_BYTES)); final int size = getMemoryRecordsCount(records); List> messages = Lists.newArrayListWithExpectedSize(size); - PulsarApi.MessageMetadata.Builder messageMetaBuilder = PulsarApi.MessageMetadata.newBuilder(); + MessageMetadata.Builder messageMetaBuilder = MessageMetadata.newBuilder(); StreamSupport.stream(records.records().spliterator(), true).forEachOrdered(record -> { MessageImpl message = recordToEntry(record); @@ -105,9 +109,9 @@ public ByteBuf encode(MemoryRecords records) { messageMetaBuilder.setNumMessagesInBatch(numMessagesInBatch); - PulsarApi.MessageMetadata msgMetadata = messageMetaBuilder.build(); + MessageMetadata msgMetadata = messageMetaBuilder.build(); - ByteBuf buf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, + ByteBuf buf = Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, msgMetadata, batchedMessageMetadataAndPayload); @@ -139,7 +143,7 @@ public MemoryRecords decode(List entries, byte magic) { ByteBuf metadataAndPayload = entry.getDataBuffer(); // Uncompress the payload if necessary - PulsarApi.MessageMetadata msgMetadata = Commands.parseMessageMetadata(metadataAndPayload); + MessageMetadata msgMetadata = Commands.parseMessageMetadata(metadataAndPayload); CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(msgMetadata.getCompression()); int uncompressedSize = msgMetadata.getUncompressedSize(); ByteBuf payload; @@ -168,12 +172,12 @@ public MemoryRecords decode(List entries, byte magic) { log.debug(" processing message num - {} in batch", i); } try { - PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata + SingleMessageMetadata.Builder singleMessageMetadataBuilder = SingleMessageMetadata .newBuilder(); ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadataBuilder, i, numMessages); - PulsarApi.SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.build(); + SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.build(); Header[] headers = getHeadersFromMetadata(singleMessageMetadata.getPropertiesList()); builder.appendWithOffset( @@ -215,7 +219,7 @@ public MemoryRecords decode(List entries, byte magic) { } } - private int getMemoryRecordsCount(final MemoryRecords records) { + private static int getMemoryRecordsCount(final MemoryRecords records) { int n = 0; for (Record ignored : records.records()) { n++; @@ -226,7 +230,7 @@ private int getMemoryRecordsCount(final MemoryRecords records) { // convert kafka Record to Pulsar Message. // convert kafka Record to Pulsar Message. // called when publish received Kafka Record into Pulsar. - private MessageImpl recordToEntry(Record record) { + private static MessageImpl recordToEntry(Record record) { @SuppressWarnings("unchecked") TypedMessageBuilderImpl builder = new TypedMessageBuilderImpl(null, Schema.BYTES); @@ -270,7 +274,7 @@ private MessageImpl recordToEntry(Record record) { return (MessageImpl) builder.getMessage(); } - private Header[] getHeadersFromMetadata(List properties) { + private Header[] getHeadersFromMetadata(List properties) { Header[] headers = new Header[properties.size()]; if (log.isDebugEnabled()) { @@ -279,7 +283,7 @@ private Header[] getHeadersFromMetadata(List properties) { } int index = 0; - for (PulsarApi.KeyValue kv: properties) { + for (KeyValue kv: properties) { headers[index] = new RecordHeader(kv.getKey(), kv.getValue().getBytes(UTF_8)); if (log.isDebugEnabled()) { @@ -292,7 +296,7 @@ private Header[] getHeadersFromMetadata(List properties) { return headers; } - private static ByteBuffer getKeyByteBuffer(PulsarApi.SingleMessageMetadata messageMetadata) { + private static ByteBuffer getKeyByteBuffer(SingleMessageMetadata messageMetadata) { if (messageMetadata.hasOrderingKey()) { return messageMetadata.getOrderingKey().asReadOnlyByteBuffer(); } @@ -306,7 +310,7 @@ private static ByteBuffer getKeyByteBuffer(PulsarApi.SingleMessageMetadata messa } } - private ByteBuffer getKeyByteBuffer(PulsarApi.MessageMetadata messageMetadata) { + private static ByteBuffer getKeyByteBuffer(MessageMetadata messageMetadata) { if (messageMetadata.hasOrderingKey()) { return messageMetadata.getOrderingKey().asReadOnlyByteBuffer(); } @@ -320,7 +324,7 @@ private ByteBuffer getKeyByteBuffer(PulsarApi.MessageMetadata messageMetadata) { } } - private ByteBuffer getNioBuffer(ByteBuf buffer) { + private static ByteBuffer getNioBuffer(ByteBuf buffer) { if (buffer.isDirect()) { return buffer.nioBuffer(); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/package-info.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/package-info.java index 1f1285b48a..16e3703426 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/package-info.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/package-info.java @@ -11,4 +11,4 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.streamnative.pulsar.handlers.kop.format; \ No newline at end of file +package io.streamnative.pulsar.handlers.kop.format; From 8bbcdb174d5f79d5f4d8afd08dd03bc8c65564d8 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 22 Dec 2020 10:50:08 +0800 Subject: [PATCH 3/7] Change EntryFormatter#encode API --- .../pulsar/handlers/kop/PendingProduce.java | 13 +--- .../handlers/kop/format/EntryFormatter.java | 25 +++++++- .../kop/format/PulsarEntryFormatter.java | 15 +---- .../handlers/kop/EntryFormatterTest.java | 62 +++++++++++++++++++ 4 files changed, 90 insertions(+), 25 deletions(-) create mode 100644 kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/EntryFormatterTest.java diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java index a5b777c69b..0b03d93814 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java @@ -21,7 +21,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.Record; import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -48,7 +47,7 @@ public PendingProduce(CompletableFuture responseFuture, this.responseFuture = responseFuture; this.topicManager = topicManager; this.partitionName = partitionName; - this.numMessages = parseNumMessages(memoryRecords); + this.numMessages = EntryFormatter.parseNumMessages(memoryRecords); this.topicFuture = topicManager.getTopic(partitionName).exceptionally(e -> { log.error("Failed to getTopic for partition '{}': {}", partitionName, e); @@ -59,7 +58,7 @@ public PendingProduce(CompletableFuture responseFuture, log.error("Failed to compute ByteBuf for partition '{}': {}", partitionName, e); return null; }); - executor.execute(() -> this.byteBufFuture.complete(entryFormatter.encode(memoryRecords))); + executor.execute(() -> byteBufFuture.complete(entryFormatter.encode(memoryRecords, numMessages))); this.offsetFuture = new CompletableFuture<>(); } @@ -117,12 +116,4 @@ public void publishMessages() { byteBuf.release(); }); } - - private static int parseNumMessages(MemoryRecords records) { - int n = 0; - for (Record ignored : records.records()) { - n++; - } - return n; - } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java index 71d78094eb..83cfe527fe 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java @@ -15,8 +15,11 @@ import io.netty.buffer.ByteBuf; import java.util.List; +import java.util.Optional; + import org.apache.bookkeeper.mledger.Entry; import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; /** @@ -28,9 +31,10 @@ public interface EntryFormatter { * Encode Kafka records to a ByteBuf. * * @param records messages with Kafka's format + * @param numMessages the number of messages * @return the ByteBuf of an entry that is to be written to Bookie */ - ByteBuf encode(MemoryRecords records); + ByteBuf encode(final MemoryRecords records, final int numMessages); /** * Decode a stream of entries to Kafka records. @@ -39,5 +43,22 @@ public interface EntryFormatter { * @param magic the Kafka record batch's magic value * @return the Kafka records */ - MemoryRecords decode(List entries, byte magic); + MemoryRecords decode(final List entries, final byte magic); + + /** + * Get the number of messages from MemoryRecords. + * Since MemoryRecords doesn't provide a way to get the number of messages. We need to iterate over the whole + * MemoryRecords object. So we use a helper method to get the number of messages that can be passed to + * {@link EntryFormatter#encode(MemoryRecords, int)} and metrics related methods as well. + * + * @param records messages with Kafka's format + * @return the number of messages + */ + static int parseNumMessages(final MemoryRecords records) { + int numMessages = 0; + for (MutableRecordBatch batch : records.batches()) { + numMessages += (batch.lastOffset() - batch.baseOffset() + 1); + } + return numMessages; + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java index 3b53cfb4e9..b8e60f6e49 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java @@ -62,7 +62,7 @@ public class PulsarEntryFormatter implements EntryFormatter { private static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024; @Override - public ByteBuf encode(MemoryRecords records) { + public ByteBuf encode(final MemoryRecords records, final int numMessages) { long currentBatchSizeBytes = 0; int numMessagesInBatch = 0; @@ -72,8 +72,7 @@ public ByteBuf encode(MemoryRecords records) { ByteBuf batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT .buffer(Math.min(INITIAL_BATCH_BUFFER_SIZE, MAX_MESSAGE_BATCH_SIZE_BYTES)); - final int size = getMemoryRecordsCount(records); - List> messages = Lists.newArrayListWithExpectedSize(size); + List> messages = Lists.newArrayListWithExpectedSize(numMessages); MessageMetadata.Builder messageMetaBuilder = MessageMetadata.newBuilder(); StreamSupport.stream(records.records().spliterator(), true).forEachOrdered(record -> { @@ -123,7 +122,7 @@ public ByteBuf encode(MemoryRecords records) { } @Override - public MemoryRecords decode(List entries, byte magic) { + public MemoryRecords decode(final List entries, final byte magic) { try (ByteBufferOutputStream outputStream = new ByteBufferOutputStream(DEFAULT_FETCH_BUFFER_SIZE)) { MemoryRecordsBuilder builder = new MemoryRecordsBuilder(outputStream, magic, org.apache.kafka.common.record.CompressionType.NONE, @@ -219,14 +218,6 @@ public MemoryRecords decode(List entries, byte magic) { } } - private static int getMemoryRecordsCount(final MemoryRecords records) { - int n = 0; - for (Record ignored : records.records()) { - n++; - } - return n; - } - // convert kafka Record to Pulsar Message. // convert kafka Record to Pulsar Message. // called when publish received Kafka Record into Pulsar. diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/EntryFormatterTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/EntryFormatterTest.java new file mode 100644 index 0000000000..e6714ce89d --- /dev/null +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/EntryFormatterTest.java @@ -0,0 +1,62 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop; + +import static org.testng.Assert.assertEquals; + +import io.streamnative.pulsar.handlers.kop.format.EntryFormatter; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.stream.IntStream; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.TimestampType; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** + * Tests for EntryFormatter. + */ +public class EntryFormatterTest { + + @DataProvider(name = "compressionTypes") + Object[] allCompressionTypes() { + return Arrays.stream(CompressionType.values()).map(x -> (Object) x).toArray(); + } + + @Test(dataProvider = "compressionTypes") + public void testParseNumMessages(CompressionType compressionType) { + ByteBuffer buffer = ByteBuffer.allocate(1024); + int[] batchSizes = {3, 2, 5}; + + int baseOffset = 0; + for (int batchSize : batchSizes) { + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, + compressionType, TimestampType.LOG_APPEND_TIME, baseOffset); + for (int i = 0; i < batchSize; i++) { + builder.append(0L, "a".getBytes(), "1".getBytes()); + } + baseOffset += batchSize; + // Normally the offsets of batches are continuous, here we add an extra interval just for robustness. + baseOffset += 1; + builder.close(); + } + + buffer.flip(); + final MemoryRecords records = MemoryRecords.readableRecords(buffer); + assertEquals(EntryFormatter.parseNumMessages(records), IntStream.of(batchSizes).sum()); + } +} From 283f58a18a712fc24e8fde34f3171ec9e49562e7 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 22 Dec 2020 10:56:35 +0800 Subject: [PATCH 4/7] Fix test error --- .../java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java index 081856bee0..0c9ca18013 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaApisTest.java @@ -493,7 +493,7 @@ private void produceData(KafkaProducer producer, @Test(timeOut = 20000) public void testBrokerRespectsPartitionsOrderAndSizeLimits() throws Exception { String topicName = "kopBrokerRespectsPartitionsOrderAndSizeLimits"; - int numberTopics = 5; + int numberTopics = 8; int numberPartitions = 6; int messagesPerPartition = 9; From 0b6833ba6def1770880766b9cbab6a97d4130f3f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 22 Dec 2020 11:28:39 +0800 Subject: [PATCH 5/7] Fix checkstyle --- .../streamnative/pulsar/handlers/kop/format/EntryFormatter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java index 83cfe527fe..76b4afda7e 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatter.java @@ -15,7 +15,6 @@ import io.netty.buffer.ByteBuf; import java.util.List; -import java.util.Optional; import org.apache.bookkeeper.mledger.Entry; import org.apache.kafka.common.record.MemoryRecords; From bcac5b8d873f203f62353f2df1222da0b4b08c1e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 23 Dec 2020 16:40:56 +0800 Subject: [PATCH 6/7] Use enum to validata entry.format --- .../kop/format/EntryFormatterFactory.java | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java index 04ee614ad4..4dba4b7999 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EntryFormatterFactory.java @@ -20,11 +20,21 @@ */ public class EntryFormatterFactory { + enum EntryFormat { + PULSAR + } + public static EntryFormatter create(final String format) { - if (format.equalsIgnoreCase("pulsar")) { - return new PulsarEntryFormatter(); - } else { - throw new IllegalArgumentException("Unsupported entry.format: " + format); + try { + EntryFormat entryFormat = Enum.valueOf(EntryFormat.class, format.toUpperCase()); + switch (entryFormat) { + case PULSAR: + return new PulsarEntryFormatter(); + default: + throw new Exception("No EntryFormatter for " + entryFormat); + } + } catch (Exception e) { + throw new IllegalArgumentException("Unsupported entry.format '" + format + "': " + e.getMessage()); } } } From 803ccac407d13e5cfee2adea4b93b9d7ea5edf9a Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 23 Dec 2020 20:54:37 +0800 Subject: [PATCH 7/7] Add ByteBufUtils --- .../kop/format/PulsarEntryFormatter.java | 46 ++----------- .../handlers/kop/utils/ByteBufUtils.java | 65 +++++++++++++++++++ 2 files changed, 70 insertions(+), 41 deletions(-) create mode 100644 kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java index b8e60f6e49..7a5117841b 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java @@ -18,11 +18,10 @@ import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; +import io.streamnative.pulsar.handlers.kop.utils.ByteBufUtils; import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils; import java.io.IOException; import java.io.UncheckedIOException; -import java.nio.ByteBuffer; -import java.util.Base64; import java.util.List; import java.util.stream.IntStream; import java.util.stream.StreamSupport; @@ -183,8 +182,8 @@ public MemoryRecords decode(final List entries, final byte magic) { MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId(), i), msgMetadata.getEventTime() > 0 ? msgMetadata.getEventTime() : msgMetadata.getPublishTime(), - getKeyByteBuffer(singleMessageMetadata), - getNioBuffer(singleMessagePayload), + ByteBufUtils.getKeyByteBuffer(singleMessageMetadata), + ByteBufUtils.getNioBuffer(singleMessagePayload), headers); singleMessagePayload.release(); singleMessageMetadataBuilder.recycle(); @@ -199,8 +198,8 @@ public MemoryRecords decode(final List entries, final byte magic) { builder.appendWithOffset( MessageIdUtils.getOffset(entry.getLedgerId(), entry.getEntryId()), msgMetadata.getEventTime() > 0 ? msgMetadata.getEventTime() : msgMetadata.getPublishTime(), - getKeyByteBuffer(msgMetadata), - getNioBuffer(payload), + ByteBufUtils.getKeyByteBuffer(msgMetadata), + ByteBufUtils.getNioBuffer(payload), headers); } @@ -287,40 +286,5 @@ private Header[] getHeadersFromMetadata(List properties) { return headers; } - private static ByteBuffer getKeyByteBuffer(SingleMessageMetadata messageMetadata) { - if (messageMetadata.hasOrderingKey()) { - return messageMetadata.getOrderingKey().asReadOnlyByteBuffer(); - } - - String key = messageMetadata.getPartitionKey(); - if (messageMetadata.hasPartitionKeyB64Encoded()) { - return ByteBuffer.wrap(Base64.getDecoder().decode(key)); - } else { - // for Base64 not encoded string, convert to UTF_8 chars - return ByteBuffer.wrap(key.getBytes(UTF_8)); - } - } - - private static ByteBuffer getKeyByteBuffer(MessageMetadata messageMetadata) { - if (messageMetadata.hasOrderingKey()) { - return messageMetadata.getOrderingKey().asReadOnlyByteBuffer(); - } - String key = messageMetadata.getPartitionKey(); - if (messageMetadata.hasPartitionKeyB64Encoded()) { - return ByteBuffer.wrap(Base64.getDecoder().decode(key)); - } else { - // for Base64 not encoded string, convert to UTF_8 chars - return ByteBuffer.wrap(key.getBytes(UTF_8)); - } - } - - private static ByteBuffer getNioBuffer(ByteBuf buffer) { - if (buffer.isDirect()) { - return buffer.nioBuffer(); - } - final byte[] bytes = new byte[buffer.readableBytes()]; - buffer.getBytes(buffer.readerIndex(), bytes); - return ByteBuffer.wrap(bytes); - } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java new file mode 100644 index 0000000000..0450ba8963 --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java @@ -0,0 +1,65 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.utils; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import io.netty.buffer.ByteBuf; +import java.nio.ByteBuffer; +import java.util.Base64; +import org.apache.pulsar.common.api.proto.PulsarApi; + + +/** + * Utils for ByteBuf operations. + */ +public class ByteBufUtils { + + public static ByteBuffer getKeyByteBuffer(PulsarApi.SingleMessageMetadata messageMetadata) { + if (messageMetadata.hasOrderingKey()) { + return messageMetadata.getOrderingKey().asReadOnlyByteBuffer(); + } + + String key = messageMetadata.getPartitionKey(); + if (messageMetadata.hasPartitionKeyB64Encoded()) { + return ByteBuffer.wrap(Base64.getDecoder().decode(key)); + } else { + // for Base64 not encoded string, convert to UTF_8 chars + return ByteBuffer.wrap(key.getBytes(UTF_8)); + } + } + + public static ByteBuffer getKeyByteBuffer(PulsarApi.MessageMetadata messageMetadata) { + if (messageMetadata.hasOrderingKey()) { + return messageMetadata.getOrderingKey().asReadOnlyByteBuffer(); + } + + String key = messageMetadata.getPartitionKey(); + if (messageMetadata.hasPartitionKeyB64Encoded()) { + return ByteBuffer.wrap(Base64.getDecoder().decode(key)); + } else { + // for Base64 not encoded string, convert to UTF_8 chars + return ByteBuffer.wrap(key.getBytes(UTF_8)); + } + } + + public static ByteBuffer getNioBuffer(ByteBuf buffer) { + if (buffer.isDirect()) { + return buffer.nioBuffer(); + } + final byte[] bytes = new byte[buffer.readableBytes()]; + buffer.getBytes(buffer.readerIndex(), bytes); + return ByteBuffer.wrap(bytes); + } +}