PIP-229: Add a common interface to get fields of the MessageIdData #18950

@BewareMyPower

Motivation

MessageIdData is defined in PulsarApi.proto:

message MessageIdData {
    required uint64 ledgerId = 1;
    required uint64 entryId = 2;
    optional int32 partition = 3 [default = -1];
    optional int32 batch_index = 4 [default = -1];
    repeated int64 ack_set = 5;
    optional int32 batch_size = 6;
    // For the chunk message id, we need to specify the first chunk message id.
    optional MessageIdData first_chunk_message_id = 7;
}

However, there is no common interface for reading specific fields such as the ledger ID and entry ID. These details may not be very useful to application users, but they are important to developers of Pulsar and its ecosystem. Currently, the fields can only be accessed through the concrete implementations, so the code contains many unnecessary type checks like

if (messageId instanceof BatchMessageIdImpl) {
    BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
    this.ledgerId = batchMessageId.getLedgerId();
    this.entryId = batchMessageId.getEntryId();
    this.batchIndex = batchMessageId.getBatchIndex();
    this.partitionIndex = batchMessageId.partitionIndex;
} else if (messageId instanceof MessageIdImpl) {
    MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
    this.ledgerId = messageIdImpl.getLedgerId();
    this.entryId = messageIdImpl.getEntryId();
    this.partitionIndex = messageIdImpl.partitionIndex;
} else if (messageId instanceof TopicMessageIdImpl) {

For TopicMessageIdImpl, we first have to check that the MessageId is a TopicMessageIdImpl and then call the getInnerMessageId() method:

checkArgument(messageId instanceof TopicMessageIdImpl);

MessageId innerId = topicMessageId.getInnerMessageId();

Another problem is that when users want to create a MessageId for seek or acknowledge, they have to instantiate these implementations, which are defined in the impl package of the pulsar-client module. Any API change to these implementations could therefore be a breaking change. However, modifying the impl package should be allowed; otherwise, separating api from impl would be meaningless.

Goal

All of these problems are caused by the lack of an abstraction over MessageIdData: there is no interface that represents it. This proposal adds a common interface for accessing the fields of MessageIdData, so that every msgId instanceof XXXImpl check can be simplified to something like (MessageIdAdv) msgId.

API Changes

Introduce a new interface to represent a MessageIdData.

package org.apache.pulsar.client.api;

/**
 * The {@link MessageId} interface provided for advanced users.
 * <p>
 * It supports retrieving any field of {@link org.apache.pulsar.common.api.proto.MessageIdData}, which is generated
 * from `PulsarApi.proto`.
 */
public interface MessageIdAdv extends MessageId {

    long getLedgerId();

    long getEntryId();

    default int getPartition() {
        return -1;
    }

    default int getBatchIndex() {
        return -1;
    }

    default @Nullable BitSet getAckSet() {
        return null;
    }

    default int getBatchSize() {
        return 0;
    }

    default @Nullable MessageIdAdv getFirstChunkMessageId() {
        return null;
    }

    default boolean isBatch() {
        return getBatchIndex() >= 0 && getBatchSize() > 0;
    }
}

Since the target developers are Pulsar core developers, it's added to the pulsar-common module (PulsarApi.proto is also in this module), not the pulsar-client-api module.

To avoid a naming conflict with proto.MessageIdData, the interface name just adds the PulsarApi prefix to indicate that it represents the message ID defined in PulsarApi.proto.

Only getLedgerId and getEntryId are required because, when seeking to a specific position, the partition field is not needed. For example, if users want to create their own implementation for seek or acknowledge, they can write something like:

    @AllArgsConstructor
    private static class NonBatchedMessageId implements MessageIdAdv {
        // For non-batched message id in a single topic, only ledger id and entry id are required

        private final long ledgerId;
        private final long entryId;

        @Override
        public byte[] toByteArray() {
            return new byte[0]; // dummy implementation
        }

        @Override
        public long getLedgerId() {
            return ledgerId;
        }

        @Override
        public long getEntryId() {
            return entryId;
        }
    }
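For contrast, a batched counterpart might override the batch getters so that the default isBatch() returns true. The sketch below redeclares a trimmed copy of the proposed interface (without the MessageId parent and toByteArray) so that it compiles without Pulsar on the classpath. BatchedMessageId is a hypothetical name used only for illustration.

```java
// Trimmed stand-in for the proposed interface, so the sketch compiles on its own.
// The real interface extends MessageId and declares more getters.
interface MessageIdAdv {
    long getLedgerId();

    long getEntryId();

    default int getBatchIndex() {
        return -1;
    }

    default int getBatchSize() {
        return 0;
    }

    default boolean isBatch() {
        return getBatchIndex() >= 0 && getBatchSize() > 0;
    }
}

// Hypothetical batched implementation: besides the two required getters,
// it overrides only the batch-related defaults.
final class BatchedMessageId implements MessageIdAdv {
    private final long ledgerId;
    private final long entryId;
    private final int batchIndex;
    private final int batchSize;

    BatchedMessageId(long ledgerId, long entryId, int batchIndex, int batchSize) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.batchIndex = batchIndex;
        this.batchSize = batchSize;
    }

    @Override
    public long getLedgerId() {
        return ledgerId;
    }

    @Override
    public long getEntryId() {
        return entryId;
    }

    @Override
    public int getBatchIndex() {
        return batchIndex;
    }

    @Override
    public int getBatchSize() {
        return batchSize;
    }
}
```

With these definitions, new BatchedMessageId(1L, 2L, 0, 3).isBatch() evaluates to true, while an implementation that keeps the defaults reports false because getBatchIndex() is -1.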

Implementation

Most modifications replace msgId instanceof XXXImpl checks with a (MessageIdAdv) msgId cast. Some methods, such as TopicMessageIdImpl#getInnerMessageId, will be marked as deprecated. They might need to be retained for one or more major releases for compatibility.
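As a rough illustration of that replacement, the sketch below collapses the instanceof chain from the Motivation section into a single cast. MessageId and MessageIdAdv are redeclared here as simplified stand-ins so the example compiles on its own, and PositionSketch is a hypothetical class. It also assumes that every MessageId reaching this code implements MessageIdAdv; otherwise the cast throws a ClassCastException.

```java
// Simplified stand-ins for the real types (assumption: the real MessageId
// declares more methods, which are omitted here).
interface MessageId {
}

interface MessageIdAdv extends MessageId {
    long getLedgerId();

    long getEntryId();

    default int getPartition() {
        return -1;
    }

    default int getBatchIndex() {
        return -1;
    }
}

// Hypothetical holder that previously needed one branch per implementation class.
final class PositionSketch {
    final long ledgerId;
    final long entryId;
    final int partitionIndex;
    final int batchIndex;

    PositionSketch(MessageId messageId) {
        // One cast replaces the BatchMessageIdImpl / MessageIdImpl / TopicMessageIdImpl checks.
        MessageIdAdv adv = (MessageIdAdv) messageId;
        this.ledgerId = adv.getLedgerId();
        this.entryId = adv.getEntryId();
        this.partitionIndex = adv.getPartition();
        this.batchIndex = adv.getBatchIndex();
    }
}
```

Fields that an implementation does not carry fall back to the interface defaults, so a non-batched ID yields a batch index of -1 without a dedicated branch.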

One point to note is that we use a BitSet to represent the ack set, which is defined as a long array in PulsarApi.proto:

repeated int64 ack_set = 5;

We have to deprecate BatchMessageAcker, which is just a wrapper around a BitSet and the batch size. After that, we no longer need to acknowledge a single message of a batch like this:

if (msgId instanceof BatchMessageIdImpl) {
    ((BatchMessageIdImpl) msgId).getAcker().ackIndividual();
}

Use the getAckSet() API and clear the specific bits of the BitSet instead.
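A minimal sketch of that approach, using only java.util.BitSet. AckSetSketch and its methods are hypothetical and not part of Pulsar. The convention that a set bit means "not yet acknowledged" is an assumption inferred from the sentence above, where clearing a bit acknowledges a message.

```java
import java.util.BitSet;

// Hypothetical helper, not part of Pulsar: acknowledges individual messages
// of a batch by clearing bits in a BitSet.
final class AckSetSketch {

    private AckSetSketch() {
    }

    // Assumption: bit i set means message i of the batch is still unacknowledged.
    static BitSet newAckSet(int batchSize) {
        BitSet ackSet = new BitSet(batchSize);
        ackSet.set(0, batchSize);
        return ackSet;
    }

    // Clears the bit for batchIndex and returns true once the whole batch is acknowledged.
    static boolean ackIndividual(BitSet ackSet, int batchIndex) {
        ackSet.clear(batchIndex);
        return ackSet.isEmpty();
    }

    // Converts to the long-array shape of the proto field `repeated int64 ack_set`.
    static long[] toProtoAckSet(BitSet ackSet) {
        return ackSet.toLongArray();
    }
}
```

In real code the BitSet would come from getAckSet(), which may return null for non-batched message IDs, so callers would need a null check before clearing bits.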

Alternatives

Add the getters to the MessageId directly. This idea was rejected in the discussion here: https://lists.apache.org/thread/rdkqnkohbmkjjs61hvoqplhhngr0b0sd

Anything else?

No response
