Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String ro
// list is empty)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears that the original intent of this code was to consider an empty list as a special case that had special logic. I don't have enough context to know why, but it might be worth reviewing #2981, where this code was first introduced. If I had to guess, it's because it doesn't make much sense to have a subscription that cannot be consumed.

If this change is accepted, the comment on this line and the one above need to be updated:

// validate if role is authorize to access subscription. (skip validatation if authorization
// list is empty)

Set<String> roles = policies.get().auth_policies
.getSubscriptionAuthentication().get(subscription);
if (roles != null && !roles.isEmpty() && !roles.contains(role)) {
if (roles == null || roles.isEmpty() || !roles.contains(role)) {
log.warn("[{}] is not authorized to subscribe on {}-{}", role, topicName, subscription);
permissionFuture.complete(false);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ public void checkPermissions() {
return;
}
} catch (Exception e) {
log.warn("[{}] Get unexpected error while autorizing [{}] {}", appId, subscription.getTopicName(),
log.warn("[{}] Get unexpected error while authorizing [{}] {}", appId, subscription.getTopicName(),
e.getMessage(), e);
}
log.info("[{}] is not allowed to consume from topic [{}] anymore", appId, subscription.getTopicName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ public void simple() throws Exception {
// tests for subscription auth mode
admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "*", EnumSet.of(AuthAction.consume));
admin.namespaces().setSubscriptionAuthMode("p1/c1/ns1", SubscriptionAuthMode.Prefix);

admin.namespaces().grantPermissionOnSubscription("p1/c1/ns1", "sub1", Sets.newHashSet("role1"));
admin.namespaces().grantPermissionOnSubscription("p1/c1/ns1", "sub2", Sets.newHashSet("role2"));
admin.namespaces().grantPermissionOnSubscription("p1/c1/ns1", "role1-sub1", Sets.newHashSet("role1"));
admin.namespaces().grantPermissionOnSubscription("p1/c1/ns1", "role2-sub2", Sets.newHashSet("role2"));
waitForChange();

assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,16 @@ public void testSubscriberPermission() throws Exception {
.authentication(authentication));

// (1) Create subscription name
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.subscribe();
consumer.close();
// should fail with empty subscription permission
try {
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.subscribe();
fail("should have fail with authorization exception");
} catch (org.apache.pulsar.client.api.PulsarClientException.AuthorizationException e) {
// OK
}
superAdmin.topics().createNonPartitionedTopic(topicName);
superAdmin.topics().createSubscription(topicName, subscriptionName, MessageId.latest);

// verify tenant is able to perform all subscription-admin api
tenantAdmin.topics().skipAllMessages(topicName, subscriptionName);
Expand All @@ -228,8 +235,8 @@ public void testSubscriberPermission() throws Exception {
tenantAdmin.topics().resetCursor(topicName, subscriptionName, MessageId.earliest);

// grant namespace-level authorization to the subscriptionRole
tenantAdmin.namespaces().grantPermissionOnNamespace(namespace, subscriptionRole,
Collections.singleton(AuthAction.consume));
tenantAdmin.namespaces().grantPermissionOnSubscription(namespace, subscriptionName,
Collections.singleton(subscriptionRole));

// subscriptionRole has namespace-level authorization
sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public void testForwardAuthData() throws Exception {
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnNamespace(namespaceName, "client",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnSubscription(namespaceName, subscriptionName,
Sets.newHashSet("proxy", "client"));

// Step 2: Run Pulsar Proxy without forwarding authData - expect Exception
ProxyConfiguration proxyConfig = new ProxyConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ public void testIncorrectRoles() throws Exception {
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnNamespace(namespaceName, "client",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnSubscription(namespaceName, subscriptionName,
Sets.newHashSet("proxy", "client"));

// Step 2: Try to use proxy Client as a normal Client - expect exception
PulsarClient proxyClient = createPulsarClient(pulsar.getBrokerServiceUrl(), proxyAuthParams);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ public void testProxyAuthorization() throws Exception {
admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy", Sets.newHashSet(AuthAction.produce));
admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnSubscription(namespaceName, "my-subscriber-name",
Sets.newHashSet("Proxy", "Client"));

Consumer<byte[]> consumer;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ public void testProxyAuthorization() throws Exception {
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnSubscription(namespaceName, "my-subscriber-name",
Sets.newHashSet("Proxy", "Client"));

Consumer<byte[]> consumer = proxyClient.newConsumer()
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
Expand Down Expand Up @@ -305,6 +307,8 @@ public void testTlsHostVerificationProxyToClient(boolean hostnameVerificationEna
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnSubscription(namespaceName, "my-subscriber-name",
Sets.newHashSet("Proxy", "Client"));

try {
proxyClient.newConsumer().topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
Expand Down Expand Up @@ -357,6 +361,8 @@ public void testTlsHostVerificationProxyToBroker(boolean hostnameVerificationEna
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnSubscription(namespaceName, "my-subscriber-name",
Sets.newHashSet("Proxy", "Client"));

try {
proxyClient.newConsumer().topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
Expand Down Expand Up @@ -394,6 +400,8 @@ public void tlsCiphersAndProtocols(Set<String> tlsCiphers, Set<String> tlsProtoc
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnSubscription(namespaceName, "my-subscriber-name",
Sets.newHashSet("Proxy", "Client"));

ProxyConfiguration proxyConfig = new ProxyConfiguration();
proxyConfig.setAuthenticationEnabled(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ public void testProxyAuthorization() throws Exception {
// excepted
admin.namespaces().grantPermissionOnNamespace(namespaceName, CLIENT_ROLE,
Sets.newHashSet(AuthAction.consume));
admin.namespaces().grantPermissionOnSubscription(namespaceName, "my-subscriber-name",
Sets.newHashSet(CLIENT_ROLE));
log.info("-- Admin permissions {} ---", admin.namespaces().getPermissions(namespaceName));
consumer = proxyClient.newConsumer()
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
Expand Down Expand Up @@ -232,6 +234,8 @@ public void testUpdatePartitionNumAndReconnect() throws Exception {
admin.topics().createPartitionedTopic(topicName, 2);
admin.topics().grantPermission(topicName, CLIENT_ROLE,
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnSubscription(namespaceName, subscriptionName,
Sets.newHashSet(CLIENT_ROLE));

Consumer<byte[]> consumer = proxyClient.newConsumer()
.topic(topicName)
Expand Down Expand Up @@ -333,9 +337,12 @@ public void testProxyAuthorizationWithPrefixSubscriptionAuthMode() throws Except
Assert.fail("should have failed with authorization error");
} catch (Exception ex) {
// excepted
String sub = CLIENT_ROLE + "-sub1";
admin.namespaces().grantPermissionOnSubscription(namespaceName, sub,
Sets.newHashSet(CLIENT_ROLE));
consumer = proxyClient.newConsumer()
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1")
.subscriptionName(CLIENT_ROLE + "-sub1").subscribe();
.subscriptionName(sub).subscribe();
}

Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES)
Expand Down