Skip to content

Conversation

@andrekramer1
Copy link

Fixes #3302 (a request for consumer filtering in the Pulsar broker)

Motivation

At Software AG, we have prototyped property (key / value) based message filtering on consumer subscriptions in the Pulsar broker. This feature was also requested as Issue 3302 but the mitigation suggestion of using Pulsar Functions, which imply management and runtime overheads and an extra topic, or the alternative of publishing to multiple (sub)topics are not always adequate mitigations to lack of simple consumer subscription filtering.

In Pulsar, consumers receive all messages published on a topic, even if only interested in messages with certain properties. Always delivering all messages and having the consumer client application filter means all messages must be sent over the network, processed and acknowledged remotely. Other message brokers and messaging standards (such as Java's JMS message selectors) typically allow a filter to be specified on client subscription, allowing the message broker to filter on behalf of the client subscriber and deliver a subset of the messages over the wire.
In some cases, a separate topic can be used for each subscription filter, but if there are a very large number of possible filters then capacity as well as the management and processing overheads of topics (or any Pulsar Functions) are an issue. It therefore seems sensible to extend consumer subscriptions to perform an additional level of selection that is not directly driven by the publisher and does not require additional Functions and/or topics to be created for consumers.

Our proposal is to allow the publisher to "tag" messages with one or more additional message properties that can be used to selectively filter by. The consumer (on subscription) should be able to specify which tags to filter on, specified by special consumer “meta data” (using the meta data key value pairs in consumer properties).

The implementation is itself configurable, allowing both for no filtering and for alternative property-based schemes to be implemented and configured in Pulsar brokers, using a new configuration time "ConsumerFilter" dynamic factory mechanism. Our default "tag" based scheme, is explicit about which properties are used for filtering so as to more safely use existing message properties and consumer meta data. Two types of meta data tags are provided to allow both simple AND (all match) as well as OR (any match) filter expressions on message properties.

Our implemented design enables broker side "tag" based filtering of messages on selected message properties by allowing consumers to specify a subset of message properties as being “tags” to filter on, using existing consumer (meta data) property API (and underlying protocol). Message properties which have a key beginning with the string "tag" are matched by value against the consumer subscription properties that are specified on subscription with meta data string keys starting with either "anytag" or "alltag". What follows “tag”, “anytag”, “alltag” textually is immaterial except to allow multiple key / values to be enumerated (e.g. “tag1”, “tag2” etc).

An "anytag" matches successfully if any of the messages "tag" properties has the same property value as the consumer property value. This allows "or" matching.

e.g. Say a consumer subscribes with an "anytag" property with "anytag1" of "urgent" and another with key "anytag2" and value "compressed". Then a message with property "tag0" as key and "urgent" as value will match as would a message with "tagA" / "compressed".

To subscribe the consumer:
Consumer consumer = client.newConsumer(Schema.STRING)
.topic("sometopic")
.property("anytag0", "urgent") // A filtering meta data tag.
.property("anytag1", "compressed") // Another filtering meta data tag.
.subscribe();
To send a message with a tag that matches:
Producer producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();
producer.newMessage()
.property("tag0", "urgent") // tag with a matching property value.
.value("my-matching-message")
.sendAsync();

An "alltag" will only match if all other alltags in the consumer subscription meta data properties also can be matched. This allows "and" matching of multiple values.
e.g. Say a consumer subscribes with an "alltag" property with "alltag1" of "urgent" and another with key "alltag2" and value "compressed". Then a message with property "tag0" as key and "urgent" and property "tag1" of "compressed" as value will match while a message with "tag0" of "urgent" and "tag1" of "uncompressed" property values will not.

Consumer consumer = client.newConsumer(Schema.STRING)
.topic("sometopic")
.property("alltag0", "urgent") // A filtering meta tag.
.property("alltag1", "compressed") // Additional filtering meta tag.
.subscribe();
will match on the following message:
producer.newMessage()
.property("tag0", "urgent") // A tag property value.
.property("tag1", "compressed") // A second tag property value.
.value("my-matching-message")
.sendAsync();
But the following will not match:
producer.newMessage()
.property("tag0", "urgent") // A tag property value.
.property("tag1", "uncompressed") // A second tag property value.
.value("my-not-matching-message")
.sendAsync();
producer.newMessage()
.property("tag0", "urgent") // A tag property value.
.value("my-other-not-matching-message")
.sendAsync();

If the consumer subscribes with both anytags and alltags then at least one anytag and all of the alltags must match tags in the message (with overlap allowed) for the message to be passed by the filter.
The default implementation matches only on properties that have keys beginning with "tag" to help avoid unintended matching. A new Java ConsumerFilter interface and factory allows other property-based filtering schemes to be implemented and deployed.

Apart from message tag properties starting with (lowercase) "tag" and consumer tag subscription meta data properties starting with "anytag" and "alltag" there are no other requirements or conventions on tag naming and usage. For example, tags could use more meaningful keys such as "tag_priority" and namespace tag values such as values "prority_urgent" and "priority_low".
Another possibility is to use values such as "priority=urgent" and match by text on such categories or classes (with keys just enumerating "tag0","tag1" etc as in examples above).
These suggestions are a convention only and not checked or enforced in any way. In the examples above we have used "0" and "1" but other unique strings (such as "A","B" etc) could have been used.

Modifications

A new interface "ConsumerFilter" (org/apache/pulsar/broker/service/ConsumerFilter.java) with a factory (ConsumerFilterFactory.java) configured via settings in org/apache/pulsar/broker/ServiceConfiguration.java allows consumer filtering to be configured was added. Our suggested default implementation of this interface (behaviour described in detail above) is defined by the "ConsumerTagFilter" class.

In order to minimise code changes we've limited our modifications mainly to two existing classes. The consumer abstract base class org.apache.pulsar.broker.AbstractBaseDispacher is modified to use the above interface to filter both batches and single messages (conditionally only if consumer filtering is enabled) in method filterEntriesForConsumer(), keeping this method's code as similar as possible to the current implementation.

For filtering entries in a batch, we duplicated an existing method in org.apache.pulsar.client.impl.RawBatchConverted and adapted it for our needs. The new static "filter" method in this class was kept as similar as possible to the "rebatchMessage" it was based on. It's expected that on average most messages will be both batched and filtered out (on average if a filter is in effect) and we've therefore not attempted to optimise other code paths. Non-filtered usage should not be impacted except for a simple check.

A handful of other code files changes in the persistent and nonpersistent sub modules of
org.apache.pulsar.broker.service are just single point modifications to pass the consumer object through so that the consumer meta data properties are available for use in filtering.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

The org.apache.pulsar.broker.service.ConsumerFilterTest unit test tests the ConsumerFilter implementations.
Both "anytag" and "alltags" combinations are tested by simple combinations of these meta data properties.
A more comprehensive ProducerConsumerBase sub-class test
(org.apache.pulsar.client.api.ConsumerFilteringTest) tests sending and receiving messages with variations on the percentage of messages with tag properties that should match the subscription filter. All and no matches as well as 10% and 1% expected matches are tested.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
    No

  • The public API: (yes / no)
    Yes,
    Change to semantics of key/value meta data supplied by Consumer. API signature unchanged.

  • The schema: (yes / no / don't know)
    No

  • The default values of configurations: (yes / no)
    Yes, if enabled by default. Code switchable.

  • The wire protocol: (yes / no)
    No

  • The rest endpoints: (yes / no)
    No

  • The admin cli options: (yes / no)
    No

  • Anything that affects deployment: (yes / no / don't know)
    No

Documentation

Please see README_CONSUMER_FILTER in PR.

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@joefk
Copy link
Contributor

joefk commented Nov 12, 2020

What is the performance impact on broker througput with this feature? Has any tests been run for measuring this?
What additional GC load is generated by this feature?

@andrekramer1
Copy link
Author

andrekramer1 commented Nov 13, 2020 via email

@rdhabalia
Copy link
Contributor

@andrekramer1 can you test and share broker CPU performance graph with this change. may be 5K topics with each topic having 2-3 subscriptions and each consumer is having 3-5 filters. and it would be great if you can also collect other metrics such as : gc, e2e latency for other topics which don't have these filters (to validate noisy neighbor scenario).

@codelipenghui
Copy link
Contributor

@andrekramer1 We should avoid repacking the batch at the broker side. This will introduce more GC workload and need more CPU cycles to repack the message. You can see https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-raw-Message-metadata, try to avoid serialize and deserialize the message again on the server-side.

I want to describe my rough idea, If it is a batch message, we have message metadata for the batch and single message metadata for each message. So I think we can add all tags into the message metadata of the batch. For example, 2 messages in a batch and property1 for message0 is 1, property1 for message1 is 2. So that we can add property1 = [1,2] in the batch message metadata. So, the broker can filter out the batches.

@andrekramer1
Copy link
Author

andrekramer1 commented Nov 16, 2020 via email

@andrekramer1
Copy link
Author

andrekramer1 commented Nov 16, 2020 via email

@sijie
Copy link
Member

sijie commented Nov 16, 2020

Maybe an existing benchmark can be modified for this? Our testing confirmed good savings when most messages were filtered out but obviously there is an impact when a filter is in place. The situation for no filter should not be very different.

Can you share the savings and provide the details on how did you do the test and benchmark?

Copy link
Member

@sijie sijie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of major comments:

  1. what is the performance & GC implication here? I think the community would like to see benchmark results and performance numbers around that. So it would be good to show all the benchmark results and your test methodology along with this pull request.

  2. how are you going to handle topics with end-to-end encryption enabled?

  3. how do you handle acknowledgment?

Regarding the code itself.

  1. Please use NAR class loader to load the class. See how we implemented the Offloader interface.

@andrekramer1
Copy link
Author

andrekramer1 commented Nov 17, 2020 via email

@holmes07
Copy link

Why not replace properties with message key ?

If producer batch is enabled, the messages are grouped by key, so the server does not need to parse the messages one by one,Only need to parse metadata

@andrekramer1
Copy link
Author

andrekramer1 commented Nov 26, 2020 via email

@eolivelli
Copy link
Contributor

This work is very interesting.

Do you have time to resume this discussion?

@andrekramer1
Copy link
Author

andrekramer1 commented Apr 6, 2021 via email

@cbornet
Copy link
Contributor

cbornet commented May 16, 2021

I think this is an important feature to make RabbitMQ/AMQP users that use this kind of patterns to transition to Pulsar. Would also be nice if it's integrated into Pulsar functions input.

@julienlau
Copy link

julienlau commented Jun 11, 2021

I think this is an important feature to make RabbitMQ/AMQP users that use this kind of patterns to transition to Pulsar. Would also be nice if it's integrated into Pulsar functions input.

I switched my PoC of a latency sensitive project from RabbitMQ to Pulsar for resilience matters.
RabbitMQ is very very fast and efficient at this use case if you are not using resilient queues (quorum queues).
I took a +10ms by switching to a replicated system (Pulsar) and I think that if I perform my filtering by adding a second layer of filtered topics I would experience an additionnal latency hit.

In my view, there could be 2 server side filtering consumer:

  • a simple server-side filtering only on partition_key would already be a very very interesting feature.
    I mean simple because it should be fast :

  • a more advanced filtering on the whole meta-data could also be handy, but it is generally slower... it may be a good idea to segregate both consumer type ?
    It would be like Headers Exchange of RabbitMQ
    This could be the consumer proposed on this PR

This segregation on two different consumers would allow not to serialize/deserialize too much data on the server side as advised by @codelipenghui

In my limited experience with RabbitMQ, there is almost no latency impact between a Direct exchange with no filtering and a Topic Exchange with simple routing_key filtering. However the Headers Exchange is noticeably slower.
I would be curious to have @Vanlightly opinion on this.

@github-actions
Copy link

github-actions bot commented Mar 4, 2022

@andrekramer1:Thanks for your contribution. For this PR, do we need to update docs?
(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

@gjanardh
Copy link

@andrekramer1 Thanks for your contribution to the consumer filtering feature. We heavily use JMS brokers for messaging and trying Pulsar for few use cases now. Support for message selectors/filters (in line with the JMS Spec) will help existing JMS users switching to Pulsar. Hoping to see this feature in a future release of Pulsar!

Consumer consumer = client.newConsumer(Schema.STRING)
.topic("sometopic")
.selector("prop = 'value' and prop2 = val2 or (prop3 like '%val3%')") // A message selector similar to JMS Message selector SQL 92 syntax. https://docs.oracle.com/javaee/7/api/javax/jms/Message.html
.subscribe();

@tisonkun
Copy link
Member

tisonkun commented Dec 8, 2022

Closed as stale and huge conflict. Please rebase and resubmit the patch if it's still relevant.

@tisonkun tisonkun closed this Dec 8, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Could Subscription filter message for consumer