[FEATURE] Implement idempotence by reusing the Pulsar message deduplication #1007
Description
Motivation
Currently, KoP uses Kafka's implementation to check for duplicate messages, but that approach makes it hard to support MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION > 1.
Instead, we can reuse Pulsar's message deduplication check in KoP by mapping baseSequence to sequenceId and lastSequence to highestSequenceId.
Goal
Support MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION > 1 and reuse Pulsar message deduplication.
API Changes
PartitionLog
/**
 * Publish a non-control message. This method checks message deduplication.
 */
private CompletableFuture<Long> publishNormalMessage(final PersistentTopic persistentTopic,
                                                     final ByteBuf byteBuf,
                                                     final int numMessages,
                                                     final LogAppendInfo appendInfo) {
    // ...
}

/**
 * Publish a control message. This method does not check message deduplication,
 * because control messages don't have sequence numbers.
 */
private CompletableFuture<Long> publishControlMessage(final PersistentTopic persistentTopic,
                                                      final ByteBuf byteBuf,
                                                      final int numMessages) {
    // ...
}
MessagePublishContext
To tell whether a message is a duplicate, we check whether ledgerId and entryId are both -1. If so, we return Errors.DUPLICATE_SEQUENCE_NUMBER.
@Override
public void completed(Exception exception, long ledgerId, long entryId) {
    // ...
    if (exception != null) {
        offsetFuture.completeExceptionally(exception);
    } else {
        // duplicated message
        if (ledgerId == -1 && entryId == -1) {
            offsetFuture.completeExceptionally(Errors.DUPLICATE_SEQUENCE_NUMBER.exception());
            return;
        }
        // ...
        offsetFuture.complete(baseOffset);
    }
    recycle();
}
Implementation
To use MessageDeduplication as the backend for KoP's deduplication check, we need to pass it sequenceId and highestSequenceId. Kafka checks batch sequences with baseSequence and lastSequence. Within one batch the sequence numbers are consecutive, so both values can be computed before the message is published to storage.
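As a rough illustration of that mapping, the sketch below derives the two Pulsar-side values from a Kafka batch's baseSequence and message count. The class and method names are hypothetical and not part of KoP. It also assumes that Kafka's sequence wrap-around at Integer.MAX_VALUE can be ignored, which a real implementation would need to handle.

```java
/** Hypothetical helper for illustration only; not KoP's actual API. */
final class SequenceMapping {
    final long sequenceId;        // maps from Kafka's baseSequence
    final long highestSequenceId; // maps from Kafka's lastSequence

    private SequenceMapping(long sequenceId, long highestSequenceId) {
        this.sequenceId = sequenceId;
        this.highestSequenceId = highestSequenceId;
    }

    /**
     * Sequence numbers inside one batch are consecutive, so the last
     * sequence follows from the base sequence and the message count.
     * Assumption: wrap-around of Kafka's int sequence is not handled here.
     */
    static SequenceMapping fromKafkaBatch(int baseSequence, int numMessages) {
        if (baseSequence < 0 || numMessages <= 0) {
            throw new IllegalArgumentException("invalid batch: baseSequence="
                    + baseSequence + ", numMessages=" + numMessages);
        }
        // Use long arithmetic so the addition cannot overflow an int.
        long lastSequence = (long) baseSequence + numMessages - 1;
        return new SequenceMapping(baseSequence, lastSequence);
    }
}
```

For example, a batch with baseSequence 10 and 5 messages maps to sequenceId 10 and highestSequenceId 14.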
Pulsar persists sequence IDs per producer name, whereas in KoP we want to follow Kafka's behavior. So we need a naming rule that builds a producer name to pass to MessageDeduplication.
Kafka reuses the PID when the transaction ID is the same but increases the producer epoch, so the epoch must be part of the name to keep producer names distinct.
The producer naming rule is therefore ${PID_PREFIX}-${producerId}-${producerEpoch}.
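A minimal sketch of this naming rule follows. The class name and the prefix value are assumptions made for illustration; the proposal names PID_PREFIX but does not say what its value is.

```java
/** Hypothetical helper for illustration only; not KoP's actual API. */
final class ProducerNames {
    // Assumed placeholder value; the real prefix is not specified in the proposal.
    static final String PID_PREFIX = "KOP-PID-PREFIX";

    private ProducerNames() {
    }

    /** Builds ${PID_PREFIX}-${producerId}-${producerEpoch}. */
    static String of(long producerId, short producerEpoch) {
        return PID_PREFIX + "-" + producerId + "-" + producerEpoch;
    }
}
```

With this rule, the same producerId under epochs 0 and 1 yields two different names, so each epoch is tracked as a separate producer on the Pulsar side.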
Need to discuss
Kafka keeps the metadata of the last 5 batches per producer, as BatchMetadata entries in a mutable.Queue[BatchMetadata]. We could check the sequence using only the last sequence number, in which case there seems to be no reason to store 5 entries. This is what we should discuss.
Kafka side code link:
https://github.com/apache/kafka/blob/0b9a8bac36f16b5397e9ec3a0441758e4b60a384/core/src/main/scala/kafka/log/ProducerStateManager.scala#L90
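To make the discussion concrete, here is a simplified sketch of a check that keeps only the last sequence number per producer name. It is a standalone illustration and not Pulsar's MessageDeduplication code: the class name, the method name, and the in-memory map are all assumptions.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical last-sequence-only check; not Pulsar's or KoP's actual code. */
final class LastSequenceDedup {
    // Highest sequence ID accepted so far, keyed by producer name.
    private final Map<String, Long> highestSequencePushed = new HashMap<>();

    /**
     * Returns true if the batch should be stored, or false if it is a
     * duplicate, meaning its highest sequence ID was already accepted.
     */
    boolean shouldPublish(String producerName, long sequenceId, long highestSequenceId) {
        if (sequenceId > highestSequenceId) {
            throw new IllegalArgumentException("sequenceId must not exceed highestSequenceId");
        }
        Long last = highestSequencePushed.get(producerName);
        if (last != null && highestSequenceId <= last) {
            return false; // duplicate: already seen this range or a later one
        }
        highestSequencePushed.put(producerName, highestSequenceId);
        return true;
    }
}
```

One limitation worth weighing: a check like this can only report that a batch is a duplicate. As far as I understand, Kafka's retained BatchMetadata entries also let the broker look up the offset of a retried batch, so dropping them may change what is returned to the client.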
Compatibility
This change is compatible with previous versions.