Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.pulsar.client.api;

import java.util.BitSet;
import org.apache.pulsar.client.internal.DefaultImplementation;

/**
* The MessageId used for a consumer that subscribes multiple topics or partitioned topics.
Expand All @@ -45,84 +45,6 @@ static TopicMessageId create(String topic, MessageId messageId) {
if (messageId instanceof TopicMessageId) {
return (TopicMessageId) messageId;
}
return new Impl(topic, messageId);
}

/**
* The simplest implementation of a TopicMessageId interface.
*/
class Impl implements MessageIdAdv, TopicMessageId {
private final String topic;
private final MessageIdAdv messageId;

public Impl(String topic, MessageId messageId) {
this.topic = topic;
this.messageId = (MessageIdAdv) messageId;
}

@Override
public byte[] toByteArray() {
return messageId.toByteArray();
}

@Override
public String getOwnerTopic() {
return topic;
}

@Override
public long getLedgerId() {
return messageId.getLedgerId();
}

@Override
public long getEntryId() {
return messageId.getEntryId();
}

@Override
public int getPartitionIndex() {
return messageId.getPartitionIndex();
}

@Override
public int getBatchIndex() {
return messageId.getBatchIndex();
}

@Override
public int getBatchSize() {
return messageId.getBatchSize();
}

@Override
public BitSet getAckSet() {
return messageId.getAckSet();
}

@Override
public MessageIdAdv getFirstChunkMessageId() {
return messageId.getFirstChunkMessageId();
}

@Override
public int compareTo(MessageId o) {
return messageId.compareTo(o);
}

@Override
public boolean equals(Object obj) {
return messageId.equals(obj);
}

@Override
public int hashCode() {
return messageId.hashCode();
}

@Override
public String toString() {
return messageId.toString();
}
return DefaultImplementation.getDefaultImplementation().newTopicMessageId(topic, messageId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pulsar.client.api.MessagePayloadFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
Expand Down Expand Up @@ -252,4 +253,6 @@ static byte[] getBytes(ByteBuffer byteBuffer) {

SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type, long timestamp,
Map<String, String> propertiesValue);

TopicMessageId newTopicMessageId(String topic, MessageId messageId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2345,7 +2345,7 @@ public CompletableFuture<MessageId> getLastMessageIdAsync() {
@Override
public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
return getLastMessageIdAsync()
.thenApply(msgId -> Collections.singletonList(TopicMessageId.create(topic, msgId)));
.thenApply(msgId -> Collections.singletonList(new TopicMessageIdImpl(topic, (MessageIdAdv) msgId)));
}

public CompletableFuture<GetLastMessageIdResponse> internalGetLastMessageIdAsync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName)
throw new IOException(e);
}

MessageId messageId;
MessageIdAdv messageId;
if (idData.hasBatchIndex()) {
if (idData.hasBatchSize()) {
messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
Expand All @@ -143,7 +143,7 @@ public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName)
}
if (idData.getPartition() > -1 && topicName != null) {
messageId = new TopicMessageIdImpl(
topicName.getPartition(idData.getPartition()).toString(), topicName.toString(), messageId);
topicName.getPartition(idData.getPartition()).toString(), messageId);
}

return messageId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.impl;


import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
Expand All @@ -35,9 +34,11 @@
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessagePayloadFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
Expand Down Expand Up @@ -387,4 +388,19 @@ public SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type,
Map<String, String> propertiesValue) {
return new SchemaInfoImpl(name, schema, type, timestamp, propertiesValue);
}

