Skip to content

Conversation

@seeday
Copy link
Contributor

@seeday seeday commented Jul 22, 2020

Motivation

Broker-side filtering of messages can reduce the network load significantly when running certain types of highly-filtered jobs. Example usecases are flink filter-pushdown, displaying data over websocket, and other network-limited actions.

So far, the list of available filters is pretty small, but this framework should make writing new ones relatively easy.

Modifications

I've modified the pulsar protocol such that clients can send a spec for filtering to the broker. The broker's Consumer class builds a filter from the arguments sent as well as the class name. I've gone with reflection because that makes for easy extension and customization of the available filters.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • the new tests in MessageFilteringTest/SimpleTypedMessageProducerConsumerTest should pass
  • all other tests should be unaffected by the changes

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
    • a new function is added to the consumer to allow for constructing a message filter
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)
    • a new field was added to commandsubscribe to send info about the message filter
  • The rest endpoints: (yes / no)
  • The admin cli options: (yes / no)
  • Anything that affects deployment: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
    • once this is in a more stable state, an entry in the actual docs is probably necessary.
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@seeday
Copy link
Contributor Author

seeday commented Jul 22, 2020

Hi all.

This is my first large pulsar PR, and given the contents of it I've got quite a few questions/discussion points about the implementation (in no particular order)

  • my biggest worry is that the way I've done filtering (acking filtered-out messages on the broker) fails in some strange case
  • additionally, the performance of running the filter inside the jetty thread-loop might be questionable, but I feel like running it in the consumer send function would be worse (as I think that's single threaded?).
  • reflection to create the filter classes isn't pretty, but it allows for easy user extension. Is that reason enough to have it?
  • also, what kind of filters should pulsar itself be in the business of providing? I only created a few to prove out the feature, but they also cover a relatively large amount of my use-case.
  • Is there a better way to send over arguments to the filter? I used KeyValue since it was convenient, but sending serialized objects of some sort would be much nicer (and necessary for flink integration, as far as I can tell)

@merlimat
Copy link
Contributor

For changes that introduce new features or protocol changes, it's recommended to submit a PIP (design doc) for discussion to the mailing list. PTAL for examples at https://github.com/apache/pulsar/wiki

@seeday
Copy link
Contributor Author

seeday commented Jul 22, 2020

For changes that introduce new features or protocol changes, it's recommended to submit a PIP (design doc) for discussion to the mailing list. PTAL for examples at https://github.com/apache/pulsar/wiki

You know, "should I open a PIP too" should have made my question list. I'll start putting one together.

@seeday
Copy link
Contributor Author

seeday commented Jul 22, 2020

I've drawn a PIP up but couldn't figure out how to actually contribute it. It doesn't look like you can do wiki PRs.

@sijie
Copy link
Member

sijie commented Jul 22, 2020

@seeday You can put the PIP in a Google Doc or a gist and send an email to dev@ mailing list. Once the thread is started, one of the committers will take the content and add it to Pulsar wiki.

@github-actions
Copy link

github-actions bot commented Mar 4, 2022

@seeday:Thanks for your contribution. For this PR, do we need to update docs?
(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants