diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index dd814cf74e4a2..f1b780726489c 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -71,7 +71,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener sslHandlerSupplier; @@ -303,7 +302,6 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), try { // init authn this.clientConf = createClientConfiguration(); - this.clientAuthentication = clientConf.getAuthentication(); int protocolVersion = getProtocolVersionToAdvertise(connect); // authn not enabled, complete @@ -427,10 +425,7 @@ ClientConfigurationData createClientConfiguration() throws UnsupportedAuthentica ClientConfigurationData clientConf = new ClientConfigurationData(); clientConf.setServiceUrl(service.getServiceUrl()); ProxyConfiguration proxyConfig = service.getConfiguration(); - if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) { - clientConf.setAuthentication(AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), - proxyConfig.getBrokerClientAuthenticationParameters())); - } + clientConf.setAuthentication(this.getClientAuthentication()); if (proxyConfig.isTlsEnabledWithBroker()) { clientConf.setUseTls(true); if (proxyConfig.isBrokerClientTlsEnabledWithKeyStore()) { @@ -463,7 +458,7 @@ long newRequestId() { } public Authentication getClientAuthentication() { - return clientAuthentication; + return service.getProxyClientAuthenticationPlugin(); } @Override diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index fe6362df0b013..af5b2a8ec9409 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -53,6 +53,9 @@ import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.util.netty.EventLoopUtil; @@ -69,6 +72,7 @@ public class ProxyService implements Closeable { private final ProxyConfiguration proxyConfig; + private final Authentication proxyClientAuthentication; private final Timer timer; private String serviceUrl; private String serviceUrlTls; @@ -163,6 +167,12 @@ public ProxyService(ProxyConfiguration proxyConfig, }); }, 60, TimeUnit.SECONDS); this.proxyAdditionalServlets = AdditionalServlets.load(proxyConfig); + if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) { + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + } else { + proxyClientAuthentication = AuthenticationDisabled.INSTANCE; + } } public void start() throws Exception { @@ -367,5 +377,9 @@ public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataS proxyConfig.getZookeeperSessionTimeoutMs()); } + public Authentication getProxyClientAuthenticationPlugin() { + return this.proxyClientAuthentication; + } + private static final Logger LOG = LoggerFactory.getLogger(ProxyService.class); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java index 2e74995ee1aa6..f6d53c8ec5357 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java @@ -50,6 +50,7 @@ import org.apache.pulsar.common.policies.data.AuthAction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -144,9 +145,11 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat } JsonObject element = JsonParser.parseString(commandData).getAsJsonObject(); + log.info("Have log of {}", element); long expiryTimeInMillis = Long.parseLong(element.get("expiryTime").getAsString()); long currentTimeInMillis = System.currentTimeMillis(); if (expiryTimeInMillis < currentTimeInMillis) { + log.warn("Auth failed due to timeout"); throw new AuthenticationException("Authentication data has been expired"); } return element.get("entityType").getAsString(); @@ -198,10 +201,10 @@ void testAuthentication() throws Exception { String namespaceName = "my-property/my-ns"; String topicName = "persistent://my-property/my-ns/my-topic1"; String subscriptionName = "my-subscriber-name"; - // expires after 6 seconds - String clientAuthParams = "entityType:client,expiryTime:" + (System.currentTimeMillis() + 6 * 1000); - // expires after 3 seconds - String proxyAuthParams = "entityType:proxy,expiryTime:" + (System.currentTimeMillis() + 3 * 1000); + // expires after 60 seconds + String clientAuthParams = "entityType:client,expiryTime:" + (System.currentTimeMillis() + 60 * 1000); + // expires after 60 seconds + String proxyAuthParams = "entityType:proxy,expiryTime:" + (System.currentTimeMillis() + 60 * 1000); admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy", Sets.newHashSet(AuthAction.consume, AuthAction.produce)); @@ -230,16 +233,18 @@ void testAuthentication() throws Exception { proxyService.start(); final String proxyServiceUrl = proxyService.getServiceUrl(); - // Step 3: Pass correct client params + // Step 3: Pass correct client params and use multiple connections @Cleanup - PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 1); + PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 3); proxyClient.newProducer(Schema.BYTES).topic(topicName).create(); - // Sleep for 4 seconds - wait for proxy auth params to expire - Thread.sleep(4 * 1000); proxyClient.newProducer(Schema.BYTES).topic(topicName).create(); - // Sleep for 3 seconds - wait for client auth parans to expire - Thread.sleep(3 * 1000); proxyClient.newProducer(Schema.BYTES).topic(topicName).create(); + + // Step 4: Ensure that all client contexts share the same auth provider + Assert.assertTrue(proxyService.getClientCnxs().size() >= 3, "expect at least 3 clients"); + proxyService.getClientCnxs().stream().forEach((cnx) -> { + Assert.assertSame(cnx.authenticationProvider, proxyService.getAuthenticationService().getAuthenticationProvider("BasicAuthentication")); + }); } private void updateAdminClient() throws PulsarClientException {