diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java index a8c3a6b2ef284..0ae1a1fb5b5bf 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java @@ -25,6 +25,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.pulsar.client.impl.MessageIdImpl.convertToMessageIdImpl; /** The helper class for Pulsar's message id. */ @Internal @@ -42,11 +43,14 @@ private MessageIdUtils() { public static MessageId nextMessageId(MessageId messageId) { MessageIdImpl idImpl = unwrapMessageId(messageId); - if (idImpl.getEntryId() < 0) { - return newMessageId(idImpl.getLedgerId(), 0, idImpl.getPartitionIndex()); + long ledgerId = idImpl.getLedgerId(); + long entryId = idImpl.getEntryId(); + int partitionIndex = idImpl.getPartitionIndex(); + + if (entryId < 0) { + return new MessageIdImpl(ledgerId, 0, partitionIndex); } else { - return newMessageId( - idImpl.getLedgerId(), idImpl.getEntryId() + 1, idImpl.getPartitionIndex()); + return new MessageIdImpl(ledgerId, entryId + 1, partitionIndex); } } @@ -55,17 +59,15 @@ public static MessageId nextMessageId(MessageId messageId) { * message id. We don't support the batch message for its low performance now. */ public static MessageIdImpl unwrapMessageId(MessageId messageId) { - MessageIdImpl idImpl = MessageIdImpl.convertToMessageIdImpl(messageId); + MessageIdImpl idImpl = convertToMessageIdImpl(messageId); if (idImpl instanceof BatchMessageIdImpl) { int batchSize = ((BatchMessageIdImpl) idImpl).getBatchSize(); - checkArgument(batchSize == 1, "We only support normal message id currently."); + checkArgument( + batchSize <= 1, + "We only support normal message id currently. This batch size is %d", + batchSize); } return idImpl; } - - /** Hide the message id implementation. */ - public static MessageId newMessageId(long ledgerId, long entryId, int partitionIndex) { - return new MessageIdImpl(ledgerId, entryId, partitionIndex); - } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java index 1f26e834749fa..30ddb087c5270 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java @@ -23,7 +23,6 @@ import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.impl.MessageIdImpl; import java.util.Objects; @@ -51,11 +50,12 @@ public class MessageIdStartCursor implements StartCursor { * MessageId#latest}. */ public MessageIdStartCursor(MessageId messageId, boolean inclusive) { - MessageIdImpl idImpl = unwrapMessageId(messageId); - if (MessageId.earliest.equals(idImpl) || MessageId.latest.equals(idImpl) || inclusive) { - this.messageId = idImpl; + if (MessageId.earliest.equals(messageId) + || MessageId.latest.equals(messageId) + || inclusive) { + this.messageId = unwrapMessageId(messageId); } else { - this.messageId = nextMessageId(idImpl); + this.messageId = nextMessageId(messageId); } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java index f6ff913199fd1..2632a92d34247 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java @@ -22,7 +22,6 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.impl.MessageIdImpl; import java.util.Objects; @@ -43,13 +42,10 @@ public class MessageIdStopCursor implements StopCursor { private final boolean inclusive; public MessageIdStopCursor(MessageId messageId, boolean inclusive) { - MessageIdImpl idImpl = unwrapMessageId(messageId); - checkArgument(!earliest.equals(idImpl), "MessageId.earliest is not supported."); - checkArgument( - !latest.equals(idImpl), - "MessageId.latest is not supported, use LatestMessageStopCursor instead."); + checkArgument(!earliest.equals(messageId), "MessageId.earliest is not supported."); + checkArgument(!latest.equals(messageId), "Use LatestMessageStopCursor instead."); - this.messageId = idImpl; + this.messageId = unwrapMessageId(messageId); this.inclusive = inclusive; }