@Override
public TopicMessageId newTopicMessageId(String topic, MessageId messageId) {
final MessageIdAdv messageIdAdv;
if (messageId instanceof MessageIdAdv) {
messageIdAdv = (MessageIdAdv) messageId;
} else {
try {
messageIdAdv = (MessageIdAdv) MessageId.fromByteArray(messageId.toByteArray());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return new TopicMessageIdImpl(topic, messageIdAdv);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,27 @@
*/
package org.apache.pulsar.client.impl;

import java.util.BitSet;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.TopicMessageId;

public class TopicMessageIdImpl extends TopicMessageId.Impl {
public class TopicMessageIdImpl implements MessageIdAdv, TopicMessageId {

private final String topicName;
private final String ownerTopic;
private final MessageIdAdv msgId;
private final String topicName; // it's never used

public TopicMessageIdImpl(String topic, MessageIdAdv msgId) {
this.ownerTopic = topic;
this.msgId = msgId;
this.topicName = "";
}
Comment thread
BewareMyPower marked this conversation as resolved.

@Deprecated
public TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) {
super(topicPartitionName, messageId);
this.msgId = (MessageIdAdv) messageId;
this.ownerTopic = topicPartitionName;
this.topicName = topicName;
}

Expand Down Expand Up @@ -55,11 +67,66 @@ public MessageId getInnerMessageId() {

@Override
public boolean equals(Object obj) {
return super.equals(obj);
return msgId.equals(obj);
}

@Override
public int hashCode() {
return super.hashCode();
return msgId.hashCode();
}

@Override
public int compareTo(MessageId o) {
return msgId.compareTo(o);
}

@Override
public byte[] toByteArray() {
return msgId.toByteArray();
}

@Override
public String getOwnerTopic() {
return ownerTopic;
}

@Override
public long getLedgerId() {
return msgId.getLedgerId();
}

@Override
public long getEntryId() {
return msgId.getEntryId();
}

@Override
public int getPartitionIndex() {
return msgId.getPartitionIndex();
}

@Override
public int getBatchIndex() {
return msgId.getBatchIndex();
}

@Override
public int getBatchSize() {
return msgId.getBatchSize();
}

@Override
public BitSet getAckSet() {
return msgId.getAckSet();
}

@Override
public MessageIdAdv getFirstChunkMessageId() {
return msgId.getFirstChunkMessageId();
}

@Override
public String toString() {
return msgId.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Optional;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.EncryptionContext;

Expand All @@ -42,7 +43,7 @@ public class TopicMessageImpl<T> implements Message<T> {
this.receivedByconsumer = receivedByConsumer;

this.msg = msg;
this.messageId = new TopicMessageIdImpl(topicPartitionName, topicPartitionName, msg.getMessageId());
this.messageId = new TopicMessageIdImpl(topicPartitionName, (MessageIdAdv) msg.getMessageId());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,12 @@ public void testMessageIdImplCompareToTopicMessageId() {
MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567);
TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl(
"test-topic-partition-0",
"test-topic",
new BatchMessageIdImpl(123L, 345L, 566, 789));
TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl(
"test-topic-partition-0",
"test-topic",
new BatchMessageIdImpl(123L, 345L, 567, 789));
TopicMessageIdImpl topicMessageId3 = new TopicMessageIdImpl(
"test-topic-partition-0",
"test-topic",
new BatchMessageIdImpl(messageIdImpl));
assertTrue(messageIdImpl.compareTo(topicMessageId1) > 0, "Expected to be greater than");
assertTrue(messageIdImpl.compareTo(topicMessageId2) < 0, "Expected to be less than");
Expand All @@ -173,11 +170,9 @@ public void testBatchMessageIdImplCompareToTopicMessageId() {
BatchMessageIdImpl messageIdImpl3 = new BatchMessageIdImpl(123L, 345L, 567, -1);
TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl(
"test-topic-partition-0",
"test-topic",
new MessageIdImpl(123L, 345L, 566));
TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl(
"test-topic-partition-0",
"test-topic",
new MessageIdImpl(123L, 345L, 567));
assertTrue(messageIdImpl1.compareTo(topicMessageId1) > 0, "Expected to be greater than");
assertTrue(messageIdImpl1.compareTo(topicMessageId2) > 0, "Expected to be greater than");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ public class TopicMessageIdImplTest {
public void hashCodeTest() {
MessageIdImpl msgId1 = new MessageIdImpl(0, 0, 0);
MessageIdImpl msgId2 = new BatchMessageIdImpl(1, 1, 1, 1);
TopicMessageIdImpl topicMsgId1 = new TopicMessageIdImpl("topic-partition-1", "topic", msgId1);
TopicMessageIdImpl topic2MsgId1 = new TopicMessageIdImpl("topic2-partition-1", "topic2", msgId1);
TopicMessageIdImpl topicMsgId2 = new TopicMessageIdImpl("topic-partition-2", "topic", msgId2);
TopicMessageIdImpl topicMsgId1 = new TopicMessageIdImpl("topic-partition-1", msgId1);
TopicMessageIdImpl topic2MsgId1 = new TopicMessageIdImpl("topic2-partition-1", msgId1);
TopicMessageIdImpl topicMsgId2 = new TopicMessageIdImpl("topic-partition-2", msgId2);

assertEquals(topicMsgId1.hashCode(), topicMsgId1.hashCode());
assertEquals(topic2MsgId1.hashCode(), topic2MsgId1.hashCode());
Expand All @@ -43,9 +43,9 @@ public void hashCodeTest() {
public void equalsTest() {
MessageIdImpl msgId1 = new MessageIdImpl(0, 0, 0);
MessageIdImpl msgId2 = new BatchMessageIdImpl(1, 1, 1, 1);
TopicMessageIdImpl topicMsgId1 = new TopicMessageIdImpl("topic-partition-1", "topic", msgId1);
TopicMessageIdImpl topic2MsgId1 = new TopicMessageIdImpl("topic2-partition-1", "topic2", msgId1);
TopicMessageIdImpl topicMsgId2 = new TopicMessageIdImpl("topic-partition-2", "topic", msgId2);
TopicMessageIdImpl topicMsgId1 = new TopicMessageIdImpl("topic-partition-1", msgId1);
TopicMessageIdImpl topic2MsgId1 = new TopicMessageIdImpl("topic2-partition-1", msgId1);
TopicMessageIdImpl topicMsgId2 = new TopicMessageIdImpl("topic-partition-2", msgId2);

assertEquals(topicMsgId1, topicMsgId1);
assertEquals(topicMsgId1, topic2MsgId1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Additional helper classes to the pulsar-client-api module.
*/
package org.apache.pulsar.client.api;
Original file line number Diff line number Diff line change
Expand Up @@ -1572,7 +1572,7 @@ public void testGetMessageSequenceRefForBatchMessage() throws Exception {
assertNull(ref);

ref = KafkaConnectSink.getMessageSequenceRefForBatchMessage(
new TopicMessageIdImpl("topic-0", "topic", new MessageIdImpl(ledgerId, entryId, 0))
new TopicMessageIdImpl("topic-0", new MessageIdImpl(ledgerId, entryId, 0))
);
assertNull(ref);

Expand All @@ -1584,7 +1584,7 @@ public void testGetMessageSequenceRefForBatchMessage() throws Exception {
assertEquals(ref.getBatchIdx(), batchIdx);

ref = KafkaConnectSink.getMessageSequenceRefForBatchMessage(
new TopicMessageIdImpl("topic-0", "topic", new BatchMessageIdImpl(ledgerId, entryId, 0, batchIdx))
new TopicMessageIdImpl("topic-0", new BatchMessageIdImpl(ledgerId, entryId, 0, batchIdx))
);

assertEquals(ref.getLedgerId(), ledgerId);
Expand Down
Loading