diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index cdc57df30118a..6b9f83e45e28a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -120,7 +120,7 @@ public CompletableFuture canConsumeAsync(TopicName topicName, String ro // list is empty) Set roles = policies.get().auth_policies .getSubscriptionAuthentication().get(subscription); - if (roles != null && !roles.isEmpty() && !roles.contains(role)) { + if (roles == null || roles.isEmpty() || !roles.contains(role)) { log.warn("[{}] is not authorized to subscribe on {}-{}", role, topicName, subscription); permissionFuture.complete(false); return; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index bef85174b36c1..1f5862cc45c18 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -653,7 +653,7 @@ public void checkPermissions() { return; } } catch (Exception e) { - log.warn("[{}] Get unexpected error while autorizing [{}] {}", appId, subscription.getTopicName(), + log.warn("[{}] Get unexpected error while authorizing [{}] {}", appId, subscription.getTopicName(), e.getMessage(), e); } log.info("[{}] is not allowed to consume from topic [{}] anymore", appId, subscription.getTopicName()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java index dcdc602985b64..82d7c33b9d23e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java @@ -202,6 +202,11 @@ public void simple() throws Exception { // tests for subscription auth mode admin.namespaces().grantPermissionOnNamespace("p1/c1/ns1", "*", EnumSet.of(AuthAction.consume)); admin.namespaces().setSubscriptionAuthMode("p1/c1/ns1", SubscriptionAuthMode.Prefix); + + admin.namespaces().grantPermissionOnSubscription("p1/c1/ns1", "sub1", Sets.newHashSet("role1")); + admin.namespaces().grantPermissionOnSubscription("p1/c1/ns1", "sub2", Sets.newHashSet("role2")); + admin.namespaces().grantPermissionOnSubscription("p1/c1/ns1", "role1-sub1", Sets.newHashSet("role1")); + admin.namespaces().grantPermissionOnSubscription("p1/c1/ns1", "role2-sub2", Sets.newHashSet("role2")); waitForChange(); assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "role1", null)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java index e346086ce25b3..b43e315039fdd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java @@ -209,9 +209,16 @@ public void testSubscriberPermission() throws Exception { .authentication(authentication)); // (1) Create subscription name - Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) - .subscribe(); - consumer.close(); + // should fail with empty subscription permission + try { + Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) + .subscribe(); + fail("should have fail with authorization exception"); + } catch (org.apache.pulsar.client.api.PulsarClientException.AuthorizationException e) { + // OK + } + superAdmin.topics().createNonPartitionedTopic(topicName); + superAdmin.topics().createSubscription(topicName, subscriptionName, MessageId.latest); // verify tenant is able to perform all subscription-admin api tenantAdmin.topics().skipAllMessages(topicName, subscriptionName); @@ -228,8 +235,8 @@ public void testSubscriberPermission() throws Exception { tenantAdmin.topics().resetCursor(topicName, subscriptionName, MessageId.earliest); // grant namespace-level authorization to the subscriptionRole - tenantAdmin.namespaces().grantPermissionOnNamespace(namespace, subscriptionRole, - Collections.singleton(AuthAction.consume)); + tenantAdmin.namespaces().grantPermissionOnSubscription(namespace, subscriptionName, + Collections.singleton(subscriptionRole)); // subscriptionRole has namespace-level authorization sub1Admin.topics().resetCursor(topicName, subscriptionName, 10); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java index cf61dac0e6b68..c8cef005ea735 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java @@ -98,6 +98,8 @@ public void testForwardAuthData() throws Exception { Sets.newHashSet(AuthAction.consume, AuthAction.produce)); admin.namespaces().grantPermissionOnNamespace(namespaceName, "client", Sets.newHashSet(AuthAction.consume, AuthAction.produce)); + admin.namespaces().grantPermissionOnSubscription(namespaceName, subscriptionName, + Sets.newHashSet("proxy", "client")); // Step 2: Run Pulsar Proxy without forwarding authData - expect Exception ProxyConfiguration proxyConfig = new ProxyConfiguration(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java index 9ae3fbc09f3ff..ee8d94f84562b 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java @@ -193,6 +193,8 @@ public void testIncorrectRoles() throws Exception { Sets.newHashSet(AuthAction.consume, AuthAction.produce)); admin.namespaces().grantPermissionOnNamespace(namespaceName, "client", Sets.newHashSet(AuthAction.consume, AuthAction.produce)); + admin.namespaces().grantPermissionOnSubscription(namespaceName, subscriptionName, + Sets.newHashSet("proxy", "client")); // Step 2: Try to use proxy Client as a normal Client - expect exception PulsarClient proxyClient = createPulsarClient(pulsar.getBrokerServiceUrl(), proxyAuthParams); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java index 5d05867d4fffd..4fbe6b714ae30 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java @@ -178,6 +178,8 @@ public void testProxyAuthorization() throws Exception { admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy", Sets.newHashSet(AuthAction.produce)); admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client", Sets.newHashSet(AuthAction.consume, AuthAction.produce)); + admin.namespaces().grantPermissionOnSubscription(namespaceName, "my-subscriber-name", + Sets.newHashSet("Proxy", "Client")); Consumer consumer; try { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java index 14c72881b2994..f3c6c4f0d02ae 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java @@ -251,6 +251,8 @@ public void testProxyAuthorization() throws Exception { Sets.newHashSet(AuthAction.consume, AuthAction.produce)); admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client", Sets.newHashSet(AuthAction.consume, AuthAction.produce)); + admin.namespaces().grantPermissionOnSubscription(namespaceName, "my-subscriber-name", + Sets.newHashSet("Proxy", "Client")); Consumer consumer = proxyClient.newConsumer() .topic("persistent://my-property/proxy-authorization/my-ns/my-topic1") @@ -305,6 +307,8 @@ public void testTlsHostVerificationProxyToClient(boolean hostnameVerificationEna Sets.newHashSet(AuthAction.consume, AuthAction.produce)); admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client", Sets.newHashSet(AuthAction.consume, AuthAction.produce)); + admin.namespaces().grantPermissionOnSubscription(namespaceName, "my-subscriber-name", + Sets.newHashSet("Proxy", "Client")); try { proxyClient.newConsumer().topic("persistent://my-property/proxy-authorization/my-ns/my-topic1") @@ -357,6 +361,8 @@ public void testTlsHostVerificationProxyToBroker(boolean hostnameVerificationEna Sets.newHashSet(AuthAction.consume, AuthAction.produce)); admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client", Sets.newHashSet(AuthAction.consume, AuthAction.produce)); + admin.namespaces().grantPermissionOnSubscription(namespaceName, "my-subscriber-name", + Sets.newHashSet("Proxy", "Client")); try { proxyClient.newConsumer().topic("persistent://my-property/proxy-authorization/my-ns/my-topic1") @@ -394,6 +400,8 @@ public void tlsCiphersAndProtocols(Set tlsCiphers, Set tlsProtoc Sets.newHashSet(AuthAction.consume, AuthAction.produce)); admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client", Sets.newHashSet(AuthAction.consume, AuthAction.produce)); + admin.namespaces().grantPermissionOnSubscription(namespaceName, "my-subscriber-name", + Sets.newHashSet("Proxy", "Client")); ProxyConfiguration proxyConfig = new ProxyConfiguration(); proxyConfig.setAuthenticationEnabled(true); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java index 693e4ca5db9d6..cb5dee3aa0af8 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java @@ -159,6 +159,8 @@ public void testProxyAuthorization() throws Exception { // excepted admin.namespaces().grantPermissionOnNamespace(namespaceName, CLIENT_ROLE, Sets.newHashSet(AuthAction.consume)); + admin.namespaces().grantPermissionOnSubscription(namespaceName, "my-subscriber-name", + Sets.newHashSet(CLIENT_ROLE)); log.info("-- Admin permissions {} ---", admin.namespaces().getPermissions(namespaceName)); consumer = proxyClient.newConsumer() .topic("persistent://my-property/proxy-authorization/my-ns/my-topic1") @@ -232,6 +234,8 @@ public void testUpdatePartitionNumAndReconnect() throws Exception { admin.topics().createPartitionedTopic(topicName, 2); admin.topics().grantPermission(topicName, CLIENT_ROLE, Sets.newHashSet(AuthAction.consume, AuthAction.produce)); + admin.namespaces().grantPermissionOnSubscription(namespaceName, subscriptionName, + Sets.newHashSet(CLIENT_ROLE)); Consumer consumer = proxyClient.newConsumer() .topic(topicName) @@ -333,9 +337,12 @@ public void testProxyAuthorizationWithPrefixSubscriptionAuthMode() throws Except Assert.fail("should have failed with authorization error"); } catch (Exception ex) { // excepted + String sub = CLIENT_ROLE + "-sub1"; + admin.namespaces().grantPermissionOnSubscription(namespaceName, sub, + Sets.newHashSet(CLIENT_ROLE)); consumer = proxyClient.newConsumer() .topic("persistent://my-property/proxy-authorization/my-ns/my-topic1") - .subscriptionName(CLIENT_ROLE + "-sub1").subscribe(); + .subscriptionName(sub).subscribe(); } Producer producer = proxyClient.newProducer(Schema.BYTES)