Skip to content

Conversation

@ConcurrencyPractitioner
Copy link
Contributor

Feature Request

There was a request for this feature in #3436.

Documentation

Will add to documentation a description of the new feature.

Copy link
Member

@sijie sijie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a better approach is to disable auto creation at broker side. So administrators can disable this behavior while creating the cluster.

@ConcurrencyPractitioner
Copy link
Contributor Author

ConcurrencyPractitioner commented Jan 27, 2019

Ok, I found that the change should be in PersistentTopicsBase L946. I will work on it from there.

@sijie
Copy link
Member

sijie commented Jan 27, 2019

@ConcurrencyPractitioner : cool.

pulsar broker uses managed ledger to store the topic partitions. there is actually a config in managed ledger createIfMissing - https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java#L71 . if auto create topic is disabled, then you can disable createIfMissing.

Hope this gives you some entries for you to check out the code.

@ConcurrencyPractitioner
Copy link
Contributor Author

ConcurrencyPractitioner commented Jan 27, 2019

Ok, I did some digging and here's what I found:
createIfMissing is used when seeing if a Z-Node is already built for a ledger. If a Z-Node does not exist, then a new one will be created (i.e. if createIfMissing equals true).

I found that PersistentTopicsBase will (probably?) create a new PersistentTopic if there is no partition metadata (partitionMetadata.partitions == 0 ) for that particular store. At this point, I'm not too sure what this means.

My best guess is that if there is no Z-Node for a ledger, then that means no partitions exists for it either. @sijie

At this point, I'm not too sure what connection createIfMissing has with partitionMetadata.partitions.

Edit: Ok, after some more digging. I have found that when getPartitionedTopicMetadata is called in PersistentTopicsBase, a CompletableFuture will be created. I found that if the value returned is null (i.e. if no corresponding Z-Node exists, rc == Code.NONODE.intValue()), then it indicates that a particular topic is not partitioned. So I think this basically means my inference is right.

@ConcurrencyPractitioner
Copy link
Contributor Author

Ok, there are some kinks in my understanding. Wouldn't disabling createIfMissing also mean that no non-partitioned topics will be created? I'm not too sure actually.

@sijie
Copy link
Member

sijie commented Jan 28, 2019

@ConcurrencyPractitioner

Ok, there are some kinks in my understanding. Wouldn't disabling createIfMissing also mean that no non-partitioned topics will be created? I'm not too sure actually.

Ah, you are right.

So I think in the task, you should do following:

  • provide a flag allowTopicAutoCreation, defaults to true
  • the entrypoint for creating/loading topic is BrokerService#getOrCreateTopic.
  • getOrCreateTopic will use allowTopicAutoCreation as createIfMissing.
  • the only exception is to use createIfMissing=true when BrokerService#getOrCreateTopic is called in admin rest endpoint (PersistentTopicsBase.java)
  • you need to provide a rest endpoint in PersistentTopicsBase to create non-partitioned topic
  • you need to improve createPartitionedTopic in PersistentTopicsBase to create all the partitions, because rest admin endpoint is the only entrypoint to create partitions when allowTopicAutoCreation is set to false.

Hope this clarify the tasks to do for this feature.

@sijie sijie added type/feature The PR added a new feature or issue requested a new feature area/broker labels Jan 28, 2019
Copy link
Member

@sijie sijie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think allowAutoTopicCreation should be a flag in broker configuration, rather than in ManagedLedgerConfig. So BrokerService would set different createIfMissing values at different scenarios based on whether allowAutoTopicCreation is set to true or not.

Also we need add new unit tests and integration tests to cover this feature.

});
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove trailing spaces

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm agree!

@ConcurrencyPractitioner
Copy link
Contributor Author

Hi @sijie

I've moved the config, but where would you suggest that we add the test? I've made some efforts and discovered a couple of candidate test locations like PulsarFunctionAdminTest and BrokerServiceTest. However, they don't seem to be testing configs related to Managed Ledger and the creation of z-nodes.

@sijie
Copy link
Member

sijie commented Feb 4, 2019

@ConcurrencyPractitioner

I think you need to verify following cases:

