@gordeevbr
Contributor

Motivation

Please see issue #2009 for a detailed bug report.

In our use case we need the Java client to read from a set of non-persistent topics through a pattern subscription. Unfortunately, this feature doesn't work right now. After researching the cause, I found that under the hood the client requests the list of topics in a namespace from the server, filters that list by the pattern, and subscribes to the matches. The method in the Pulsar broker's NamespaceService class that is responsible for finding the topics only looks at ledgers, so it returns only persistent topics to the client. The goal of this pull request is to fix that.
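A minimal sketch of the client-side step described above, to show why a missing topic in the broker's list can never be subscribed to. This is an illustration only, not the Pulsar client's actual code: the class name `PatternFilterSketch`, the method `filterByPattern`, and the example pattern are all assumptions.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical illustration; not taken from the Pulsar code base.
class PatternFilterSketch {

    // Keep only the topic names that fully match the subscription pattern.
    static List<String> filterByPattern(List<String> topicsInNamespace, Pattern pattern) {
        return topicsInNamespace.stream()
                .filter(name -> pattern.matcher(name).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumed example pattern that accepts both topic domains.
        Pattern pattern = Pattern.compile(".*://my-property/my-ns/pattern-topic-.*");

        // A list that contains persistent topics only, as the broker returned
        // before this change.
        List<String> fromBroker = List.of(
                "persistent://my-property/my-ns/pattern-topic-1",
                "persistent://my-property/my-ns/other-topic");

        // A non-persistent topic would match the pattern, but the filter never
        // sees it because it is absent from the broker's list.
        System.out.println(filterByPattern(fromBroker, pattern));
    }
}
```

The filter can only narrow the list it is given, so the fix has to happen where the list is produced.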

Modifications

This pull request updates the getListOfTopics method of the NamespaceService class so that the result also includes active non-persistent topics from the local broker cache, that is, the multiLayerTopicsMap collection of BrokerService.

Result

As a result, requesting the list of topics in a namespace through the HTTP API or the binary API (and thus through the clients) also returns non-persistent topics, which allows pattern subscription to be used with them.

Considerations

  1. Since this method pulls non-persistent topics from the local broker cache, this solution will probably only work for Pulsar installations with a single broker; with multiple brokers, results might be inconsistent. Unfortunately I don't really know if non-persistent topics themselves work in multi-broker setups. I recently asked on Slack whether non-persistent topics are replicated in any way, and @merlimat's response was that they aren't. It also seems that some other methods that work with non-persistent topics use this very same collection.

  2. It seems to me that the unit tests show the Java client can work with this setup, but this might still be a breaking change for other clients, or for applications that use this API and do not expect non-persistent topics in the result.

  3. I have made sure that the old unit tests inside the pulsar-broker subproject still pass and updated some old tests for this particular use case. Are there any more tests that I can add?

Overall, we really need this and I would appreciate it if the maintainers could share their opinion. Thanks in advance.

@gordeevbr gordeevbr changed the title Allowed non-pesistent topics to be retrieved along with persistent ones. Allow non-pesistent topics to be retrieved along with persistent ones Jun 24, 2018
@gordeevbr gordeevbr changed the title Allow non-pesistent topics to be retrieved along with persistent ones Allow non-pesistent topics to be retrieved along with persistent ones from the "GetTopicsOfNamespace" method Jun 24, 2018
@sijie sijie requested a review from jiazhai June 25, 2018 05:08
@sijie sijie added this to the 2.2.0-incubating milestone Jun 25, 2018
@sijie
Member

sijie commented Jun 25, 2018

@jiazhai please review this when you have time.

@gordeevbr
Contributor Author

Just noticed integration tests are failing. Is this my fault? Should I fix that?

@srkukarni
Contributor

retest this please

}

// Non-persistent topics don't have managed ledgers so we have to retrieve them from local cache.
synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) {
Contributor

@merlimat merlimat Jun 25, 2018

This will only answer with the non-persistent topics served by the broker that is handling the particular request. However, topics for a single namespace can be spread across multiple brokers.

We would need to do something like this ( https://github.com/apache/incubator-pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java#L170 ) to get the correct list of non-persistent topics.

I think the best option is to actually use the same API to get the list of non-persistent topics through HTTP. GET /admin/v2/non-persistent/{tenant}/{namespace}. That should take care of everything.
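A rough sketch of what calling that HTTP endpoint could look like from Java, using the JDK's built-in `java.net.http` client. The endpoint path comes from the comment above; the base URL `http://localhost:8080`, the class name, and the helper names are assumptions, and the response body is returned as raw text without assuming anything about its format.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper; not part of the Pulsar code base.
class NonPersistentTopicsRequestSketch {

    // Build the admin URL for GET /admin/v2/non-persistent/{tenant}/{namespace}.
    static URI listUri(String adminBaseUrl, String tenant, String namespace) {
        return URI.create(adminBaseUrl + "/admin/v2/non-persistent/" + tenant + "/" + namespace);
    }

    // Send the GET request and return the raw response body as text.
    static String fetchTopics(HttpClient client, URI uri) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) {
        // Assumed local admin address; adjust to the real broker or proxy.
        URI uri = listUri("http://localhost:8080", "my-property", "my-ns");
        System.out.println(uri);
        // fetchTopics(HttpClient.newHttpClient(), uri) would perform the call
        // against a running broker.
    }
}
```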

@merlimat
Contributor

> Unfortunately I don't really know if non-persistent topics themselves work in multi-broker setups. I have recently asked on Slack if non-persistent topics are being replicated in any way and @merlimat's response was that they don't. Also it seems to be that some other methods that are working with non-persistent topics are using this very same collection.

  • A single topic (or partition of a topic) is assigned to a single broker, at a given point in time.
  • Topics belonging to a namespace can be assigned to different brokers (that is true for both persistent and non-persistent topics)
  • Persistent topics use BookKeeper to persist data and maintain state --> When a broker crashes, the topic can move immediately to a different broker and restart with the same exact state.
  • Non-persistent topics do not store any kind of state durably. The messages are kept for a very short amount of time in memory in the broker and dropped if they cannot be delivered immediately. The data for a single non-persistent topic is not stored in multiple machines. If the broker crashes, some messages will be lost and the topic will continue on the new broker.
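The points above can be sketched as a toy model: each broker only knows the topics it currently serves, so a namespace-wide list has to be the union of every broker's view. The broker names, the class name, and `allTopics` are made up for illustration; real brokers do not expose their state as a map like this.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Toy model only; broker names and topic names are hypothetical.
class BrokerTopicViewSketch {

    // Merge the per-broker topic lists into one sorted, duplicate-free list.
    static List<String> allTopics(Map<String, List<String>> topicsByBroker) {
        TreeSet<String> merged = new TreeSet<>();
        topicsByBroker.values().forEach(merged::addAll);
        return List.copyOf(merged);
    }

    public static void main(String[] args) {
        Map<String, List<String>> topicsByBroker = Map.of(
                "broker-a", List.of("non-persistent://my-property/my-ns/topic-a"),
                "broker-b", List.of("non-persistent://my-property/my-ns/topic-b"));

        // Asking only broker-a yields a partial view of the namespace.
        System.out.println(topicsByBroker.get("broker-a"));
        // The union over all brokers yields the complete list.
        System.out.println(allTopics(topicsByBroker));
    }
}
```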

final String topicName2 = "persistent://my-property/my-ns/topic-2-" + key;
final String topicName3 = "persistent://my-property/my-ns/topic-3-" + key;
List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
final String topicName4 = "non-persistent://my-property/my-ns/topic-4-" + key;
Member

Seems there is a space alignment issue here.

String topicName1 = "persistent://my-property/my-ns/pattern-topic-1-" + key;
String topicName2 = "persistent://my-property/my-ns/pattern-topic-2-" + key;
String topicName3 = "persistent://my-property/my-ns/pattern-topic-3-" + key;
final String topicName4 = "non-persistent://my-property/my-ns/pattern-topic-4-" + key;
Member

also here.

Member

@jiazhai jiazhai left a comment

Thanks for the fix. The test case looks good. As @merlimat pointed out, there is a better way to get the list.

@gordeevbr
Contributor Author

Thanks for reviewing this everyone.
@merlimat we expected this feature to work through Pulsar Java client. I guess to use that API we will have to write our own client logic that queries for non-persistent topics first. I will take a look.
@jiazhai I must have forgotten that my IDE is configured to use 2 spaces.

@merlimat
Contributor

> @merlimat we expected this feature to work through Pulsar Java client. I guess to use that API we will have to write our own client logic that queries for non-persistent topics first. I will take a look.

Yes, I meant that on the broker side we should use the client API (there are already several cases that do that). Since a single broker will not have all the topics, and non-persistent topics don't have additional metadata, the client API call is the only option to get the correct list of non-persistent topics.

An additional improvement could be to only make this call if we know the client is interested in non-persistent topics and skip it otherwise.

@gordeevbr gordeevbr changed the title Allow non-pesistent topics to be retrieved along with persistent ones from the "GetTopicsOfNamespace" method [WiP] Allow non-pesistent topics to be retrieved along with persistent ones from the "GetTopicsOfNamespace" method Jun 30, 2018
@gordeevbr
Contributor Author

@merlimat @jiazhai I have updated this feature as you suggested.
I haven't yet added tests for this or fixed any old ones, just made sure it compiles. I will test this ASAP, just wanted to make sure this functionality is OK before I go any further.

This is what was done in the latest commits:

  • An additional property "Mode" was added to the "CommandGetTopicsOfNamespace" entity of the Pulsar binary API via Protobuf. This is an enum that has 3 values: "Persistent", "Non-Persistent", or "All". The value "Persistent" was set as the default.

  • Broker binary protocol server code was updated to retrieve topics from lookup based on this enum. More importantly, broker will attempt to retrieve a list of non-persistent topics from another cluster if necessary. A pool of Pulsar Clients was made for that purpose inside NamespaceService.

  • I have attempted to do the same for HTTP APIs, but I have only done so for API V1. I am not sure what to do about API V2, as it seems to use a somewhat more complicated functionality.

  • Client code was updated to support this. When building a consumer it is now possible to specify to which kind of topics you want to subscribe when using pattern subscription.
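The mode-based selection described in the list above could look roughly like the following. This is a sketch under assumptions, not the code from the commits: the enum and method names are invented here, and topics are classified purely by the `non-persistent://` prefix seen in the test topic names in this PR.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the "Mode" property; names are illustrative.
class TopicModeSketch {

    enum Mode { PERSISTENT, NON_PERSISTENT, ALL }

    // Classify a topic by its domain prefix.
    static boolean isNonPersistent(String topic) {
        return topic.startsWith("non-persistent://");
    }

    // The extra non-persistent lookup is needed for every mode except PERSISTENT.
    static boolean needsNonPersistentLookup(Mode mode) {
        return mode != Mode.PERSISTENT;
    }

    // Keep only the topics that belong to the requested mode.
    static List<String> select(List<String> topics, Mode mode) {
        return topics.stream().filter(topic -> {
            switch (mode) {
                case PERSISTENT:
                    return !isNonPersistent(topic);
                case NON_PERSISTENT:
                    return isNonPersistent(topic);
                default:
                    return true; // ALL keeps every topic
            }
        }).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of(
                "persistent://my-property/my-ns/topic-1",
                "non-persistent://my-property/my-ns/topic-4");
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + select(topics, mode));
        }
    }
}
```

Keeping the persistent mode as the default means a request that does not set the field behaves as before and never triggers the extra lookup.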

Will be glad to receive your feedback.

@sijie
Member

sijie commented Jul 9, 2018

@merlimat @jiazhai can you review this PR and give @gordeevbr some feedback?

@sijie
Member

sijie commented Aug 2, 2018

ping @merlimat and @jiazhai. Let's give some feedback to @gordeevbr, so that we can move forward with the change here.

@jiazhai
Member

jiazhai commented Aug 6, 2018

retest this please

import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.policies.data.*;
Member

We should avoid using * in imports.

List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics();
List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();

assertEquals(topics.size(), 1);
Member

The test failed here.
The producer had been created before the pattern consumer was created. By design, shouldn't it have 7 topics (1+2+3+1)?

Member

@jiazhai jiazhai left a comment

@gordeevbr The mode change looks good to me. Please go ahead.

FYI, it seems this caller also needs a fix:

2018-08-05T16:42:17.037 [ERROR] /home/jenkins/jenkins-slave/workspace/pulsar_precommit_cpp/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java:[335,31] method newGetTopicsOfNamespaceRequest in class org.apache.pulsar.common.api.Commands cannot be applied to given types;
2018-08-05T16:42:17.037 [ERROR]   required: java.lang.String,long,org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode
2018-08-05T16:42:17.037 [ERROR]   found: java.lang.String,long
2018-08-05T16:42:17.037 [ERROR]   reason: actual and formal argument lists differ in length

@gordeevbr
Contributor Author

Thanks for your feedback.
Yes, I believe I have messed up some tests with this. I shall fix this as soon as possible.

@gordeevbr
Contributor Author

@jiazhai @merlimat @sijie

I have fixed old tests that were failing, added some new tests to test suites where it was possible, updated features to be consistent with HTTP Lookup, fixed all found issues, and updated with master branch.

There's not much new in these commits, mostly fixes and tests.

I think it should be ready now. I am, of course, ready to update this PR if deemed necessary.

@sijie sijie changed the title [WiP] Allow non-pesistent topics to be retrieved along with persistent ones from the "GetTopicsOfNamespace" method Allow non-pesistent topics to be retrieved along with persistent ones from the "GetTopicsOfNamespace" method Sep 4, 2018
@sijie
Member

sijie commented Sep 4, 2018

@gordeevbr cool, thanks. I removed "[WiP]" so other people know this PR is ready for review.

Member

@jiazhai jiazhai left a comment

+1, Thanks for the work.

@gordeevbr
Contributor Author

Would you mind rerunning these tests? I'm quite sure some of these were working just now.

@sijie
Member

sijie commented Sep 23, 2018

run java8 tests

@sijie
Member

sijie commented Sep 24, 2018

retest this please

@sijie
Member

sijie commented Sep 24, 2018

run java8 tests

@sijie sijie merged commit 6907135 into apache:master Sep 25, 2018
@gordeevbr
Contributor Author

Great. Thank you very much!

@gordeevbr gordeevbr deleted the GetListOfTopicsNonPersistent branch September 25, 2018 06:11
codelipenghui pushed a commit that referenced this pull request Jun 26, 2022
)

### Motivation

#2025 added the `Mode` query param, but the CLI still does not support it.

#15410 introduced a filter for system topics.

This patch adds support for both of the above query params to Namespace#getTopics.
@VijayRohra

This is not working for me in Pulsar 2.11.1. I am able to receive messages only from non-persistent topics that belong to a single broker.

6 participants