Conversation

@gaozhangmin (Contributor) commented Dec 9, 2021

Motivation

See #13170.
Also, protocol handlers like KoP should make changes to add and decrease pendingBytes in BrokerService.

Modifications

Add a LongAdder totalPendingBytes to BrokerService to record the total pending message size.
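
A rough sketch of the shape of this change, just to make it concrete. Only the LongAdder field is what the PR describes; the method names addPendingBytes and getTotalPendingBytes are illustrative (completedSendOperation is the existing hook mentioned later in this discussion):

```java
import java.util.concurrent.atomic.LongAdder;

public class BrokerService {
    // Total in-flight publish bytes, shared by the Pulsar protocol (ServerCnx)
    // and protocol handlers such as KoP.
    private final LongAdder totalPendingBytes = new LongAdder();

    // Called when a publish request is received (by ServerCnx or a protocol handler).
    public void addPendingBytes(long msgSize) {
        totalPendingBytes.add(msgSize);
    }

    // Called when the corresponding send operation completes.
    public void completedSendOperation(long msgSize) {
        totalPendingBytes.add(-msgSize);
    }

    public long getTotalPendingBytes() {
        return totalPendingBytes.sum();
    }
}
```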

Documentation

Check the box below and label this PR (if you have committer privilege).

Need to update docs?

  • doc-required

    (If you need help on updating docs, create a doc issue)

  • no-need-doc

    (Please explain why)

  • doc

    (If this PR contains doc changes)

github-actions bot commented Dec 9, 2021

@gaozhangmin: Thanks for your contribution. For this PR, do we need to update docs?
(The PR template contains info about docs, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

github-actions bot commented Dec 9, 2021

@gaozhangmin: Thanks for providing doc info!

github-actions bot added the doc-not-needed label (Your PR changes do not impact docs) on Dec 9, 2021
@wangjialing218 (Contributor)

There could be a case where both Pulsar and Kafka producers are sending messages:

  1. totalPendingBytes exceeds maxMessagePublishBufferSizeInMB, so the Pulsar broker and KoP both stop reading from their producers.
  2. Pulsar has completed all of its pending messages, but totalPendingBytes still exceeds resumeThresholdPendingBytes because KoP may hold a lot of pending buffer, so Pulsar will not resume reading from the Pulsar producer.
  3. KoP completes some pending messages and totalPendingBytes drops below resumeThresholdPendingBytes, so KoP resumes reading from the Kafka producer. But the Pulsar broker may still not resume reading from the Pulsar producer, because completedSendOperation will never be called again since all Pulsar messages have already completed.

Could you please consider how to avoid this?

@gaozhangmin (Contributor, Author)

Why didn't the previous design have this problem? I'm a little confused. @wangjialing218

@gaozhangmin (Contributor, Author)

I see the reason now; I will try to fix this case.

@wangjialing218 (Contributor)

In the previous design, totalPendingBytes was only used by the broker to record the message bytes received from Pulsar producers, so the broker could resume reading from producers once totalPendingBytes dropped below resumeThresholdPendingBytes.
This PR makes totalPendingBytes shared with KoP, so this problem can happen.

@wangjialing218 (Contributor)

We do not have to call LongAdder.sum() every time we add or decrease pending bytes, since it is a relatively heavy CPU operation.
We could instead run sum() in a scheduled task, e.g. every 50 ms, and then notify ServerCnx in the broker and the protocol handlers to stop or resume reading via a listener, as sketched below.
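
A minimal sketch of that idea under the stated assumptions (the class and method names PublishBufferLimiter, PublishBufferListener, and checkPublishBuffer are hypothetical, not the PR's actual API; the 50 ms interval is the one suggested above):

```java
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class PublishBufferLimiter {

    /** Implemented by ServerCnx in the broker and by protocol handlers such as KoP. */
    public interface PublishBufferListener {
        void onPublishBufferChanged(boolean exceeded);
    }

    private final LongAdder totalPendingBytes = new LongAdder();
    private final CopyOnWriteArrayList<PublishBufferListener> listeners = new CopyOnWriteArrayList<>();
    private final long maxPendingBytes;
    private final long resumeThresholdPendingBytes;
    private volatile boolean throttled = false;

    public PublishBufferLimiter(long maxPendingBytes, long resumeThresholdPendingBytes,
                                ScheduledExecutorService scheduler) {
        this.maxPendingBytes = maxPendingBytes;
        this.resumeThresholdPendingBytes = resumeThresholdPendingBytes;
        // Evaluate the aggregated counter periodically instead of on every add/decrease.
        scheduler.scheduleAtFixedRate(this::checkPublishBuffer, 50, 50, TimeUnit.MILLISECONDS);
    }

    // Hot path: producers only touch the cheap per-cell LongAdder.add().
    public void addPendingBytes(long size) {
        totalPendingBytes.add(size);
    }

    public void completedSendOperation(long size) {
        totalPendingBytes.add(-size);
    }

    public void addListener(PublishBufferListener listener) {
        listeners.add(listener);
    }

    private void checkPublishBuffer() {
        long pending = totalPendingBytes.sum(); // the relatively heavy call, at most once per tick
        if (!throttled && pending > maxPendingBytes) {
            throttled = true;
            listeners.forEach(l -> l.onPublishBufferChanged(true));  // ask connections to stop reading
        } else if (throttled && pending < resumeThresholdPendingBytes) {
            throttled = false;
            listeners.forEach(l -> l.onPublishBufferChanged(false)); // ask connections to resume reading
        }
    }
}
```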

gaozhangmin force-pushed the maxMessagePublishBufferSizeInMB-protocolsHandler branch 3 times, most recently from 37a6eb3 to e2a9284 on December 14, 2021 12:57
gaozhangmin force-pushed the maxMessagePublishBufferSizeInMB-protocolsHandler branch 2 times, most recently from 0dbecd8 to 6b32a39 on December 15, 2021 11:47

@Jason918 (Contributor) left a comment

It would be better to describe the downsides of this solution in more detail.

```java
private Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;
private Set<ManagedLedgerPayloadProcessor> brokerEntryPayloadProcessors;

private final CopyOnWriteArrayList<Consumer<PublishBufferEvent>>
```

I would prefer a better data structure than CopyOnWriteArrayList if we have a lot of producers.
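
Purely as an illustration of the reviewer's point (the class and types below are hypothetical, not from this PR): CopyOnWriteArrayList copies the whole backing array on every add or remove, so with many producers a concurrent map keyed by the owning connection keeps registration and removal cheap while iteration during the periodic check stays lock-free.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical alternative: key listeners by their owning connection so that
// register/unregister are O(1) instead of copying the whole array.
public class PublishBufferListeners<C> {
    private final Map<C, Consumer<Boolean>> listeners = new ConcurrentHashMap<>();

    public void register(C connection, Consumer<Boolean> listener) {
        listeners.put(connection, listener);
    }

    public void unregister(C connection) {
        listeners.remove(connection);
    }

    // Called from the periodic publish-buffer check; iteration is weakly consistent.
    public void notifyListeners(boolean exceeded) {
        listeners.values().forEach(l -> l.accept(exceeded));
    }
}
```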

gaozhangmin force-pushed the maxMessagePublishBufferSizeInMB-protocolsHandler branch from 6b32a39 to cacfda0 on December 16, 2021 08:06
@gaozhangmin (Contributor, Author)

/pulsarbot run-failure-checks

1 similar comment
@gaozhangmin (Contributor, Author)

/pulsarbot run-failure-checks

@hangc0276 (Contributor)

@gaozhangmin Thanks for your contribution.
Before Pulsar 2.8.0, the broker's publish throttling policy was similar to your implementation; its shortcoming was that the limit could suddenly be exceeded, leading to broker direct-memory OOM. Since Pulsar 2.8.0, we throttle per IO thread, which can limit the throughput of the Pulsar protocol but has no control over other protocols, like KoP.

IMO, it's better to throttle on the KoP side. Do you have any other ideas? @BewareMyPower

@BewareMyPower (Contributor)

@hangc0276 For now I don't have much context. AFAIK, this PR is an implementation of the idea from #12959. Could you answer hang's question? @wangjialing218

@wangjialing218 (Contributor) commented Dec 20, 2021

@hangc0276 You may have meant #7406, which introduced the per-IO-thread throttling policy.
This PR makes more effective use of memory and could also improve on the points mentioned in #7406, except this one:
-- There is a delay in detecting memory over-commit, due to the background task running periodically
Since idle connections will not call LongAdder.add, we only need to count the bytes from active connections in LongAdder.sum. We could run the detection more frequently to avoid OOM, e.g. every 10 ms.

IMO, it's better to throttle on the KoP side. Do you have any other ideas?

For this point, we could make sure the broker and KoP share the IO thread pool and apply the same per-IO-thread throttling policy on the KoP side. @BewareMyPower
I notice there is a configuration useSeparateThreadPoolForProtocolHandlers with default value true. I wonder if there is any disadvantage to setting it to false.

@BewareMyPower (Contributor)

I notice there is a configuration useSeparateThreadPoolForProtocolHandlers with default value true. I wonder if there is any disadvantage to setting it to false.

Setting it to false restores the previous behavior, in which potential deadlocks on the KoP side might also block the broker's IO threads.

codelipenghui modified the milestones: 2.10.0, 2.11.0 on Jan 21, 2022

Labels

area/broker, doc-not-needed (Your PR changes do not impact docs)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants