This repository was archived by the owner on Jan 24, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 142
Return NOT_LEADER_FOR_PARTITION when managed ledger's state is wrong #1054
Merged
BewareMyPower
merged 1 commit into
streamnative:master
from
BewareMyPower:bewaremypower/handle-publish-error-gracefully
Feb 9, 2022
Merged
Return NOT_LEADER_FOR_PARTITION when managed ledger's state is wrong #1054
BewareMyPower
merged 1 commit into
streamnative:master
from
BewareMyPower:bewaremypower/handle-publish-error-gracefully
Feb 9, 2022
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cf63b6e to
33f2b54
Compare
Demogorgon314
approved these changes
Feb 9, 2022
BewareMyPower
added a commit
that referenced
this pull request
Feb 9, 2022
…1054) Fixes #1050 ### Motivation Currently `ReplicaManager#getPartitionLog` transfers exceptions thrown from `PartitionLog#appendRecords` to Kafka's error code by calling `Errors.forException` and treats the unknown error as `KAFKA_STORAGE_ERROR`. However, the exception is thrown from broker side: - Control records are sent by `ManagedLedger#asyncAddEntry`, the `ManagedLedgerException` will be thrown if failed. - Normal records are sent by `PersistentTopic#publishMessages`, the `BrokerServiceException` that wraps `ManagedLedgerException` will be thrown if failed. After a bundle unload event, some entries might still be cached in the managed ledger. In this case, all entries will be failed with `ManagedLedgerAlreadyClosedException`. However, if Kafka producer received an error other than `InvalidMetadataException`, it won't update the metadata and send records to the same broker and failed again. See https://github.com/apache/kafka/blob/7c2d6724130f8251aa255917b74d9c3ef1fe67f7/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L636-L647 We should return the error code associated with `InvalidMetadataException` to notify Kafka producer it's time to find the new leader of the partition. ### Modifications - When the managed ledger's state is wrong, complete the future with `NotLeaderForPartitionException`, which implements `InvalidMetadataException`. - Add a test `testIllegalManagedLedger` to verify this error code is received when the managed ledger is closed. (cherry picked from commit 3d48a3c)
BewareMyPower
added a commit
that referenced
this pull request
Feb 9, 2022
…1054) Fixes #1050 ### Motivation Currently `ReplicaManager#getPartitionLog` transfers exceptions thrown from `PartitionLog#appendRecords` to Kafka's error code by calling `Errors.forException` and treats the unknown error as `KAFKA_STORAGE_ERROR`. However, the exception is thrown from broker side: - Control records are sent by `ManagedLedger#asyncAddEntry`, the `ManagedLedgerException` will be thrown if failed. - Normal records are sent by `PersistentTopic#publishMessages`, the `BrokerServiceException` that wraps `ManagedLedgerException` will be thrown if failed. After a bundle unload event, some entries might still be cached in the managed ledger. In this case, all entries will be failed with `ManagedLedgerAlreadyClosedException`. However, if Kafka producer received an error other than `InvalidMetadataException`, it won't update the metadata and send records to the same broker and failed again. See https://github.com/apache/kafka/blob/7c2d6724130f8251aa255917b74d9c3ef1fe67f7/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L636-L647 We should return the error code associated with `InvalidMetadataException` to notify Kafka producer it's time to find the new leader of the partition. ### Modifications - When the managed ledger's state is wrong, complete the future with `NotLeaderForPartitionException`, which implements `InvalidMetadataException`. - Add a test `testIllegalManagedLedger` to verify this error code is received when the managed ledger is closed.
BewareMyPower
added a commit
that referenced
this pull request
Feb 9, 2022
…1054) Fixes #1050 ### Motivation Currently `ReplicaManager#getPartitionLog` transfers exceptions thrown from `PartitionLog#appendRecords` to Kafka's error code by calling `Errors.forException` and treats the unknown error as `KAFKA_STORAGE_ERROR`. However, the exception is thrown from broker side: - Control records are sent by `ManagedLedger#asyncAddEntry`, the `ManagedLedgerException` will be thrown if failed. - Normal records are sent by `PersistentTopic#publishMessages`, the `BrokerServiceException` that wraps `ManagedLedgerException` will be thrown if failed. After a bundle unload event, some entries might still be cached in the managed ledger. In this case, all entries will be failed with `ManagedLedgerAlreadyClosedException`. However, if Kafka producer received an error other than `InvalidMetadataException`, it won't update the metadata and send records to the same broker and failed again. See https://github.com/apache/kafka/blob/7c2d6724130f8251aa255917b74d9c3ef1fe67f7/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L636-L647 We should return the error code associated with `InvalidMetadataException` to notify Kafka producer it's time to find the new leader of the partition. ### Modifications - When the managed ledger's state is wrong, complete the future with `NotLeaderForPartitionException`, which implements `InvalidMetadataException`. - Add a test `testIllegalManagedLedger` to verify this error code is received when the managed ledger is closed. (cherry picked from commit 3d48a3c)
eolivelli
pushed a commit
to eolivelli/kop
that referenced
this pull request
Feb 24, 2022
…treamnative#1054) Fixes streamnative#1050 ### Motivation Currently `ReplicaManager#getPartitionLog` transfers exceptions thrown from `PartitionLog#appendRecords` to Kafka's error code by calling `Errors.forException` and treats the unknown error as `KAFKA_STORAGE_ERROR`. However, the exception is thrown from broker side: - Control records are sent by `ManagedLedger#asyncAddEntry`, the `ManagedLedgerException` will be thrown if failed. - Normal records are sent by `PersistentTopic#publishMessages`, the `BrokerServiceException` that wraps `ManagedLedgerException` will be thrown if failed. After a bundle unload event, some entries might still be cached in the managed ledger. In this case, all entries will be failed with `ManagedLedgerAlreadyClosedException`. However, if Kafka producer received an error other than `InvalidMetadataException`, it won't update the metadata and send records to the same broker and failed again. See https://github.com/apache/kafka/blob/7c2d6724130f8251aa255917b74d9c3ef1fe67f7/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L636-L647 We should return the error code associated with `InvalidMetadataException` to notify Kafka producer it's time to find the new leader of the partition. ### Modifications - When the managed ledger's state is wrong, complete the future with `NotLeaderForPartitionException`, which implements `InvalidMetadataException`. - Add a test `testIllegalManagedLedger` to verify this error code is received when the managed ledger is closed. (cherry picked from commit 3d48a3c)
Sign up for free
to subscribe to this conversation on GitHub.
Already have an account?
Sign in.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This PR is an important fix, and it's first included in KoP 2.9.2.2, 2.8.2.7 (not released now) and 2.10.0.1 (not released now)
Fixes #1050
Motivation
Currently
ReplicaManager#getPartitionLogtransfers exceptions thrown fromPartitionLog#appendRecordsto Kafka's error code by callingErrors.forExceptionand treats the unknown error asKAFKA_STORAGE_ERROR. However, the exception is thrown from broker side:ManagedLedger#asyncAddEntry, theManagedLedgerExceptionwill be thrown if failed.PersistentTopic#publishMessages, theBrokerServiceExceptionthat wrapsManagedLedgerExceptionwill be thrown if failed.After a bundle unload event, some entries might still be cached in the managed ledger. In this case, all entries will be failed with
ManagedLedgerAlreadyClosedException. However, if Kafka producer received an error other thanInvalidMetadataException, it won't update the metadata and send records to the same broker and failed again. See https://github.com/apache/kafka/blob/7c2d6724130f8251aa255917b74d9c3ef1fe67f7/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L636-L647We should return the error code associated with
InvalidMetadataExceptionto notify Kafka producer it's time to find the new leader of the partition.Modifications
NotLeaderForPartitionException, which implementsInvalidMetadataException.testIllegalManagedLedgerto verify this error code is received when the managed ledger is closed.