when autoCreateTopic is disabled, 1) you can't publish or consume from topics when topics are not explicitly created. 2) you can create partitioned and non-partitions topics from pulsar admin rest endpoint.

You can add a new test case to pulsar-broker module. There are plenty test cases using MockedPulsarServiceBaseTest, you can check them out as examples.

@ConcurrencyPractitioner
Copy link
Contributor Author

ConcurrencyPractitioner commented Feb 6, 2019

HI @sijie,

I'm currently using Eclipse to develop for Pulsar. What IDE do you use for Pulsar?
On a side note, do you know how to get the .class file for a .java file like ServiceConfiguration and import it into an IDE? Without it, the Getter() and Setter() methods that are lombok generated is not recognized by the IDE (for me, Eclipse) as methods which exist (so they are considered illegal syntax).

@sijie
Copy link
Member

sijie commented Feb 6, 2019

@ConcurrencyPractitioner

I am using Intellij for my java development.

For Lombok with Eclipse, I think if you run java -jar /path/to/lombok.jar, it will start a lombok installation window, you can follow the instructions to configure your IDE.

Here is one blog post about that - https://howtodoinjava.com/automation/lombok-eclipse-installation-examples/ Hope that helps.

@ConcurrencyPractitioner
Copy link
Contributor Author

ConcurrencyPractitioner commented Feb 6, 2019

Thanks for the pointers. Just wanting to confirm, is it the community version or the licensed version?

Also, I did some work on PersistentTopicsTest and attempted to add a new test.
Here's basically what it did:

    	final String nonPartitionTopic = "nonPartitionedTopic";
    	persistentTopics.createSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true, (MessageIdImpl) MessageId.latest);
    	List<String> subscriptions =  persistentTopics.getSubscriptions(testTenant, testNamespace, nonPartitionTopic + "-partition-0", true);
        Assert.assertTrue(subscriptions.contains("test"));
        persistentTopics.deleteSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true);
        subscriptions =  persistentTopics.getSubscriptions(testTenant, testNamespace, nonPartitionTopic + "-partition-0", true);
        Assert.assertTrue(subscriptions.isEmpty());  

I also modified the config value i.e. pulsar.getConfiguration().setAllowAutoTopicCreation(false)

WDYT of this?

@sijie
Copy link
Member

sijie commented Feb 6, 2019

Just wanting to confirm, is it the community version or the licensed version?

you mean Intellij? I am using the community version.

@ConcurrencyPractitioner
Copy link
Contributor Author

Retest this please.

@ConcurrencyPractitioner
Copy link
Contributor Author

@sijie Ok, added the PersistentTopicsTest. There are a couple more tests to go, but other than that, this PR should be getting close.

Copy link
Member

@sijie sijie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ConcurrencyPractitioner overall looks good to me.

I think there are two remaining tasks for this. if you can do them in a follow up pull request, that would be great.

currently we don't have any rest endpoint for creating non-partitioned topics. so if we disable auto-creation of topics, people doesn't have any mechanisms to create non-partitioned topics.

so we need to add a rest endpoint for creating non-partitioned topic. this includes both admin rest endpoint change, a commandline cli change and documentation. I am wondering if you are interested in continuing the contributions in a subsequent pull request.

}
} else if (rc == Code.NONODE.intValue()) {
// Z-node doesn't exist
log.warn("createIfMissing has value of {}", createIfMissing);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is this for debugging purpose?

return getOrCreateTopic(topic, false /* use allowAutoTopicCreation */);
}

public CompletableFuture<Topic> getOrCreateTopic(String topic, boolean defaultConfig) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: defaultConfig => useAllowAutoTopicCreationSetting


public CompletableFuture<Topic> getOrCreateTopic(String topic, boolean defaultConfig) {
boolean createIfMissing = defaultConfig ? pulsar.getConfiguration().isAllowAutoTopicCreation() : true;
log.warn("The createIfMissing value had been set to {}", createIfMissing);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, can you move the logging to debug, otherwise it will be very annoying.

}

