-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[pulsar-broker] namespace resources use metadata-store api #9351
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
1afcb3d to
3c4bb85
Compare
cd0d738 to
2bc71f9
Compare
|
/pulsarbot run-failure-checks |
eolivelli
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work.
Code really looks cleaner
I left a couple of questions
| if (maxPartitions > 0 && autoTopicCreationOverride.defaultNumPartitions > maxPartitions) { | ||
| throw new RestException(Status.NOT_ACCEPTABLE, | ||
| "Number of partitions should be less than or equal to " + maxPartitions); | ||
| if (autoTopicCreationOverride != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this if statement was not present before, can it be null here ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I merged two methods which had duplicate code:
internalRemoveAutoTopicCreation and internalSetAutoSubscriptionCreation
| } | ||
|
|
||
| private void clearCache() { | ||
| // (MetadataCacheImpl<ClusterData>) pulsar.getPulsarResources(). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove comment
| if (ex.getCause() instanceof BadVersionException) { | ||
| // if resource is updated by other than metadata-cache then metadata-cache will get bad-version | ||
| // exception. so, try to invalidate the cache and try one more time. | ||
| objCache.synchronous().invalidate(key); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we call here this synchronous operation ?
do we risk a deadlock ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, it just cleans up the cache synchronously.
eolivelli
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lgtm
61001b2 to
38481a9
Compare
| @Override | ||
| public CompletableFuture<Void> readModifyUpdateOrCreate(String path, Function<Optional<T>, T> modifyFunction) { | ||
| return objCache.get(path) | ||
| return executeWithRetry(() -> objCache.get(path) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This part is already in master, can you merge or rebase?
| final String namespace = namespaceName.toString(); | ||
| final String policyPath = AdminResource.path(POLICIES, namespace); | ||
| Policies policies = policiesCache().get(policyPath) | ||
| Policies policies = namespaceResources().get(policyPath) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: ideally namespace resources should take a namespace name and internally resolve the path on metadata store.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, make sense. I will try to handle in another PR so, I can handle unit-test failures related to this change separately.
| } catch (KeeperException.NodeExistsException e) { | ||
| log.warn("[{}] Failed to create namespace {} - already exists", clientAppId(), namespaceName); | ||
| } catch (NotFoundException e) { | ||
| log.error("[{}] namespace already exists {}", clientAppId(), namespaceName, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The exception and the log message seem to have diverged now
| * @param property the property name | ||
| * @return the list of namespaces | ||
| */ | ||
| protected List<String> getListOfNamespaces(String property) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| protected List<String> getListOfNamespaces(String property) throws Exception { | |
| protected List<String> getListOfNamespaces(String tenant) throws Exception { |
fix build fix api Fix test: fix test fix test
|
/pulsarbot run-failure-checks |
Motivation
This PR is on top of #9338, with this change, namespace-resources will start using metadata-api and remove zk-dependency.