diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/InternalServerCnx.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/InternalServerCnx.java index 195f256d84..152144fb2a 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/InternalServerCnx.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/InternalServerCnx.java @@ -15,6 +15,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.Objects; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.service.Producer; @@ -78,4 +79,24 @@ public void disableCnxAutoRead() { public void cancelPublishBufferLimiting() { // do nothing is this mock } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (o.hashCode() != this.hashCode()) { + return false; + } + ServerCnx other = (ServerCnx) o; + return Objects.equals(ctx().channel().id(), other.ctx().channel().id()); + } + + @Override + public int hashCode() { + return Objects.hash(ctx().channel().id()); + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java index 13e92518aa..5c817dc05d 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/PulsarEntryFormatter.java @@ -20,6 +20,7 @@ import io.streamnative.pulsar.handlers.kop.exceptions.KoPMessageMetadataNotFoundException; import io.streamnative.pulsar.handlers.kop.utils.ByteBufUtils; import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils; +import io.streamnative.pulsar.handlers.kop.utils.PulsarMessageBuilder; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -30,9 +31,7 @@ import org.apache.kafka.common.header.Header; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Record; -import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.MessageImpl; -import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.protocol.Commands; @@ -142,8 +141,8 @@ public DecodeResult decode(final List entries, final byte magic) { // convert kafka Record to Pulsar Message. // called when publish received Kafka Record into Pulsar. private static MessageImpl recordToEntry(Record record) { - @SuppressWarnings("unchecked") - TypedMessageBuilderImpl builder = new TypedMessageBuilderImpl(null, Schema.BYTES); + + PulsarMessageBuilder builder = PulsarMessageBuilder.newBuilder(); // key if (record.hasKey()) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/PulsarMessageBuilder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/PulsarMessageBuilder.java new file mode 100644 index 0000000000..540815dab8 --- /dev/null +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/PulsarMessageBuilder.java @@ -0,0 +1,116 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.kop.utils; + +import static com.google.common.base.Preconditions.checkArgument; + +import io.netty.util.concurrent.FastThreadLocal; +import java.nio.ByteBuffer; +import java.util.Base64; +import java.util.Map; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.common.api.proto.MessageMetadata; + +/** + * Manually build {@link MessageImpl}. + */ +public class PulsarMessageBuilder { + private static final ByteBuffer EMPTY_CONTENT = ByteBuffer.allocate(0); + private static final Schema SCHEMA = Schema.BYTES; + + private final transient MessageMetadata metadata; + private transient ByteBuffer content; + + private static final FastThreadLocal LOCAL_MESSAGE_METADATA = + new FastThreadLocal() { + @Override + protected MessageMetadata initialValue() { + return new MessageMetadata(); + } + }; + + private PulsarMessageBuilder() { + metadata = LOCAL_MESSAGE_METADATA.get(); + metadata.clear(); + this.content = EMPTY_CONTENT; + } + + public static PulsarMessageBuilder newBuilder() { + return new PulsarMessageBuilder(); + } + + public PulsarMessageBuilder keyBytes(byte[] key) { + metadata.setPartitionKey(Base64.getEncoder().encodeToString(key)); + metadata.setPartitionKeyB64Encoded(true); + return this; + } + + public PulsarMessageBuilder orderingKey(byte[] orderingKey) { + metadata.setOrderingKey(orderingKey); + return this; + } + + public PulsarMessageBuilder value(byte[] value) { + if (value == null) { + metadata.setNullValue(true); + return this; + } + this.content = ByteBuffer.wrap(SCHEMA.encode(value)); + return this; + } + + public PulsarMessageBuilder property(String name, String value) { + checkArgument(name != null, "Need Non-Null name"); + checkArgument(value != null, "Need Non-Null value for name: " + name); + metadata.addProperty() + .setKey(name) + .setValue(value); + return this; + } + + public PulsarMessageBuilder properties(Map properties) { + for (Map.Entry entry : properties.entrySet()) { + checkArgument(entry.getKey() != null, "Need Non-Null key"); + checkArgument(entry.getValue() != null, "Need Non-Null value for key: " + entry.getKey()); + metadata.addProperty() + .setKey(entry.getKey()) + .setValue(entry.getValue()); + } + + return this; + } + + public PulsarMessageBuilder eventTime(long timestamp) { + checkArgument(timestamp > 0, "Invalid timestamp : '%s'", timestamp); + metadata.setEventTime(timestamp); + return this; + } + + public MessageMetadata getMetadataBuilder() { + return metadata; + } + + public PulsarMessageBuilder sequenceId(long sequenceId) { + checkArgument(sequenceId >= 0); + metadata.setSequenceId(sequenceId); + return this; + } + + public Message getMessage() { + return MessageImpl.create(metadata, content, SCHEMA, null); + } + +} diff --git a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java index 5f51b175a4..4d6787cb6e 100644 --- a/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java +++ b/kafka-impl/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfigurationTest.java @@ -101,7 +101,7 @@ public void testConfigurationUtilsStream() throws Exception { printWriter.println("brokerDeleteInactiveTopicsEnabled=true"); printWriter.println("statusFilePath=/tmp/status.html"); printWriter.println("managedLedgerDefaultEnsembleSize=1"); - printWriter.println("backlogQuotaDefaultLimitGB=18"); + printWriter.println("backlogQuotaDefaultLimitBytes=18874368"); printWriter.println("clusterName=usc"); printWriter.println("brokerClientAuthenticationPlugin=test.xyz.client.auth.plugin"); printWriter.println("brokerClientAuthenticationParameters=role:my-role"); @@ -124,8 +124,8 @@ public void testConfigurationUtilsStream() throws Exception { assertNotNull(kafkaServiceConfig); assertEquals(kafkaServiceConfig.getZookeeperServers(), zkServer); - assertEquals(kafkaServiceConfig.isBrokerDeleteInactiveTopicsEnabled(), true); - assertEquals(kafkaServiceConfig.getBacklogQuotaDefaultLimitGB(), 18); + assertTrue(kafkaServiceConfig.isBrokerDeleteInactiveTopicsEnabled()); + assertEquals(kafkaServiceConfig.getBacklogQuotaDefaultLimitBytes(), 18874368); assertEquals(kafkaServiceConfig.getClusterName(), "usc"); assertEquals(kafkaServiceConfig.getBrokerClientAuthenticationParameters(), "role:my-role"); assertEquals(kafkaServiceConfig.getBrokerServicePort().get(), new Integer(7777)); diff --git a/pom.xml b/pom.xml index 17271a69b8..682705843a 100644 --- a/pom.xml +++ b/pom.xml @@ -46,7 +46,7 @@ 1.18.4 2.22.0 io.streamnative - 2.8.0.3 + 2.9.0-rc-202109241100 1.7.25 3.1.8 1.15.1