@eolivelli
Contributor

Motivation

Server-side filters can be very heavyweight, and currently they very often run in the same thread that handles writes and serves all the other subscriptions.

We want to improve performance in the presence of heavy server-side filters:

  • subscriptions should not impact each other
  • dispatching should not impact writes

Modifications

Add a new flag, dispatcherDispatchMessagesInSubscriptionThread, enabled by default, to dispatch messages to consumers in a Shared subscription on a thread separate from the BK executor thread.

With this patch we choose a thread based on the topic + subscription name and run the actual dispatching of messages (sendMessagesToConsumers) on that thread.
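To illustrate the idea of picking a thread from a key, here is a minimal sketch that uses only java.util.concurrent. KeyedExecutorSketch and its methods are hypothetical stand-ins and are not the Pulsar or BookKeeper classes; the key format is also an assumption. The point is only that hashing the same key always selects the same single-threaded executor, so work for one subscription stays on one thread.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an ordered executor; not the Pulsar or BookKeeper class.
final class KeyedExecutorSketch {
    private final ExecutorService[] threads;

    KeyedExecutorSketch(int numThreads) {
        threads = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            threads[i] = Executors.newSingleThreadExecutor();
        }
    }

    // The same key always maps to the same single-threaded executor, so tasks
    // submitted for one subscription run in submission order on one thread.
    void executeOrdered(String key, Runnable task) {
        threads[Math.floorMod(key.hashCode(), threads.length)].execute(task);
    }

    // Stops accepting tasks and waits for the queued ones to finish.
    void shutdownAndWait() {
        try {
            for (ExecutorService t : threads) {
                t.shutdown();
                t.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Submits a few dispatch tasks per subscription key and records which
    // thread ran each of them.
    static Map<String, Set<String>> demo() {
        KeyedExecutorSketch executor = new KeyedExecutorSketch(4);
        Map<String, Set<String>> threadsByKey = new ConcurrentHashMap<>();
        String[] keys = {"my-topic / sub-a", "my-topic / sub-b"}; // assumed key format
        for (int i = 0; i < 5; i++) {
            for (String key : keys) {
                executor.executeOrdered(key, () -> threadsByKey
                        .computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet())
                        .add(Thread.currentThread().getName()));
            }
        }
        executor.shutdownAndWait();
        return threadsByKey;
    }
}
```

Two different keys may still hash to the same thread, so this gives per-subscription ordering but not a dedicated thread per subscription.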

Verifying this change

I did some manual testing with a dummy filter that slows down consumption (Thread.sleep(100)), and even with a simple workload (50k msg/s) I have verified that:

  • with this patch the interference with other subscriptions is smaller (without this change, a second, non-filtered subscription ran at 10% speed)
  • the impact on writes is smaller

@eolivelli eolivelli self-assigned this Jul 14, 2022
@eolivelli eolivelli added the doc-not-needed label and removed the doc-label-missing label Jul 14, 2022
@eolivelli eolivelli added this to the 2.11.0 milestone Jul 14, 2022
@apache apache deleted a comment from github-actions bot Jul 14, 2022
@eolivelli eolivelli changed the title from "Shared subscription: run filters in a separate (per-subscription) thread (dispatcherDispatchMessagesInSubscriptionThread)" to "[enh] Broker - Shared subscription: run filters in a separate (per-subscription) thread (dispatcherDispatchMessagesInSubscriptionThread)" Jul 14, 2022
Member

@lhotari lhotari left a comment

LGTM

dispatcherMaxReadBatchSize=100

# Dispatch messages and execute broker side filters in a per-subscription thread
dispatcherDispatchMessagesInSubscriptionThread=true
Contributor

Should we keep it false by default? Most users don't have broker-side filters, and enabling it by default will introduce more thread switching.

Contributor Author

I am adding this flag only so that we can roll back in case of problems with particular workloads.

We are doing performance testing. I believe that this new behaviour is generally better, not only with server-side filters, but especially in these cases:

  • multiple subscriptions on the same topic
  • concurrent produce and consume

}

- protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+ protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
Contributor

It's better to have a check and only apply the lock if dispatcherDispatchMessagesInSubscriptionThread is enabled.

Contributor Author

sendMessagesToConsumers is usually called by readEntriesComplete, which is already synchronized:

public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {

The JVM is smart enough to recognize that the nested synchronized needs no extra work.
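For readers less familiar with Java monitors: intrinsic locks are reentrant, so a thread that already holds the monitor on an object can enter another synchronized method on the same object without blocking. A minimal sketch, with hypothetical class and method names, where outer() plays the role of readEntriesComplete and inner() the role of sendMessagesToConsumers:

```java
// Hypothetical class; it only demonstrates reentrant intrinsic locks.
final class ReentrantMonitorSketch {
    private int calls = 0;

    // Takes the monitor on `this`, then calls another synchronized method.
    synchronized int outer() {
        return inner();
    }

    // The calling thread already owns the monitor when it arrives from outer(),
    // so it re-enters here instead of blocking on itself.
    synchronized int inner() {
        if (!Thread.holdsLock(this)) {
            throw new IllegalStateException("monitor not held");
        }
        return ++calls;
    }
}
```

This sketch shows only that the nested call cannot deadlock. It does not measure the cost of re-entry, and when the method is invoked from a different thread (for example from an executor task) the synchronized keyword performs a real lock acquisition.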

Member

Alternatively, I think you could add a synchronized (this) block around the runnable's call to this method.

Contributor Author

There is no difference.
I don't think this is a problem.

@codelipenghui do you agree?

Comment on lines 528 to 530
topic.getBrokerService().getTopicOrderedExecutor()
        .executeOrdered(name,
                safeRun(() -> sendMessagesToConsumers(readType, entries)));
Contributor

We can have a pinned executor instead of selecting the executor for each operation.

Contributor Author

This is an interesting point.
Let me investigate.
I am not sure which thread pool we can use here as "pinned"; I would like to pick a thread from "TopicOrderedExecutor".

Contributor Author

@codelipenghui I have pinned the Subscription to a thread.
I think that this is what you suggested.
Thanks.
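A rough sketch of what "pinning" could look like, assuming the dispatcher resolves its executor once at construction time and reuses it for every dispatch. PinnedDispatcherSketch, its constructor arguments and the key string are hypothetical; the actual change in this PR may be structured differently.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical dispatcher; the field and method names are illustrative only.
final class PinnedDispatcherSketch {
    // Resolved once, when the dispatcher is created, instead of on every dispatch.
    private final ExecutorService dispatchExecutor;

    PinnedDispatcherSketch(ExecutorService[] pool, String subscriptionKey) {
        this.dispatchExecutor = pool[Math.floorMod(subscriptionKey.hashCode(), pool.length)];
    }

    void dispatch(Runnable work) {
        dispatchExecutor.execute(work); // no per-call key hashing or lookup
    }

    // Returns the number of distinct threads that served one dispatcher.
    static int demo() {
        ExecutorService[] pool = new ExecutorService[4];
        for (int i = 0; i < pool.length; i++) {
            pool[i] = Executors.newSingleThreadExecutor();
        }
        PinnedDispatcherSketch dispatcher = new PinnedDispatcherSketch(pool, "my-topic / my-sub");
        Set<String> seen = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < 10; i++) {
            dispatcher.dispatch(() -> seen.add(Thread.currentThread().getName()));
        }
        try {
            for (ExecutorService executor : pool) {
                executor.shutdown();
                executor.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }
}
```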

// sendMessagesToConsumers is responsible for running broker-side filters
// that may be quite expensive
topic.getBrokerService().getTopicOrderedExecutor()
.executeOrdered(name,
Contributor

  1. do we care about potential (unchecked) RejectedExecutionException here?
  2. we may need to prevent this code from submitting unlimited amount of runnables into that executor / handle long backlog.

Contributor Author

@eolivelli eolivelli Jul 15, 2022

> do we care about potential (unchecked) RejectedExecutionException here?

We usually don't deal with RejectedExecutionException in Dispatcher code.
In any case, even if the execution is rejected, the Dispatcher will read more entries for some other reason.

> we may need to prevent this code from submitting unlimited amount of runnables into that executor / handle long backlog.

This shouldn't be a problem because there is a "natural" backpressure mechanism here: we trigger this code when new entries are read from storage (BK/Offloaders), but we request new reads from storage only after dispatching them to consumers.
So usually only one of these tasks is pending per subscription.
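The backpressure argument can be sketched as a small read/dispatch loop. This is a simplified model under stated assumptions and not the dispatcher code: readMoreEntries and sendMessagesToConsumers here are hypothetical stand-ins that only reuse the names from the discussion, and each "read" delivers exactly one batch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model: one storage thread, one subscription thread, one batch per read.
final class ReadDispatchLoopSketch {
    private final ExecutorService storageThread = Executors.newSingleThreadExecutor();
    private final ExecutorService subscriptionThread = Executors.newSingleThreadExecutor();
    private final AtomicInteger pendingDispatches = new AtomicInteger();
    private final AtomicInteger maxPendingDispatches = new AtomicInteger();
    private final AtomicInteger batchesLeft;
    private final CountDownLatch finished = new CountDownLatch(1);

    ReadDispatchLoopSketch(int batches) {
        batchesLeft = new AtomicInteger(batches);
    }

    // Simulated read: when it completes, the batch is handed to the subscription thread.
    void readMoreEntries() {
        storageThread.execute(() -> {
            int pending = pendingDispatches.incrementAndGet();
            maxPendingDispatches.accumulateAndGet(pending, Math::max);
            subscriptionThread.execute(this::sendMessagesToConsumers);
        });
    }

    // Simulated dispatch: the next read is requested only after this batch is handled.
    void sendMessagesToConsumers() {
        pendingDispatches.decrementAndGet();
        if (batchesLeft.decrementAndGet() > 0) {
            readMoreEntries();
        } else {
            finished.countDown();
        }
    }

    // Runs the loop and returns the largest number of dispatch tasks queued at once.
    static int maxPendingFor(int batches) {
        ReadDispatchLoopSketch loop = new ReadDispatchLoopSketch(batches);
        loop.readMoreEntries();
        try {
            if (!loop.finished.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.storageThread.shutdown();
            loop.subscriptionThread.shutdown();
        }
        return loop.maxPendingDispatches.get();
    }
}
```

In this model the queue of dispatch tasks never grows beyond one, because a new read is only issued from inside the dispatch step. Code paths that trigger reads for other reasons are not modeled.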

@github-actions github-actions bot removed the doc-not-needed label Jul 15, 2022
@github-actions

@eolivelli Please provide a correct documentation label for your PR.
Instructions see Pulsar Documentation Label Guide.

@eolivelli eolivelli force-pushed the impl/shared-thread-per-subscription branch from 019493d to 7d80f8b Compare July 15, 2022 09:13
@eolivelli eolivelli requested review from codelipenghui and dlg99 July 15, 2022 09:22
@eolivelli eolivelli force-pushed the impl/shared-thread-per-subscription branch from 709c3ad to 5c1cf84 Compare July 20, 2022 07:42
…ersistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java

Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com>
@eolivelli
Contributor Author

/pulsarbot rerun-failure-checks

@codelipenghui codelipenghui modified the milestones: 2.11.0, 2.12.0 Jul 26, 2022
@codelipenghui codelipenghui merged commit abe5d15 into apache:master Jul 26, 2022
BewareMyPower added a commit to BewareMyPower/pulsar that referenced this pull request Jul 26, 2022
… (per-subscription) thread (dispatcherDispatchMessagesInSubscriptionThread) (apache#16603)"

Fixes apache#16802

This reverts commit abe5d15.
BewareMyPower added a commit to BewareMyPower/pulsar that referenced this pull request Jul 27, 2022
### Motivation

See apache#16802 and the discussion in
apache#16803. Before reverting apache#16603,
disable the `dispatcherDispatchMessagesInSubscriptionThread` option
first.

### Modifications

Change the default value of
`dispatcherDispatchMessagesInSubscriptionThread` to false.
@mattisonchao
Member

mattisonchao commented Aug 4, 2022

This enhancement may fix the following StackOverflowError:

2022-08-01T15:30:08,243+0000 [pulsar-41-1] WARN  org.apache.bookkeeper.mledger.impl.EntryCacheImpl - failed to read entries for 467924-467924:46
java.lang.StackOverflowError: null
	at jdk.internal.org.objectweb.asm.ClassWriter.newStringishItem(ClassWriter.java:1189) ~[?:?]
	at jdk.internal.org.objectweb.asm.MethodWriter.visitTypeInsn(MethodWriter.java:866) ~[?:?]
	at jdk.internal.org.objectweb.asm.MethodVisitor.visitTypeInsn(MethodVisitor.java:430) ~[?:?]
	at java.lang.invoke.TypeConvertingMethodAdapter.cast(TypeConvertingMethodAdapter.java:188) ~[?:?]
	at java.lang.invoke.TypeConvertingMethodAdapter.convertType(TypeConvertingMethodAdapter.java:241) ~[?:?]
	at java.lang.invoke.InnerClassLambdaMetafactory$ForwardingMethodGenerator.convertArgumentTypes(InnerClassLambdaMetafactory.java:499) ~[?:?]
	at java.lang.invoke.InnerClassLambdaMetafactory$ForwardingMethodGenerator.generate(InnerClassLambdaMetafactory.java:472) ~[?:?]
	at java.lang.invoke.InnerClassLambdaMetafactory.spinInnerClass(InnerClassLambdaMetafactory.java:294) ~[?:?]
	at java.lang.invoke.InnerClassLambdaMetafactory.buildCallSite(InnerClassLambdaMetafactory.java:195) ~[?:?]
	at java.lang.invoke.LambdaMetafactory.metafactory(LambdaMetafactory.java:329) ~[?:?]
	at java.lang.invoke.BootstrapMethodInvoker.invoke(BootstrapMethodInvoker.java:127) ~[?:?]
	at java.lang.invoke.CallSite.makeSite(CallSite.java:307) ~[?:?]
	at java.lang.invoke.MethodHandleNatives.linkCallSiteImpl(MethodHandleNatives.java:258) ~[?:?]
	at java.lang.invoke.MethodHandleNatives.linkCallSite(MethodHandleNatives.java:248) ~[?:?]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$12.readEntryFailed(ManagedCursorImpl.java:1398) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.asyncReadEntry(EntryCacheImpl.java:195) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntry(ManagedLedgerImpl.java:1982) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntry(ManagedLedgerImpl.java:1897) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.lambda$asyncReplayEntries$11(ManagedCursorImpl.java:1413) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) ~[?:?]
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) ~[?:?]
	at java.util.TreeMap$KeySpliterator.forEachRemaining(TreeMap.java:2739) ~[?:?]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) ~[?:?]
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) ~[?:?]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) ~[?:?]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1407) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.asyncReplayEntriesInOrder(PersistentDispatcherMultipleConsumers.java:405) ~[pulsar-pulsar-broker-2.10.1.1.jar:2.10.1.1]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:255) ~[pulsar-pulsar-broker-2.10.1.1.jar:2.10.1.1]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:636) ~[pulsar-pulsar-broker-2.10.1.1.jar:2.10.1.1]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:513) ~[pulsar-pulsar-broker-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$12.readEntryComplete(ManagedCursorImpl.java:1388) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.asyncReadEntry0(EntryCacheImpl.java:209) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.asyncReadEntry(EntryCacheImpl.java:188) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntry(ManagedLedgerImpl.java:1982) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntry(ManagedLedgerImpl.java:1897) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.lambda$asyncReplayEntries$11(ManagedCursorImpl.java:1413) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) ~[?:?]
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) ~[?:?]
	at java.util.TreeMap$KeySpliterator.forEachRemaining(TreeMap.java:2739) ~[?:?]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) ~[?:?]
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) ~[?:?]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) ~[?:?]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1407) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.asyncReplayEntriesInOrder(PersistentDispatcherMultipleConsumers.java:405) ~[pulsar-pulsar-broker-2.10.1.1.jar:2.10.1.1]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:255) ~[pulsar-pulsar-broker-2.10.1.1.jar:2.10.1.1]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:636) ~[pulsar-pulsar-broker-2.10.1.1.jar:2.10.1.1]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:513) ~[pulsar-pulsar-broker-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$12.readEntryComplete(ManagedCursorImpl.java:1388) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.asyncReadEntry0(EntryCacheImpl.java:209) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.asyncReadEntry(EntryCacheImpl.java:188) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntry(ManagedLedgerImpl.java:1982) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntry(ManagedLedgerImpl.java:1897) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.lambda$asyncReplayEntries$11(ManagedCursorImpl.java:1413) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) ~[?:?]
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) ~[?:?]
	at java.util.TreeMap$KeySpliterator.forEachRemaining(TreeMap.java:2739) ~[?:?]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) ~[?:?]
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) ~[?:?]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) ~[?:?]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1407) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.asyncReplayEntriesInOrder(PersistentDispatcherMultipleConsumers.java:405) ~[pulsar-pulsar-broker-2.10.1.1.jar:2.10.1.1]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:255) ~[pulsar-pulsar-broker-2.10.1.1.jar:2.10.1.1]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:636) ~[pulsar-pulsar-broker-2.10.1.1.jar:2.10.1.1]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:513) ~[pulsar-pulsar-broker-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$12.readEntryComplete(ManagedCursorImpl.java:1388) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.asyncReadEntry0(EntryCacheImpl.java:209) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.asyncReadEntry(EntryCacheImpl.java:188) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntry(ManagedLedgerImpl.java:1982) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntry(ManagedLedgerImpl.java:1897) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.lambda$asyncReplayEntries$11(ManagedCursorImpl.java:1413) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) ~[?:?]
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) ~[?:?]
	at java.util.TreeMap$KeySpliterator.forEachRemaining(TreeMap.java:2739) ~[?:?]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) ~[?:?]
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) ~[?:?]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) ~[?:?]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1407) ~[pulsar-managed-ledger-2.10.1.1.jar:2.10.1.1]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.asyncReplayEntriesInOrder(PersistentDispatcherMultipleConsumers.java:405) ~[pulsar-pulsar-broker-2.10.1.1.jar:2.10.1.1]
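The trace shows a cycle in which readEntriesComplete calls sendMessagesToConsumers, which calls readMoreEntries, whose read completes on the same stack and calls readEntriesComplete again. The following sketch, with hypothetical names and a deliberately simplified model, illustrates why handing the dispatch step to an executor could bound the stack depth: the current frame returns before the next step starts. It is an illustration of the general technique and not a reproduction of the broker code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified model of a read/dispatch cycle; all names are hypothetical.
final class CallbackRecursionSketch {
    private int remaining;
    private int depth;    // dispatch steps currently nested on the stack
    private int maxDepth; // deepest nesting observed

    CallbackRecursionSketch(int entries) {
        this.remaining = entries;
    }

    // Inline variant: the read completes synchronously, so the callback and
    // the next read run on top of the caller's stack frames.
    private void stepInline() {
        if (remaining-- <= 0) {
            return;
        }
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        stepInline();
        depth--;
    }

    // Executor variant: the next step is queued, so this frame returns first.
    private void stepOnExecutor(ExecutorService executor, CountDownLatch done) {
        if (remaining-- <= 0) {
            done.countDown();
            return;
        }
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        executor.execute(() -> stepOnExecutor(executor, done));
        depth--;
    }

    static int inlineMaxDepth(int entries) {
        CallbackRecursionSketch sketch = new CallbackRecursionSketch(entries);
        sketch.stepInline();
        return sketch.maxDepth;
    }

    static int executorMaxDepth(int entries) {
        CallbackRecursionSketch sketch = new CallbackRecursionSketch(entries);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        try {
            // Every step, including the first, runs on the single executor thread.
            executor.execute(() -> sketch.stepOnExecutor(executor, done));
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return sketch.maxDepth;
    }
}
```

In the inline variant the nesting depth grows with the number of entries, while in the executor variant it stays at one regardless of how many entries are processed.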

@mattisonchao
Member

Also, this PR has a bug; the fix is in #16812.

@mattisonchao
Member

Hi, @gaoran10
I don't think cherry-picking a feature with a new configuration into an older branch is a good idea.

@github-actions

github-actions bot commented Aug 8, 2022

@eolivelli Please provide a correct documentation label for your PR.
Instructions see Pulsar Documentation Label Guide.

@momo-jun
Contributor

@eolivelli just wanted to check in: do any docs need to be updated for this improvement?