getManagedLedgerConfig(topicName).thenAccept(managedLedgerConfig -> {
log.warn("createIfMissing has been set to {}", createIfMissing);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.debug?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this comment or turn it to log.debug

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ConcurrencyPractitioner can you also remove this log.warn statement?


public CompletableFuture<Optional<Topic>> getTopicIfExists(final String topic) {
return getTopic(topic, false /* createIfMissing */);
return getTopic(topic, false /* use allowAutoTopicCreation */);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uh no. getTopicIfExists should never create a topic. That is the purpose of this method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I see. so the comment of the flag here is wrong, no?

it should be /* createIfMissing */

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, good point. I goofed.


public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
return getTopic(topic, true /* createIfMissing */).thenApply(Optional::get);
return getOrCreateTopic(topic, false /* use allowAutoTopicCreation */);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be true?

import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Ignore;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not needed any more no?

@ConcurrencyPractitioner
Copy link
Contributor Author

Retest this please.

@sijie
Copy link
Member

sijie commented Feb 13, 2019

@ConcurrencyPractitioner I have a comment left regarding the comment. otherwise, this change is ready to go.

@ConcurrencyPractitioner
Copy link
Contributor Author

Ok @sijie Done.

}
}

service.getOrCreateTopic(topicName.toString())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't need true here, it defaults to true.

log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId);

service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
service.getOrCreateTopic(topicName.toString(), true).thenAccept((Topic topic) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above.

return getOrCreateTopic(topic, true /* use allowAutoTopicCreation */);
}

public CompletableFuture<Topic> getOrCreateTopic(String topic, boolean useAllowAutoTopicCreationSetting) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of creating a new override for getOrCreateTopic, make getTopic public.
Then the one place that calls getOrCreateTopic(..., false) can instead call getTopic(..., true), and getOrCreateTopic() can become

public CompletableFuture<Topic> getOrCreateTopic(final String topic) { 
             return getTopic(topic, pulsar.getConfiguration().isAllowAutoTopicCreation()).thenApply(Optional::get); 
}

@ConcurrencyPractitioner
Copy link
Contributor Author

About the rest endpoint @sijie I'm good on following through about creating it. Although to be fair, where would be the best place to add the command line client change?

@ConcurrencyPractitioner
Copy link
Contributor Author

Retest this please.

@sijie
Copy link
Member

sijie commented Feb 14, 2019

@ConcurrencyPractitioner it should be in the same location as createPartitionedTopic (both broker and client sides)

Copy link
Contributor

@ivankelly ivankelly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. 👍

@ConcurrencyPractitioner
Copy link
Contributor Author

@sijie Should I add the endpoint in a separate PR or this one?

@sijie
Copy link
Member

sijie commented Feb 15, 2019

@ConcurrencyPractitioner let's do it in a separated PR

@sijie
Copy link
Member

sijie commented Feb 15, 2019

run integration tests

@ConcurrencyPractitioner
Copy link
Contributor Author

@sijie or @ivankelly Do you think this could be merged?

@sijie sijie merged commit 924f495 into apache:master Feb 16, 2019
@sijie
Copy link
Member

sijie commented Feb 16, 2019

@ConcurrencyPractitioner well done

sijie pushed a commit that referenced this pull request Sep 17, 2019
Master Issue:  #4926

### Motivation


Curently the partitioned-topic and non-partitioned topic is a little confuse for users. in PR #3450 we add config for auto-topic-creation.
We could leverage this config to provide some more config for auto-topic-creation.

### Modifications

- Add `allowAutoTopicCreationType` and `allowAutoTopicCreationNumPartitions` to configuration.
- Users can use both configurations when they decide to create a topic automatically.
- Add test.
- Update doc.
wolfstudy pushed a commit that referenced this pull request Nov 20, 2019
Master Issue:  #4926

Curently the partitioned-topic and non-partitioned topic is a little confuse for users. in PR #3450 we add config for auto-topic-creation.
We could leverage this config to provide some more config for auto-topic-creation.

- Add `allowAutoTopicCreationType` and `allowAutoTopicCreationNumPartitions` to configuration.
- Users can use both configurations when they decide to create a topic automatically.
- Add test.
- Update doc.

(cherry picked from commit 547c421)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/broker type/feature The PR added a new feature or issue requested a new feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants