Impl/port message metadata dispatcher prs #17698
Closed
eolivelli wants to merge 788 commits into apache:master from datastax:impl/port-message-metadata-dispatcher-prs
apache#16338) (cherry picked from commit 4dd4ba5) (cherry picked from commit 45fd4ab)
…utoSubscribePatternConsumer (apache#16375) (cherry picked from commit bd2c3e4) (cherry picked from commit d6ff922)
…t.setup (apache#16383) (cherry picked from commit ea45f2f) (cherry picked from commit f5beb2b)
…lDeleteSchema (apache#16381) (cherry picked from commit aabd5d0) (cherry picked from commit 83be76f)
…oviders to the client (apache#16334) * [improve][java-client] Support passing existing scheduled executor providers to the client ### Motivation apache#16236 introduced a new scheduled executor but does not support passing the existing scheduled executor providers like apache#12037. * Apply comment. (cherry picked from commit f159f62) (cherry picked from commit 4562d25)
…th more than one IO thread (apache#16336) (cherry picked from commit bdda1eb) (cherry picked from commit 1649ef4)
* Exclude the Netty Reactive Stream from asynchttpclient
---
*Motivation*
We upgraded the Netty Reactive Stream in PR apache#15990, but asynchttpclient still uses it. We should use our project dependency to address the CVE.
* Add the related dependency to the sub module
(cherry picked from commit f9e89ed) (cherry picked from commit b5479ee)
…apache#16386) (cherry picked from commit 1f470a0) (cherry picked from commit 9ee69c0)
…ng result (apache#16285)
Fixes apache#15976
### Motivation
Currently, even if a producer, consumer, or reader fails to be created, it is still added to the producers or consumers in `Client`. `Client::close` first closes the internal producers and consumers; if these include failed producers or consumers, `Client::close` returns `ResultAlreadyClosed`. Even worse, closing a failed partitioned producer might get stuck.
It also makes the Python test `test_listener_name_client` flaky, because `client.close()` throws an exception if the underlying `Client::close` call in the C++ client doesn't return `ResultOk`.
### Modifications
- Only add the created producer or consumer to the internal list of `Client` after the creation has succeeded.
- Add `ClientTest.testWrongListener` to verify that when a producer, consumer, or reader fails to be created, the internal producer list and consumer list are both empty and `client.close()` returns `ResultOk`.
(cherry picked from commit e23d312) (cherry picked from commit 88b1636)
)
### Motivation
`ManagedCursorImpl` maintains a field `waitingReadOp` as the cache of the `OpReadEntry` created in `asyncReadEntriesOrWait` when there are no more entries to read. However, there are two cases in which the created `OpReadEntry` is not recycled:
1. `asyncReadEntriesOrWait` is called repeatedly when `waitingReadOp` is not null and there are no more entries. The newly created `OpReadEntry` cannot pass the CAS check, but it is not recycled.
2. `cancelPendingReadRequest` is called. The `waitingReadOp` is just set to null and the previous reference is not recycled.
### Modifications
For the two cases described above, recycle the `OpReadEntry` objects.
### Verifying this change
Add `testOpReadEntryRecycle` to reproduce the corner cases and verify the count of `recycle()` calls.
(cherry picked from commit 6cec62e) (cherry picked from commit 7c37e56)
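The two recycle paths above can be sketched as follows. This is a minimal illustration, not the `ManagedCursorImpl` code: `ReadOp`, the `recycled` counter, and the method names `registerWaitingOp` and `cancelPendingRead` are hypothetical stand-ins, and an `AtomicReference` is assumed in place of whatever field updater the real class uses.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class WaitingReadOpSketch {
    // Hypothetical stand-in for a pooled OpReadEntry; it only counts recycle() calls.
    public static final class ReadOp {
        public static final AtomicInteger recycled = new AtomicInteger();

        public void recycle() {
            recycled.incrementAndGet();
        }
    }

    // Hypothetical stand-in for the cursor's waitingReadOp field.
    public static final AtomicReference<ReadOp> waitingReadOp = new AtomicReference<>();

    // Case 1: a second wait request loses the CAS, so its freshly created op
    // must be recycled instead of being dropped.
    public static boolean registerWaitingOp(ReadOp op) {
        if (waitingReadOp.compareAndSet(null, op)) {
            return true;
        }
        op.recycle();
        return false;
    }

    // Case 2: cancelling clears the field and recycles the op that was waiting.
    public static boolean cancelPendingRead() {
        ReadOp previous = waitingReadOp.getAndSet(null);
        if (previous == null) {
            return false;
        }
        previous.recycle();
        return true;
    }

    public static void main(String[] args) {
        registerWaitingOp(new ReadOp()); // wins the CAS
        registerWaitingOp(new ReadOp()); // loses the CAS and is recycled
        cancelPendingRead();             // recycles the waiting op
        System.out.println("recycle() calls: " + ReadOp.recycled.get());
    }
}
```

Counting `recycle()` calls, as the sketch does, mirrors the verification approach the commit message describes for `testOpReadEntryRecycle`.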
…ache#16436) (cherry picked from commit 52f7f13) (cherry picked from commit d60928f)
* fix No such ledger exception
* move currentLedgerEntries and currentLedgerSize
* move ledgers.put to operationComplete
* fix ledgers.put
* use ledgersTmp to write to ZooKeeper; after success, update ledgers
* update updateLedgersListAfterRollover
* put lh to ledgersTmp
* extend buildManagedLedgerInfo
* getManagedLedgerInfo(newLedger) after getting metadataMutex
(cherry picked from commit 2e5fbbc) (cherry picked from commit 41c0cf3)
…ache#16514) (cherry picked from commit 22ae18b)
…tive mode failures
* Support shrinkage in TripleLongPriorityQueue
* Add unit test
* Remove unused code
* style
* Address comments
(cherry picked from commit 88b10e0)
…to >1.5GB (apache#16490)
* Fixed error when delayed messages trackers state grows to >1.5GB
* Fixed spotbugs issues
* Fixed javadocs
* In the constructor, ensure all segments after the first one are of max size
* Use poll to figure out where the test is stuck
* Added SegmentedLongArray specific unit test
* Removed unused imports
(cherry picked from commit eca6c4a)
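One way to hold more `long` values than a single int-indexed array or buffer can address is to split the storage into fixed-size segments. The sketch below illustrates the constructor rule from the commit list ("all segments after the first one are of max size") using plain heap arrays. The class name `SegmentedLongArraySketch`, the `maxSegmentSize` parameter, and the use of `long[]` are assumptions for illustration and are not taken from the actual `SegmentedLongArray`.

```java
import java.util.ArrayList;
import java.util.List;

public class SegmentedLongArraySketch {
    private final int maxSegmentSize;              // hypothetical: segment size in number of longs
    private final List<long[]> segments = new ArrayList<>();
    private final long capacity;

    public SegmentedLongArraySketch(long initialCapacity, int maxSegmentSize) {
        if (initialCapacity <= 0 || maxSegmentSize <= 0) {
            throw new IllegalArgumentException("capacity and segment size must be positive");
        }
        this.maxSegmentSize = maxSegmentSize;

        // The first segment is smaller than the maximum only when it is the sole segment.
        long remaining = initialCapacity;
        int first = (int) Math.min(remaining, maxSegmentSize);
        segments.add(new long[first]);
        remaining -= first;

        // Every later segment is allocated at full size, so that
        // idx / maxSegmentSize always selects the right segment.
        while (remaining > 0) {
            segments.add(new long[maxSegmentSize]);
            remaining -= maxSegmentSize;
        }
        this.capacity = (long) first + (long) (segments.size() - 1) * maxSegmentSize;
    }

    public long capacity() {
        return capacity;
    }

    public int segmentCount() {
        return segments.size();
    }

    public void writeLong(long idx, long value) {
        segments.get((int) (idx / maxSegmentSize))[(int) (idx % maxSegmentSize)] = value;
    }

    public long readLong(long idx) {
        return segments.get((int) (idx / maxSegmentSize))[(int) (idx % maxSegmentSize)];
    }

    public static void main(String[] args) {
        SegmentedLongArraySketch array = new SegmentedLongArraySketch(10, 4);
        array.writeLong(9, 42L);
        System.out.println(array.segmentCount() + " segments, value at 9 = " + array.readLong(9));
    }
}
```

Because later segments are always full size, the requested capacity is rounded up to a whole number of segments whenever more than one segment is needed.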
apache#16533) (cherry picked from commit e6eec02)
…E-2022-2047 (apache#16520) * [fix][security] Upgrade to Jetty to 9.4.48.v20220622 to get rid of CVE-2022-2047 * suppress CVE-2022-2191 - false positive * Revert "suppress CVE-2022-2191 - false positive" This reverts commit ab4601f. (cherry picked from commit 6872ac3) (cherry picked from commit eb5e6af)
…re entries (apache#16400)
### Motivation
Related issue: streamnative/kop#1379
KoP uses a reader on a single partition of a compacted topic, and we observed a lot of logs like:
> Error reading entries at 928511:1 : We can only have a single waiting callback

It happened on a `ManagedCursorImpl` when `hasMoreEntries` returns false and `asyncReadEntriesOrWait` is called multiple times before `cancelPendingReadRequest` is called or new messages arrive.
### Modifications
Throw a `ConcurrentWaitCallbackException` instead of a raw `ManagedLedgerException` when there is already a waiting callback. Then check for this exception type and skip the following steps in `PersistentDispatcherSingleActiveConsumer#internalReadEntriesFailed`.
(cherry picked from commit 5ec4e3d)
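The pattern described above, a dedicated exception subtype that the failure handler checks for, can be sketched as follows. The exception classes here are simplified local stand-ins that reuse the names from the commit message, and `shouldHandleReadFailure` is a hypothetical helper, not the body of `internalReadEntriesFailed`.

```java
public class ConcurrentWaitSketch {
    // Simplified stand-in for the base exception type.
    public static class ManagedLedgerException extends Exception {
        public ManagedLedgerException(String message) {
            super(message);
        }
    }

    // Dedicated subtype, so callers can tell "a callback is already waiting"
    // apart from genuine read failures without parsing the message text.
    public static class ConcurrentWaitCallbackException extends ManagedLedgerException {
        public ConcurrentWaitCallbackException() {
            super("We can only have a single waiting callback");
        }
    }

    // Hypothetical helper: returns false when the failure only signals a
    // duplicate wait request, in which case the remaining failure handling
    // can be skipped.
    public static boolean shouldHandleReadFailure(ManagedLedgerException exception) {
        return !(exception instanceof ConcurrentWaitCallbackException);
    }

    public static void main(String[] args) {
        System.out.println(shouldHandleReadFailure(new ConcurrentWaitCallbackException()));
        System.out.println(shouldHandleReadFailure(new ManagedLedgerException("read failed")));
    }
}
```

Since the subtype still extends the base exception, existing code that catches `ManagedLedgerException` keeps working unchanged.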
…ead (dispatcherDispatchMessagesInSubscriptionThread)
…MessageTimeout (only release branches) (apache#16570)
### Motivation
`TransactionEndToEndTest#testSendTxnMessageTimeout` fails on release branches after apache#16519 has been cherry-picked.
```
java.lang.AssertionError: expected [true] but found [false]
at org.testng.Assert.fail(Assert.java:99)
at org.testng.Assert.failNotEquals(Assert.java:1037)
at org.testng.Assert.assertTrue(Assert.java:45)
at org.testng.Assert.assertTrue(Assert.java:55)
at org.apache.pulsar.client.impl.TransactionEndToEndTest.testSendTxnMessageTimeout(TransactionEndToEndTest.java:1099)
```
The reason is that the mock setup must be slightly different, since `ProducerImpl` is not exactly the same between master and branch-2.10.
(cherry picked from commit c83bede)
…che#16448) (cherry picked from commit ac41cfe)
…figurable (dispatcherFilterRescheduledMessageDelay) (apache#16602) (cherry picked from commit 807a283)
…pache#17045) (cherry picked from commit e0ff3d7) (cherry picked from commit eece41b)
…sgs (apache#17093) (cherry picked from commit 4b98b23) (cherry picked from commit 102735f)
…pic (apache#17180) (cherry picked from commit 1faf497) (cherry picked from commit 4c6417a)
(cherry picked from commit 45c37f3)
…ception: subscription already exists for topic (apache#17488) (cherry picked from commit 12c75bd)
…ytes schema (apache#17523) (cherry picked from commit 2ed5614) (cherry picked from commit d189846)
…sOpenCacheSetEnabled=true (apache#17465) (cherry picked from commit 09edcce) (cherry picked from commit a88ac65)
(cherry picked from commit 0787755)
…s` greater than `maxUnackedMessages` (apache#17483) (cherry picked from commit d7943a5) (cherry picked from commit 011e121)
apache#17562) (cherry picked from commit 5aa1f11) (cherry picked from commit c844c20)
…sConsumerImpl instead (apache#16969)
### Motivation
The code of the C++ client is still relatively old. After apache#1365, the Java client used `MultiTopicsConsumerImpl` instead of `PartitionedConsumerImpl`.
### Modifications
- Delete `PartitionedConsumerImpl` and use `MultiTopicsConsumerImpl` instead.
- For `MultiTopicsConsumerImpl`, support the seek message and topic partition listener features.
(cherry picked from commit 3a3ae23) (cherry picked from commit 23da64b)
Fixes apache#17392
### Motivation
All timers in `ProducerImpl` are `std::shared_ptr` objects that can be reset with `nullptr` in `ProducerImpl::cancelTimers`. This could lead to null pointer access in some cases. See apache#17392 (comment) for the analysis.
Generally it's not necessary to hold a nullable pointer to the timer. However, to resolve the cyclic reference issue, apache#5246 reset the shared pointer to reduce the reference count manually. It's not a good solution because we have to perform a null check for timers everywhere. The null check is still subject to race conditions such as:
Thread 1:
```c++
if (timer) {  // [1] timer is not nullptr
    timer->async_wait(/* ... */);  // [3] timer is null now, see [2] below
}
```
Thread 2:
```c++
timer.reset();  // [2]
```
The best solution is to capture a `weak_ptr` in the timer's callback and call `lock()` to check whether the referenced object is still valid.
### Modifications
- Change the type of `sendTimer_` and `batchTimer_` to `deadline_timer`, not a `shared_ptr`.
- Use `PeriodicTask` instead of the `deadline_timer` for token refresh.
- Migrate the `weak_from_this()` method from C++17 and capture `weak_from_this()` instead of `shared_from_this()` in callbacks.
### Verifying this change
Run `testResendViaSendCallback` many times; it no longer fails after this patch.
```bash
./tests/main --gtest_filter='BasicEndToEndTest.testResendViaSendCallback' --gtest_repeat=30
```
(cherry picked from commit 7d6f394) (cherry picked from commit 25b691b)
…ed (apache#17543) (cherry picked from commit 99b52eb) (cherry picked from commit 5cc6eeb)
…broker (apache#17658) (cherry picked from commit 55656b6)
…th a relative path (apache#17675) * [improve][cli] Pulsar shell: allow to create a new config (--file) with a relative path * win * checkstlye (cherry picked from commit d7c09be)
… instead (apache#15967)
apache#7266 introduced the `EntryWrapper` to store the `Entry` object and the associated `MessageMetadata` if it exists. However, the `getEntry` field of `EntryWrapper` is never used. There is no need to allocate memory for `EntryWrapper`, even if it's allocated from the recycler pool.
- Calculate the remaining messages without creating an `EntryWrapper` object; just iterate over the parsed message metadata list.
- Pass an optional `MessageMetadata` array to `filterEntriesForConsumer` and add the JavaDocs for these two parameters.

After that, remove the unused `EntryWrapper` and `updateEntryWrapperWithMetadata`.
This PR uses a functional programming style to make the code simpler and clearer.
(cherry picked from commit c33b12d)
…peekMessageMetadata` (apache#15983)
apache#15967 removed the `EntryWrapper`, which holds an `Entry` instance that is never used. However, after that refactoring, the `MessageMetadata` array is useless, see https://github.com/apache/pulsar/blob/298a573295f845e46f8a55cee366b6db63e997c2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L517-L519
Each `MessageMetadata` instance in the array is returned by `peekMessageMetadata`, whose returned value references a thread local object `Commands#LOCAL_MESSAGE_METADATA`. This means that if multiple entries were read, all `MessageMetadata` elements in the array reference the same object. However, an accidental misuse of `Optional#orElse` masks the problem. See https://github.com/apache/pulsar/blob/298a573295f845e46f8a55cee366b6db63e997c2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L133-L134
Each time `peekMessageMetadata` is called, the thread local message metadata is updated. Unlike with `orElseGet`, the argument of `orElse` is always evaluated, whether or not the optional is empty. This behavior increases the invocation count of `peekMessageMetadata` and makes the `metadataArray` cache redundant.
- Use `orElseGet` instead of `orElse` in `AbstractBaseDispatcher`.
- Add a new static method `Commands#peekAndCopyMessageMetadata` that returns a `MessageMetadata` instance allocated from heap memory.
- Call `peekAndCopyMessageMetadata` to cache all message metadata instances in `PersistentDispatcherMultipleConsumers`.

It's hard to add tests. As I've explained before, apache#15967 only degrades the performance and doesn't affect the correctness.
(cherry picked from commit 36690f5)
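The two effects described above can be reproduced in isolation. The sketch below uses hypothetical stand-ins (`peek`, `peekShared`, `peekAndCopy`, and a `StringBuilder` in place of `MessageMetadata`); it only demonstrates standard `java.util.Optional` and `ThreadLocal` behavior and is not the broker code.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class OrElseSketch {
    public static final AtomicInteger peekCalls = new AtomicInteger();

    // Hypothetical stand-in for a costly metadata parse.
    public static String peek() {
        peekCalls.incrementAndGet();
        return "parsed";
    }

    // orElse: the argument is evaluated before the call, so peek() always runs.
    public static String eager(Optional<String> cached) {
        return cached.orElse(peek());
    }

    // orElseGet: the supplier runs only when the optional is empty.
    public static String lazy(Optional<String> cached) {
        return cached.orElseGet(OrElseSketch::peek);
    }

    // Hypothetical stand-in for a thread local, reused metadata object.
    private static final ThreadLocal<StringBuilder> LOCAL =
            ThreadLocal.withInitial(StringBuilder::new);

    // Returns the shared thread local instance, overwritten on every call.
    public static StringBuilder peekShared(String payload) {
        StringBuilder shared = LOCAL.get();
        shared.setLength(0);
        shared.append(payload);
        return shared;
    }

    // Returns an independent copy that is safe to keep in an array.
    public static String peekAndCopy(String payload) {
        return peekShared(payload).toString();
    }

    public static void main(String[] args) {
        eager(Optional.of("cached"));
        lazy(Optional.of("cached"));
        System.out.println("peek() calls: " + peekCalls.get());

        StringBuilder first = peekShared("a");
        StringBuilder second = peekShared("b");
        System.out.println("same instance: " + (first == second) + ", first now reads: " + first);
    }
}
```

In the sketch, caching the results of `peekShared` would leave every slot pointing at the last payload, which is why a copying variant is needed before a cache of such values is meaningful.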
@eolivelli Please provide a correct documentation label for your PR.
#15967
#15983
Do not squash