
Conversation

@eolivelli (Contributor) commented Oct 28, 2022

Motivation

The broker can run out of memory when many reads are enqueued on the PersistentDispatcherMultipleConsumers dispatchMessagesThread (which is used when dispatcherDispatchMessagesInSubscriptionThread is set to true, the default value). The limit on the amount of memory retained by reads MUST also take into account the entries coming from the cache.

When dispatcherDispatchMessagesInSubscriptionThread is false (the behaviour of Pulsar 2.10) there is a kind of natural (but still unpredictable!) back-pressure mechanism, because the thread that receives the entries from BookKeeper or from the cache dispatches them immediately and synchronously to the consumer and then releases them.

Modifications

  • Add a new component (InflightReadsLimiter) that keeps track of the overall amount of memory retained by inflight reads (a simplified sketch of the idea follows this list)
  • Add a new configuration entry managedLedgerMaxReadsInFlightSizeInMB
  • The feature is disabled by default
  • Add new metrics to track the status of the broker
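A minimal sketch of the idea behind the InflightReadsLimiter, for illustration only: a broker-wide byte budget that the read path acquires before issuing a read (from BookKeeper or from the cache) and releases once the entries have been written to the consumer channel. The class name, the blocking behaviour and the MB-to-bytes conversion below are assumptions made for the sketch, not the API introduced by this PR.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified sketch only: the real limiter may account for memory differently
// and does not necessarily block calling threads.
public class SimpleInflightReadsLimiter {
    private final long maxBytes;        // <= 0 means the limit is disabled
    private long availableBytes;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition released = lock.newCondition();

    public SimpleInflightReadsLimiter(long maxReadsInFlightSizeInMB) {
        this.maxBytes = maxReadsInFlightSizeInMB * 1024 * 1024;
        this.availableBytes = maxBytes;
    }

    // Reserve budget before issuing a read from BookKeeper or from the cache.
    public void acquire(long bytes) throws InterruptedException {
        if (maxBytes <= 0) {
            return;
        }
        lock.lock();
        try {
            while (availableBytes < bytes) {
                released.await();
            }
            availableBytes -= bytes;
        } finally {
            lock.unlock();
        }
    }

    // Give the budget back once the entries have been written to the consumer channel.
    public void release(long bytes) {
        if (maxBytes <= 0) {
            return;
        }
        lock.lock();
        try {
            availableBytes += bytes;
            released.signalAll();
        } finally {
            lock.unlock();
        }
    }
}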

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: eolivelli#19

Enrico Olivelli and others added 4 commits October 31, 2022 15:53
InflightReadsLimiter - limit the memory used by reads end-to-end (from storage/cache to the write to the consumer channel)

@eolivelli force-pushed the impl/max-inflight-requests branch from 82ab48f to e1bdef6 on October 31, 2022 at 14:53
@codelipenghui (Contributor) commented:

> Broker can go out of memory due to many reads enqueued on the PersistentDispatcherMultipleConsumers dispatchMessagesThread

@eolivelli Does this only happen when the broker has many subscriptions? For a single subscription, we always trigger the next read-entries operation after sending messages to consumers, so there is only one active read-entries operation at a time. Is it possible to reproduce?

@eolivelli (Contributor, Author) commented:

@codelipenghui you are correct: the problem arises when a broker has many subscriptions on the same topic (and many topics).
There is no broker-level guardrail at the moment.
With this patch the memory used to handle outbound traffic is capped, and the limit is independent of the number of active subscriptions on the broker.
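For reference, the cap is controlled by the new broker setting mentioned above; the value below is only an example (the feature stays disabled unless it is explicitly configured):

# broker.conf (example value only)
managedLedgerMaxReadsInFlightSizeInMB=100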

@codecov-commenter commented Nov 10, 2022

Codecov Report

Merging #18245 (9cc5757) into master (b31c5a6) will increase coverage by 0.98%.
The diff coverage is 45.65%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #18245      +/-   ##
============================================
+ Coverage     46.98%   47.97%   +0.98%     
+ Complexity    10343     9370     -973     
============================================
  Files           692      613      -79     
  Lines         67766    58415    -9351     
  Branches       7259     6087    -1172     
============================================
- Hits          31842    28026    -3816     
+ Misses        32344    27377    -4967     
+ Partials       3580     3012     -568     
Flag Coverage Δ
unittests 47.97% <45.65%> (+0.98%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
...lsar/broker/service/RedeliveryTrackerDisabled.java 50.00% <ø> (ø)
...va/org/apache/pulsar/broker/service/ServerCnx.java 49.20% <ø> (+0.51%) ⬆️
...ersistentStreamingDispatcherMultipleConsumers.java 0.00% <0.00%> (ø)
.../java/org/apache/pulsar/client/impl/ClientCnx.java 30.16% <ø> (ø)
...a/org/apache/pulsar/client/impl/TableViewImpl.java 0.00% <0.00%> (ø)
...ar/client/impl/conf/ProducerConfigurationData.java 84.70% <ø> (-0.18%) ⬇️
...va/org/apache/pulsar/client/impl/ConsumerImpl.java 15.09% <12.50%> (+0.05%) ⬆️
...sistent/PersistentDispatcherMultipleConsumers.java 58.49% <66.66%> (+1.67%) ⬆️
...ache/pulsar/broker/ManagedLedgerClientFactory.java 62.16% <100.00%> (+1.05%) ⬆️
.../pulsar/broker/service/AbstractBaseDispatcher.java 60.81% <100.00%> (+2.89%) ⬆️
... and 134 more

@nicoloboschi (Contributor) left a comment:

LGTM

@codelipenghui (Contributor) left a comment:

Great change.
I have some minor comments.

public void readEntriesComplete(List<Entry> entries, Object ctx) {
    if (!entries.isEmpty()) {
        // use the size of the first entry as the estimate for subsequent reads
        long size = entries.get(0).getLength();
        estimatedEntrySize = size;
Contributor:

Can we use the avgMessagesPerEntry from the consumer?
RangeEntryCacheImpl is shared across all the topics. If the estimate were calculated at the topic level, we should be able to get a more precise estimated entry size.

Contributor Author:

Unfortunately we are in the "managed-ledger" module here, and in order to get a value from the Dispatcher/Consumer I would have to change many internal APIs.

If the size of the entries in the topic is similar across all entries, then I think this is a good estimate.
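As a purely hypothetical illustration of the trade-off discussed here (this code is not part of the PR): a managed-ledger level estimator could refine the estimate from the entry sizes it has already observed, without reaching into the Dispatcher/Consumer APIs.

// Hypothetical sketch, not part of this change: refine the per-read size estimate
// from previously observed entries instead of only the first entry of the last read.
class ObservedEntrySizeEstimator {
    private static final double ALPHA = 0.2;      // weight given to the newest sample
    private long estimatedEntrySize = 1024;       // initial guess, in bytes
    // Updates are assumed to come from a single thread (for example the read callback).

    void recordObservedEntrySize(long observedBytes) {
        // exponentially weighted moving average of the observed entry sizes
        estimatedEntrySize = (long) (ALPHA * observedBytes + (1 - ALPHA) * estimatedEntrySize);
    }

    long estimateReadSize(int numberOfEntries) {
        return estimatedEntrySize * numberOfEntries;
    }
}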

@codelipenghui (Contributor) commented:

/pulsarbot run-failure-checks

@eolivelli force-pushed the impl/max-inflight-requests branch from a843dff to 8efc1a1 on November 11, 2022 at 07:45
private long entryId;
ByteBuf data;

private Runnable onDeallocate;
Contributor:

This is not volatile, but it seems that deallocate() could be called from other threads?

Contributor Author:

This is the pattern we use for every recyclable object; the same comment would apply to all the other fields.
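For context, the recyclable-object pattern referred to here typically looks like the sketch below. It is illustrative only (based on Netty's Recycler, not the actual EntryImpl code): mutable fields such as onDeallocate are plain, non-volatile fields that are reset before the instance goes back to the pool, and visibility is expected to come from how the pooled object itself is handed between threads.

import io.netty.util.Recycler;

// Illustrative sketch of the recyclable-object pattern, not the Pulsar EntryImpl code.
final class RecyclableHolder {
    private static final Recycler<RecyclableHolder> RECYCLER = new Recycler<RecyclableHolder>() {
        @Override
        protected RecyclableHolder newObject(Handle<RecyclableHolder> handle) {
            return new RecyclableHolder(handle);
        }
    };

    private final Recycler.Handle<RecyclableHolder> handle;
    private Runnable onDeallocate;   // plain field, like the one flagged above

    private RecyclableHolder(Recycler.Handle<RecyclableHolder> handle) {
        this.handle = handle;
    }

    static RecyclableHolder create(Runnable onDeallocate) {
        RecyclableHolder holder = RECYCLER.get();
        holder.onDeallocate = onDeallocate;
        return holder;
    }

    void release() {
        if (onDeallocate != null) {
            onDeallocate.run();
        }
        onDeallocate = null;         // reset state before recycling
        handle.recycle(this);
    }
}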

@Jason918 merged commit 6fec66b into apache:master on Nov 11, 2022
@Technoboy- modified the milestones: 2.12.0, 2.11.0 on Nov 14, 2022
Technoboy- pushed a commit that referenced this pull request Nov 14, 2022
InflightReadsLimiter - limit the memory used by reads end-to-end (from storage/cache to the write to the consumer channel) (#18245)

* InflightReadsLimiter - limit the memory used by reads end-to-end (from storage/cache to the write to the consumer channel)

* Change error message

* checkstyle

* Fix license

* remove duplicate method after cherry-pick

* Rename onDeallocate
liangyepianzhou pushed a commit that referenced this pull request Sep 15, 2023
InflightReadsLimiter - limit the memory used by reads end-to-end (from storage/cache to the write to the consumer channel) (#18245)

(cherry picked from commit 6fec66b)
liangyepianzhou added a commit to streamnative/pulsar-archived that referenced this pull request Sep 28, 2023
* InflightReadsLimiter - limit the memory used by reads end-to-end (from storage/cache to the write to the consumer channel) (apache#18245)

(cherry picked from commit 6fec66b)
(cherry picked from commit 47c98e5)

* [fix][broker][branch-2.10] limit the memory used by reads end-to-end

(cherry picked from commit eeb80e1)

* remove gpg plugin

* remove release profile

* remove release plugin

* Revert "remove release plugin"

This reverts commit 20522ea.

* Revert "remove release profile"

This reverts commit 64627fd.

* Revert "remove gpg plugin"

This reverts commit 8054d59.

---------

Co-authored-by: Enrico Olivelli <eolivelli@apache.org>
mattisonchao pushed a commit to streamnative/pulsar-archived that referenced this pull request Dec 28, 2023
* InflightReadsLimiter - limit the memory used by reads end-to-end (from storage/cache to the write to the consumer channel) (apache#18245)

narmathaansp pushed a commit to sajithsebastian/pulsar that referenced this pull request Feb 14, 2024
…che#5920)

* InflightReadsLimiter - limit the memory used by reads end-to-end (from storage/cache to the write to the consumer channel) (apache#18245)

coderzc pushed a commit to coderzc/pulsar that referenced this pull request Aug 14, 2024
…che#5920)

* InflightReadsLimiter - limit the memory used by reads end-to-end (from storage/cache to the write to the consumer channel) (apache#18245)

nodece pushed a commit to nodece/pulsar that referenced this pull request Sep 10, 2024
InflightReadsLimiter - limit the memory used by reads end-to-end (from storage/cache to the write to the consumer channel) (apache#18245)

(cherry picked from commit 6fec66b)
Signed-off-by: Zixuan Liu <nodeces@gmail.com>