Description
Mailing List discussion: https://lists.apache.org/thread/vfyb607dyspv8gjogc726pp7p0ovfgzk
Motivation
For pattern subscriptions, the consumer's topicsPattern and subscriptionTopicsMode configurations can contradict each other.
For example, the topicsPattern below names only persistent topics, while subscriptionTopicsMode requests all topics:
Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
Consumer<byte[]> consumer = pulsarClient.newConsumer()
    .topicsPattern(pattern)
    .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
    .subscribe();
In the end, all topics are subscribed, persistent and non-persistent alike. This is very confusing.
Note: subscriptionTopicsMode was introduced by PR-2025 to solve the issue of subscribing to non-persistent topics.
Goal
In fact, subscriptionTopicsMode can be derived from whether the topicsPattern includes a domain:
- If the pattern does not include a domain (public/default/pattern-topic.*): subscriptionTopicsMode = AllTopics
- If the domain is persistent (persistent://public/default/pattern-topic.*): subscriptionTopicsMode = PersistentOnly
- If the domain is non-persistent (non-persistent://public/default/pattern-topic.*): subscriptionTopicsMode = NonPersistentOnly
This is more straightforward and eliminates the confusion.
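The three rules above can be sketched as a small, self-contained helper. This is only an illustration under stated assumptions: the class `PatternModeSketch`, the method `modeFor`, and the local `Mode` enum are hypothetical stand-ins (the real client would use RegexSubscriptionMode), and the domain is detected with a plain prefix check on the pattern string.

```java
import java.util.regex.Pattern;

/** Hypothetical helper illustrating the proposed rule; not part of the Pulsar client API. */
final class PatternModeSketch {

    /** Local stand-in for Pulsar's RegexSubscriptionMode so the sketch compiles on its own. */
    enum Mode { PERSISTENT_ONLY, NON_PERSISTENT_ONLY, ALL_TOPICS }

    /** Derives the subscription mode from the domain prefix of the topics pattern. */
    static Mode modeFor(Pattern topicsPattern) {
        String regex = topicsPattern.pattern();
        if (regex.startsWith("non-persistent://")) {
            return Mode.NON_PERSISTENT_ONLY;
        }
        if (regex.startsWith("persistent://")) {
            return Mode.PERSISTENT_ONLY;
        }
        // No domain in the pattern: match both persistent and non-persistent topics.
        return Mode.ALL_TOPICS;
    }
}
```

For instance, `PatternModeSketch.modeFor(Pattern.compile("public/default/pattern-topic.*"))` yields `ALL_TOPICS` under these assumptions.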
So, the goals of this PIP are:
- Deprecate the consumer subscriptionTopicsMode configuration.
- Modify the API docs to make them clearer for users.
- Maintain compatibility and do not introduce breaking changes.
API Changes
1. Deprecation of the consumer subscriptionTopicsMode configuration
index 14a94cb8286..e14a7aef748 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -567,7 +567,9 @@ public interface ConsumerBuilder<T> extends Cloneable {
*
* @param regexSubscriptionMode
* Pattern subscription mode
+ * @deprecated Please refer to {@link #topicsPattern(String)}
*/
+ @Deprecated
ConsumerBuilder<T> subscriptionTopicsMode(RegexSubscriptionMode regexSubscriptionMode);
2. Modify the topicsPattern method docs.
index 14a94cb8286..32f8f7aa1eb 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
- * <p>The pattern is applied to subscribe to all topics, within a single namespace, that match the
- * pattern.
+ * The pattern can match all topics in a namespace, or only persistent or only non-persistent topics.
+ * - If the pattern does not include a domain (`public/default/pattern-topic.*`): it matches all topics.
+ * - If the domain is persistent (`persistent://public/default/pattern-topic.*`): it matches only persistent topics.
+ * - If the domain is non-persistent (`non-persistent://public/default/pattern-topic.*`): it matches only non-persistent topics.
* <p>The consumer automatically subscribes to topics created after itself.
*
* @param topicsPattern
* a regular expression to select a list of topics to subscribe to
* @return the consumer builder instance
*/
ConsumerBuilder<T> topicsPattern(Pattern topicsPattern);
Implementation
1. Before sending the lookup request, derive the subscription mode from the topics pattern.
We can modify convertRegexSubscriptionMode to also support conversion from the pattern string (according to the rules described in the Goal section).
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
Lines 558 to 564 in 0f72a82
String regex = conf.getTopicsPattern().pattern();
Mode subscriptionMode = convertRegexSubscriptionMode(conf.getRegexSubscriptionMode());
TopicName destination = TopicName.get(regex);
NamespaceName namespaceName = destination.getNamespaceObject();
CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode, regex, null)
To avoid introducing breaking changes, we need to give priority to the subscription mode set by the user.
Pseudocode:
String regex = conf.getTopicsPattern().pattern();
Mode subscriptionMode;
if (conf.getRegexSubscriptionMode() != null) {
    // A mode set explicitly by the user takes priority, for compatibility.
    subscriptionMode = convertRegexSubscriptionMode(conf.getRegexSubscriptionMode());
} else {
    // Otherwise derive the mode from the domain of the topics pattern.
    subscriptionMode = convertRegexSubscriptionModeByPattern(conf.getTopicsPattern());
}
TopicName destination = TopicName.get(regex);
NamespaceName namespaceName = destination.getNamespaceObject();
CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode, regex, null)
Note: With this approach, if the user has configured subscriptionTopicsMode, the confusing result described in Motivation will still occur. This is a compromise to maintain compatibility. Once the configuration is removed in a later version, the behavior will be clearer.
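The precedence rule on its own can be reduced to a minimal sketch. The names `ModePrecedenceSketch` and `resolve`, and the local `Mode` enum, are hypothetical and exist only for illustration. The mode derived from the pattern is passed in as a parameter, so the sketch shows nothing beyond which value wins.

```java
/** Hypothetical illustration of the precedence rule; not part of the Pulsar client. */
final class ModePrecedenceSketch {

    /** Local stand-in for the lookup mode so the sketch compiles on its own. */
    enum Mode { PERSISTENT, NON_PERSISTENT, ALL }

    /**
     * Returns the explicitly configured mode when one is present; otherwise
     * falls back to the mode derived from the topics pattern.
     */
    static Mode resolve(Mode userConfigured, Mode derivedFromPattern) {
        return userConfigured != null ? userConfigured : derivedFromPattern;
    }
}
```

Under this rule, `resolve(Mode.ALL, Mode.PERSISTENT)` still returns `ALL` (the case from Motivation), while `resolve(null, Mode.NON_PERSISTENT)` returns `NON_PERSISTENT`.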
2. Set the default value of RegexSubscriptionMode to null.
index 373d4e66c0e..c43c6b90f63 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -322,7 +322,7 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
+ "* **NonPersistentOnly**: only subscribe to non-persistent topics.\n"
+ "* **AllTopics**: subscribe to both persistent and non-persistent topics."
)
- private RegexSubscriptionMode regexSubscriptionMode = RegexSubscriptionMode.PersistentOnly;
+ private RegexSubscriptionMode regexSubscriptionMode;
This changes behavior relative to earlier versions, but I don't think it's a breaking change.
For example:
Pattern pattern = Pattern.compile("non-persistent://my-property/my-ns/.*");
Consumer<byte[]> consumer = pulsarClient.newConsumer()
    .topicsPattern(pattern)
    .subscribe();
- Before this change: it subscribes to all persistent topics in this namespace, which is very confusing.
- After this change: it subscribes to all non-persistent topics in this namespace.
Alternatives
No response
Anything else?
This PIP can also directly solve: #6845