From 9c6aad7d2c0d2f79ddc966c9f1d2206539d8745d Mon Sep 17 00:00:00 2001 From: Yuri Mizushima Date: Wed, 31 Jul 2024 10:49:52 +0900 Subject: [PATCH 1/3] fix: reuse authentication instance in pulsar-proxy --- .../proxy/server/AdminProxyHandler.java | 23 ++-------- .../proxy/server/DirectProxyHandler.java | 4 +- .../pulsar/proxy/server/ProxyService.java | 12 ++--- .../proxy/server/ProxyServiceStarter.java | 46 ++++++++++++++++--- 4 files changed, 49 insertions(+), 36 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java index caaa99c5d40cc..0108b770249a0 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java @@ -29,7 +29,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.Iterator; -import java.util.Objects; import java.util.Set; import java.util.concurrent.Executor; import javax.net.ssl.SSLContext; @@ -40,7 +39,6 @@ import org.apache.pulsar.broker.web.AuthenticationFilter; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; -import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.KeyStoreParams; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.util.SecurityUtility; @@ -87,12 +85,15 @@ class AdminProxyHandler extends ProxyServlet { private final ProxyConfiguration config; private final BrokerDiscoveryProvider discoveryProvider; + private final Authentication proxyClientAuthentication; private final String brokerWebServiceUrl; private final String functionWorkerWebServiceUrl; - AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider) { + AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider, + Authentication proxyClientAuthentication) { this.config = config; this.discoveryProvider = discoveryProvider; + this.proxyClientAuthentication = proxyClientAuthentication; this.brokerWebServiceUrl = config.isTlsEnabledWithBroker() ? config.getBrokerWebServiceURLTLS() : config.getBrokerWebServiceURL(); this.functionWorkerWebServiceUrl = config.isTlsEnabledWithBroker() ? config.getFunctionWorkerWebServiceURLTLS() @@ -256,22 +257,13 @@ protected ContentProvider proxyRequestContent(HttpServletRequest request, @Override protected HttpClient newHttpClient() { try { - Authentication auth = AuthenticationFactory.create( - config.getBrokerClientAuthenticationPlugin(), - config.getBrokerClientAuthenticationParameters() - ); - - Objects.requireNonNull(auth, "No supported auth found for proxy"); - - auth.start(); - if (config.isTlsEnabledWithBroker()) { try { X509Certificate[] trustCertificates = SecurityUtility .loadCertificatesFromPemFile(config.getBrokerClientTrustCertsFilePath()); SSLContext sslCtx; - AuthenticationDataProvider authData = auth.getAuthData(); + AuthenticationDataProvider authData = proxyClientAuthentication.getAuthData(); if (config.isBrokerClientTlsEnabledWithKeyStore()) { KeyStoreParams params = authData.hasDataForTls() ? authData.getTlsKeyStoreParams() : null; sslCtx = KeyStoreSSLContext.createClientSslContext( @@ -311,11 +303,6 @@ protected HttpClient newHttpClient() { return new JettyHttpClient(contextFactory); } catch (Exception e) { LOG.error("new jetty http client exception ", e); - try { - auth.close(); - } catch (IOException ioe) { - LOG.error("Failed to close the authentication service", ioe); - } throw new PulsarClientException.InvalidConfigurationException(e.getMessage()); } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index d63b04b6734de..4678db82c6e55 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -52,7 +52,6 @@ import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; -import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.AuthData; @@ -114,8 +113,7 @@ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection) if (!isEmpty(config.getBrokerClientAuthenticationPlugin())) { try { - authData = AuthenticationFactory.create(config.getBrokerClientAuthenticationPlugin(), - config.getBrokerClientAuthenticationParameters()).getAuthData(); + authData = authentication.getAuthData(); } catch (PulsarClientException e) { throw new RuntimeException(e); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index ea9e4ebfaa9b8..5cf01d6668b9b 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -64,8 +64,6 @@ import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets; import org.apache.pulsar.client.api.Authentication; -import org.apache.pulsar.client.api.AuthenticationFactory; -import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.util.netty.DnsResolverUtil; @@ -158,7 +156,8 @@ public class ProxyService implements Closeable { private boolean gracefulShutdown = true; public ProxyService(ProxyConfiguration proxyConfig, - AuthenticationService authenticationService) throws Exception { + AuthenticationService authenticationService, + Authentication proxyClientAuthentication) throws Exception { requireNonNull(proxyConfig); this.proxyConfig = proxyConfig; this.clientCnxs = Sets.newConcurrentHashSet(); @@ -207,12 +206,7 @@ public ProxyService(ProxyConfiguration proxyConfig, }); }, 60, TimeUnit.SECONDS); this.proxyAdditionalServlets = AdditionalServlets.load(proxyConfig); - if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) { - proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), - proxyConfig.getBrokerClientAuthenticationParameters()); - } else { - proxyClientAuthentication = AuthenticationDisabled.INSTANCE; - } + this.proxyClientAuthentication = proxyClientAuthentication; this.connectionController = new ConnectionController.DefaultConnectionController( proxyConfig.getMaxConcurrentInboundConnections(), proxyConfig.getMaxConcurrentInboundConnectionsPerIp()); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index 10121e7f5d61d..7de1b65c08e56 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -29,11 +29,13 @@ import io.prometheus.client.Gauge; import io.prometheus.client.Gauge.Child; import io.prometheus.client.hotspot.DefaultExports; +import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import lombok.Getter; @@ -44,6 +46,10 @@ import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.configuration.VipStatus; import org.apache.pulsar.common.policies.data.ClusterData; @@ -104,6 +110,8 @@ public class ProxyServiceStarter { private ProxyConfiguration config; + private Authentication proxyClientAuthentication; + @Getter private ProxyService proxyService; @@ -244,8 +252,27 @@ public static void main(String[] args) throws Exception { public void start() throws Exception { AuthenticationService authenticationService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(config)); + + if (config.getBrokerClientAuthenticationPlugin() != null) { + proxyClientAuthentication = AuthenticationFactory.create(config.getBrokerClientAuthenticationPlugin(), + config.getBrokerClientAuthenticationParameters()); + Objects.requireNonNull(proxyClientAuthentication, "No supported auth found for proxy"); + try { + proxyClientAuthentication.start(); + } catch (Exception e) { + try { + proxyClientAuthentication.close(); + } catch (IOException ioe) { + log.error("Failed to close the authentication service", ioe); + } + throw new PulsarClientException.InvalidConfigurationException(e.getMessage()); + } + } else { + proxyClientAuthentication = AuthenticationDisabled.INSTANCE; + } + // create proxy service - proxyService = new ProxyService(config, authenticationService); + proxyService = new ProxyService(config, authenticationService, proxyClientAuthentication); // create a web-service server = new WebServer(config, authenticationService); @@ -293,7 +320,8 @@ public double get() { } AtomicReference webSocketServiceRef = new AtomicReference<>(); - addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider(), webSocketServiceRef); + addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider(), webSocketServiceRef, + proxyClientAuthentication); webSocketService = webSocketServiceRef.get(); // start web-service @@ -311,6 +339,9 @@ public void close() { if (webSocketService != null) { webSocketService.close(); } + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } catch (Exception e) { log.warn("server couldn't stop gracefully {}", e.getMessage(), e); } finally { @@ -323,15 +354,17 @@ public void close() { public static void addWebServerHandlers(WebServer server, ProxyConfiguration config, ProxyService service, - BrokerDiscoveryProvider discoveryProvider) throws Exception { - addWebServerHandlers(server, config, service, discoveryProvider, null); + BrokerDiscoveryProvider discoveryProvider, + Authentication proxyClientAuthentication) throws Exception { + addWebServerHandlers(server, config, service, discoveryProvider, null, proxyClientAuthentication); } public static void addWebServerHandlers(WebServer server, ProxyConfiguration config, ProxyService service, BrokerDiscoveryProvider discoveryProvider, - AtomicReference webSocketServiceRef) throws Exception { + AtomicReference webSocketServiceRef, + Authentication proxyClientAuthentication) throws Exception { // We can make 'status.html' publicly accessible without authentication since // it does not contain any sensitive data. server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath(), @@ -348,7 +381,8 @@ public static void addWebServerHandlers(WebServer server, } } - AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider); + AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider, + proxyClientAuthentication); ServletHolder servletHolder = new ServletHolder(adminProxyHandler); server.addServlet("/admin", servletHolder); server.addServlet("/lookup", servletHolder); From 3cb0a2e26d4be42124e9e50b891a05dc3f4f46ad Mon Sep 17 00:00:00 2001 From: Yuri Mizushima Date: Thu, 1 Aug 2024 15:44:04 +0900 Subject: [PATCH 2/3] test: fix tests to address interface changes in pulsar-proxy --- .../ProxySaslAuthenticationTest.java | 6 +- .../SimpleProxyExtensionTestBase.java | 12 +++- .../AdminProxyHandlerKeystoreTLSTest.java | 13 +++- .../proxy/server/AdminProxyHandlerTest.java | 3 +- .../server/AuthedAdminProxyHandlerTest.java | 12 +++- .../server/FunctionWorkerRoutingTest.java | 10 +++- ...nvalidProxyConfigForAuthorizationTest.java | 3 +- .../server/ProxyAdditionalServletTest.java | 15 ++++- ...roxyAuthenticatedProducerConsumerTest.java | 11 +++- .../proxy/server/ProxyAuthenticationTest.java | 7 ++- .../server/ProxyConnectionThrottlingTest.java | 11 +++- .../server/ProxyDisableZeroCopyTest.java | 2 +- .../ProxyEnableHAProxyProtocolTest.java | 12 +++- .../server/ProxyForwardAuthDataTest.java | 10 +++- .../proxy/server/ProxyIsAHttpProxyTest.java | 59 +++++++++++++++---- .../server/ProxyKeyStoreTlsTransportTest.java | 12 +++- .../server/ProxyKeyStoreTlsWithAuthTest.java | 12 +++- .../ProxyKeyStoreTlsWithoutAuthTest.java | 12 +++- .../server/ProxyLookupThrottlingTest.java | 11 +++- .../proxy/server/ProxyMutualTlsTest.java | 12 +++- .../pulsar/proxy/server/ProxyParserTest.java | 11 +++- .../server/ProxyPrometheusMetricsTest.java | 15 ++++- .../proxy/server/ProxyRefreshAuthTest.java | 12 +++- .../server/ProxyRolesEnforcementTest.java | 9 ++- .../pulsar/proxy/server/ProxyStatsTest.java | 14 ++++- .../server/ProxyStuckConnectionTest.java | 12 +++- .../apache/pulsar/proxy/server/ProxyTest.java | 14 ++++- .../pulsar/proxy/server/ProxyTlsTest.java | 12 +++- .../proxy/server/ProxyTlsWithAuthTest.java | 12 +++- .../server/ProxyWithAuthorizationNegTest.java | 10 +++- .../server/ProxyWithAuthorizationTest.java | 19 +++++- .../ProxyWithExtensibleLoadManagerTest.java | 11 +++- .../server/ProxyWithJwtAuthorizationTest.java | 18 ++++-- .../ProxyWithoutServiceDiscoveryTest.java | 11 +++- .../SuperUserAuthedAdminProxyHandlerTest.java | 12 +++- .../server/UnauthedAdminProxyHandlerTest.java | 16 ++++- 36 files changed, 395 insertions(+), 58 deletions(-) diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java index a27384c989000..ca28befabc145 100644 --- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java +++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java @@ -260,7 +260,11 @@ void testAuthentication() throws Exception { proxyConfig.setForwardAuthorizationCredentials(true); AuthenticationService authenticationService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); - ProxyService proxyService = new ProxyService(proxyConfig, authenticationService); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication); proxyService.start(); final String proxyServiceUrl = "pulsar://localhost:" + proxyService.getListenPort().get(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java index f9ace716ecd06..050199acc496d 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java @@ -26,6 +26,8 @@ import org.apache.commons.io.IOUtils; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -121,6 +123,7 @@ public void close() { private ProxyService proxyService; private boolean useSeparateThreadPoolForProxyExtensions; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; public SimpleProxyExtensionTestBase(boolean useSeparateThreadPoolForProxyExtensions) { this.useSeparateThreadPoolForProxyExtensions = useSeparateThreadPoolForProxyExtensions; @@ -142,8 +145,12 @@ protected void setup() throws Exception { proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); proxyConfig.setClusterName(configClusterName); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) .createConfigurationMetadataStore(); @@ -174,6 +181,9 @@ public void testBootstrapProtocolHandler() throws Exception { protected void cleanup() throws Exception { super.internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } if (tempDirectory != null) { FileUtils.deleteDirectory(tempDirectory); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java index 92c644b470dcd..5995d11b33b21 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java @@ -24,6 +24,8 @@ import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.ClusterData; @@ -47,6 +49,8 @@ public class AdminProxyHandlerKeystoreTLSTest extends MockedPulsarServiceBaseTes private final ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; + private WebServer webServer; private BrokerDiscoveryProvider discoveryProvider; @@ -103,6 +107,10 @@ protected void setup() throws Exception { KEYSTORE_TYPE, BROKER_KEYSTORE_FILE_PATH, BROKER_KEYSTORE_PW)); proxyConfig.setClusterName(configClusterName); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)), registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))); webServer = new WebServer(proxyConfig, new AuthenticationService( @@ -110,7 +118,7 @@ protected void setup() throws Exception { discoveryProvider = spy(registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null); doReturn(report).when(discoveryProvider).nextBroker(); - ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider)); + ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication)); webServer.addServlet("/admin", servletHolder); webServer.addServlet("/lookup", servletHolder); webServer.start(); @@ -120,6 +128,9 @@ protected void setup() throws Exception { @Override protected void cleanup() throws Exception { webServer.stop(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } super.internalCleanup(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java index becebe0059e56..4f925618e8a79 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerTest.java @@ -32,6 +32,7 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.apache.pulsar.client.api.Authentication; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Request; import org.testng.Assert; @@ -46,7 +47,7 @@ public void setupMocks() throws ServletException { // given HttpClient httpClient = mock(HttpClient.class); adminProxyHandler = new AdminProxyHandler(mock(ProxyConfiguration.class), - mock(BrokerDiscoveryProvider.class)) { + mock(BrokerDiscoveryProvider.class), mock(Authentication.class)) { @Override protected HttpClient createHttpClient() throws ServletException { return httpClient; diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java index ef58648e35a25..97bb91d924cf8 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java @@ -32,6 +32,8 @@ import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.ClusterData; @@ -51,6 +53,7 @@ public class AuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest { private static final Logger LOG = LoggerFactory.getLogger(AuthedAdminProxyHandlerTest.class); private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; private WebServer webServer; private BrokerDiscoveryProvider discoveryProvider; private PulsarResources resource; @@ -99,6 +102,10 @@ protected void setup() throws Exception { proxyConfig.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH); proxyConfig.setAuthenticationProviders(ImmutableSet.of(AuthenticationProviderTls.class.getName())); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)), registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))); webServer = new WebServer(proxyConfig, new AuthenticationService( @@ -107,7 +114,7 @@ protected void setup() throws Exception { LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null); doReturn(report).when(discoveryProvider).nextBroker(); - ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider)); + ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication)); webServer.addServlet("/admin", servletHolder); webServer.addServlet("/lookup", servletHolder); @@ -119,6 +126,9 @@ protected void setup() throws Exception { @Override protected void cleanup() throws Exception { webServer.stop(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } super.internalCleanup(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java index db5e9e12bd2db..a07a0f082d39a 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/FunctionWorkerRoutingTest.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.proxy.server; +import lombok.Cleanup; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.testng.Assert; import org.testng.annotations.Test; @@ -37,8 +40,13 @@ public void testFunctionWorkerRedirect() throws Exception { proxyConfig.setBrokerWebServiceURL(brokerUrl); proxyConfig.setFunctionWorkerWebServiceURL(functionWorkerUrl); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + BrokerDiscoveryProvider discoveryProvider = mock(BrokerDiscoveryProvider.class); - AdminProxyHandler handler = new AdminProxyHandler(proxyConfig, discoveryProvider); + AdminProxyHandler handler = new AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication); String funcUrl = handler.rewriteTarget(buildRequest("/admin/v3/functions/test/test")); Assert.assertEquals(funcUrl, String.format("%s/admin/v3/functions/%s/%s", diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java index c29bfaa964812..b7ef0855e383c 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.fail; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; import org.mockito.Mockito; import org.testng.annotations.Test; @@ -33,7 +34,7 @@ void startupShouldFailWhenAuthorizationIsEnabledWithoutAuthentication() throws E proxyConfiguration.setAuthorizationEnabled(true); proxyConfiguration.setAuthenticationEnabled(false); try (ProxyService proxyService = new ProxyService(proxyConfiguration, - Mockito.mock(AuthenticationService.class))) { + Mockito.mock(AuthenticationService.class), Mockito.mock(Authentication.class))) { proxyService.start(); fail("An exception should have been thrown"); } catch (Exception e) { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java index f61a73bbf9177..e12224da37199 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java @@ -25,6 +25,8 @@ import org.apache.commons.lang3.RandomUtils; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader; @@ -65,6 +67,7 @@ public class ProxyAdditionalServletTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private WebServer proxyWebServer; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeClass @@ -83,8 +86,13 @@ protected void setup() throws Exception { // this is for nar package test // addServletNar(); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, - new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)))); + new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)), + proxyClientAuthentication)); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) .createConfigurationMetadataStore(); @@ -99,7 +107,7 @@ protected void setup() throws Exception { mockAdditionalServlet(); proxyWebServer = new WebServer(proxyConfig, authService); - ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null); + ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null, proxyClientAuthentication); proxyWebServer.start(); } @@ -180,6 +188,9 @@ protected void cleanup() throws Exception { proxyService.close(); proxyWebServer.stop(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java index 4083c984d9874..2a9a9f15b4568 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java @@ -35,6 +35,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -74,6 +75,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; private final String configClusterName = "test"; @BeforeMethod @@ -139,8 +141,12 @@ protected void setup() throws Exception { proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); proxyConfig.setClusterName(configClusterName); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) .createConfigurationMetadataStore(); @@ -152,6 +158,9 @@ protected void setup() throws Exception { protected void cleanup() throws Exception { super.internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } /** diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java index 662b8305c0e26..7d3cf57d594df 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java @@ -43,6 +43,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -235,7 +236,11 @@ void testAuthentication() throws Exception { AuthenticationService authenticationService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); @Cleanup - ProxyService proxyService = new ProxyService(proxyConfig, authenticationService); + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + @Cleanup + ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication); proxyService.start(); final String proxyServiceUrl = proxyService.getServiceUrl(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java index 78ab9bd0d9581..671e68e5c3fb7 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java @@ -27,6 +27,8 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.limiter.ConnectionController; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; @@ -46,6 +48,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest { private final int NUM_CONCURRENT_INBOUND_CONNECTION = 4; private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeClass @@ -60,8 +63,11 @@ protected void setup() throws Exception { proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION); proxyConfig.setMaxConcurrentInboundConnectionsPerIp(NUM_CONCURRENT_INBOUND_CONNECTION); proxyConfig.setClusterName(configClusterName); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) .createConfigurationMetadataStore(); @@ -74,6 +80,9 @@ protected void setup() throws Exception { protected void cleanup() throws Exception { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java index 5ddb084e3c77f..6a3992c550fd3 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java @@ -21,7 +21,7 @@ public class ProxyDisableZeroCopyTest extends ProxyTest { @Override - protected void initializeProxyConfig() { + protected void initializeProxyConfig() throws Exception { super.initializeProxyConfig(); proxyConfig.setProxyZeroCopyModeEnabled(false); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java index 413774daf2cd1..40aa8f5040556 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java @@ -22,6 +22,8 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.ConsumerImpl; @@ -48,6 +50,7 @@ public class ProxyEnableHAProxyProtocolTest extends MockedPulsarServiceBaseTest private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeClass @@ -62,8 +65,12 @@ protected void setup() throws Exception { proxyConfig.setHaProxyProtocolEnabled(true); proxyConfig.setClusterName(configClusterName); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) .createConfigurationMetadataStore(); @@ -77,6 +84,9 @@ protected void cleanup() throws Exception { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java index 5e969ca26e4fd..9c3a69b5f4451 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java @@ -30,6 +30,8 @@ import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -118,7 +120,11 @@ public void testForwardAuthData() throws Exception { AuthenticationService authenticationService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); - try (ProxyService proxyService = new ProxyService(proxyConfig, authenticationService)) { + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + try (ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication)) { proxyService.start(); try (PulsarClient proxyClient = createPulsarClient(proxyService.getServiceUrl(), clientAuthParams)) { proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); @@ -134,7 +140,7 @@ public void testForwardAuthData() throws Exception { PulsarConfigurationLoader.convertFrom(proxyConfig)); @Cleanup - ProxyService proxyService = new ProxyService(proxyConfig, authenticationService); + ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication); proxyService.start(); @Cleanup diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java index 90e15ede2f436..cf587015544b7 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java @@ -33,9 +33,12 @@ import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.core.Response; +import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.resources.PulsarResources; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.eclipse.jetty.client.HttpClient; @@ -197,10 +200,14 @@ public void testSingleRedirect() throws Exception { ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)), proxyClientAuthentication); webServer.start(); try { Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get(); @@ -226,10 +233,14 @@ public void testMultipleRedirect() throws Exception { ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)), proxyClientAuthentication); webServer.start(); try { Response r1 = client.target(webServer.getServiceUri()).path("/server1/foobar").request().get(); @@ -257,10 +268,14 @@ public void testTryingToUseExistingPath() throws Exception { ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)), proxyClientAuthentication); } @@ -276,10 +291,14 @@ public void testLongPathInProxyTo() throws Exception { ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)), proxyClientAuthentication); webServer.start(); try { Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get(); @@ -303,10 +322,14 @@ public void testProxyToEndsInSlash() throws Exception { ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)), proxyClientAuthentication); webServer.start(); try { Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get(); @@ -329,10 +352,14 @@ public void testLongPath() throws Exception { ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)), proxyClientAuthentication); webServer.start(); try { Response r = client.target(webServer.getServiceUri()).path("/foo/bar/blah/foobar").request().get(); @@ -354,6 +381,10 @@ public void testLongUri() throws Exception { ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); StringBuilder longUri = new StringBuilder("/service3/tp"); for (int i = 10 * 1024; i > 0; i = i - 11){ @@ -362,7 +393,7 @@ public void testLongUri() throws Exception { WebServer webServerMaxUriLen8k = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen8k, proxyConfig, null, - registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)), proxyClientAuthentication); webServerMaxUriLen8k.start(); try { Response r = client.target(webServerMaxUriLen8k.getServiceUri()).path(longUri.toString()).request().get(); @@ -374,7 +405,7 @@ public void testLongUri() throws Exception { proxyConfig.setHttpMaxRequestHeaderSize(12 * 1024); WebServer webServerMaxUriLen12k = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen12k, proxyConfig, null, - registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)), proxyClientAuthentication); webServerMaxUriLen12k.start(); try { Response r = client.target(webServerMaxUriLen12k.getServiceUri()).path(longUri.toString()).request().get(); @@ -395,10 +426,14 @@ public void testPathEndsInSlash() throws Exception { ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)), proxyClientAuthentication); webServer.start(); try { Response r = client.target(webServer.getServiceUri()).path("/ui/foobar").request().get(); @@ -427,10 +462,14 @@ public void testStreaming() throws Exception { ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); AuthenticationService authService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, - registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)), proxyClientAuthentication); webServer.start(); HttpClient httpClient = new HttpClient(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java index 5671c527f68f9..8aa5581a0fe46 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java @@ -24,6 +24,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; @@ -40,6 +42,7 @@ public class ProxyKeyStoreTlsTransportTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeMethod @@ -87,9 +90,13 @@ protected void setup() throws Exception { proxyConfig.setBrokerClientTlsTrustStore(BROKER_TRUSTSTORE_FILE_PATH); proxyConfig.setBrokerClientTlsTrustStorePassword(BROKER_TRUSTSTORE_PW); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) .createConfigurationMetadataStore(); @@ -103,6 +110,9 @@ protected void cleanup() throws Exception { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } protected PulsarClient newClient() throws Exception { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java index 99fb8c03a819f..2c6d080bf2c0f 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java @@ -33,6 +33,8 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -54,6 +56,7 @@ public class ProxyKeyStoreTlsWithAuthTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeMethod @@ -88,9 +91,13 @@ protected void setup() throws Exception { providers.add(AuthenticationProviderTls.class.getName()); proxyConfig.setAuthenticationProviders(providers); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) .createConfigurationMetadataStore(); @@ -104,6 +111,9 @@ protected void cleanup() throws Exception { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } protected PulsarClient internalSetUpForClient(boolean addCertificates, String lookupUrl) throws Exception { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java index 1dcebda7935d7..3a20273b8c067 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java @@ -29,6 +29,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -50,6 +52,7 @@ public class ProxyKeyStoreTlsWithoutAuthTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeMethod @@ -76,8 +79,12 @@ protected void setup() throws Exception { proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); proxyConfig.setClusterName(configClusterName); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) .createConfigurationMetadataStore(); @@ -109,6 +116,9 @@ protected PulsarClient internalSetUpForClient(boolean addCertificates, String lo protected void cleanup() throws Exception { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java index a9017404d0e9f..4d12fdd77e763 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java @@ -31,6 +31,8 @@ import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; @@ -53,6 +55,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest { private final int NUM_CONCURRENT_INBOUND_CONNECTION = 5; private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeMethod(alwaysRun = true) @@ -69,7 +72,10 @@ protected void setup() throws Exception { AuthenticationService authenticationService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); - proxyService = Mockito.spy(new ProxyService(proxyConfig, authenticationService)); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication)); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) .createConfigurationMetadataStore(); @@ -84,6 +90,9 @@ protected void cleanup() throws Exception { if (proxyService != null) { proxyService.close(); } + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test(groups = "quarantine") diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java index fae44c00ada42..ab428c31b7fd9 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java @@ -26,6 +26,8 @@ import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -48,6 +50,7 @@ public class ProxyMutualTlsTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeClass @@ -68,8 +71,12 @@ protected void setup() throws Exception { proxyConfig.setTlsAllowInsecureConnection(false); proxyConfig.setClusterName(configClusterName); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) .createConfigurationMetadataStore(); @@ -83,6 +90,9 @@ protected void cleanup() throws Exception { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java index 1a9459619ebe9..583ab7000e54f 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java @@ -31,6 +31,8 @@ import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -62,6 +64,7 @@ public class ProxyParserTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeClass @@ -75,9 +78,12 @@ protected void setup() throws Exception { proxyConfig.setClusterName(configClusterName); //enable full parsing feature proxyConfig.setProxyLogLevel(Optional.ofNullable(2)); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) .createConfigurationMetadataStore(); @@ -93,6 +99,9 @@ protected void cleanup() throws Exception { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java index b692987d17af6..4dd7bc981e59b 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java @@ -42,6 +42,8 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.awaitility.Awaitility; @@ -59,6 +61,7 @@ public class ProxyPrometheusMetricsTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private WebServer proxyWebServer; private final ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeClass @@ -72,8 +75,13 @@ protected void setup() throws Exception { proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); proxyConfig.setClusterName(TEST_CLUSTER); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, - new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)))); + new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)), + proxyClientAuthentication)); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) .createConfigurationMetadataStore(); @@ -86,7 +94,7 @@ protected void setup() throws Exception { PulsarConfigurationLoader.convertFrom(proxyConfig)); proxyWebServer = new WebServer(proxyConfig, authService); - ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null); + ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null, proxyClientAuthentication); proxyWebServer.start(); } @@ -109,6 +117,9 @@ protected void cleanup() throws Exception { if (proxyService != null) { proxyService.close(); } + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } /** diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java index d06cf4201ff6f..bdabfecaa439d 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java @@ -35,6 +35,8 @@ import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.ClientCnx; @@ -57,6 +59,7 @@ public class ProxyRefreshAuthTest extends ProducerConsumerBase { private ProxyService proxyService; private final ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override protected void doInitConf() throws Exception { @@ -127,9 +130,13 @@ protected void setup() throws Exception { properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY)); proxyConfig.setProperties(properties); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); } @AfterClass(alwaysRun = true) @@ -137,6 +144,9 @@ protected void setup() throws Exception { protected void cleanup() throws Exception { super.internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } private void startProxy(boolean forwardAuthData) throws Exception { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java index a1ffc13ee9350..883b725e15dd2 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java @@ -28,6 +28,7 @@ import java.util.Optional; import java.util.Set; import javax.naming.AuthenticationException; +import lombok.Cleanup; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; @@ -35,6 +36,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -219,9 +221,14 @@ public void testIncorrectRoles() throws Exception { providers.add(BasicAuthenticationProvider.class.getName()); proxyConfig.setAuthenticationProviders(providers); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + try (ProxyService proxyService = new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))) { + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)) { proxyService.start(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java index 2866c6c26907c..86d572702f3b1 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java @@ -38,6 +38,8 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -61,6 +63,7 @@ public class ProxyStatsTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private WebServer proxyWebServer; private final ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeClass @@ -76,8 +79,12 @@ protected void setup() throws Exception { // enable full parsing feature proxyConfig.setProxyLogLevel(Optional.of(2)); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, - new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)))); + new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) .createConfigurationMetadataStore(); @@ -90,7 +97,7 @@ protected void setup() throws Exception { PulsarConfigurationLoader.convertFrom(proxyConfig)); proxyWebServer = new WebServer(proxyConfig, authService); - ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null); + ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null, proxyClientAuthentication); proxyWebServer.start(); } @@ -109,6 +116,9 @@ protected void cleanup() throws Exception { internalCleanup(); proxyService.close(); proxyWebServer.stop(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } /** diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java index 6e66008c15aef..30c6e45654ba0 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java @@ -28,6 +28,8 @@ import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.Message; @@ -56,6 +58,7 @@ public class ProxyStuckConnectionTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private ProxyConfiguration proxyConfig; + private Authentication proxyClientAuthentication; private SocatContainer socatContainer; private String brokerServiceUriSocat; @@ -81,6 +84,10 @@ protected void setup() throws Exception { proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); proxyConfig.setClusterName(configClusterName); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + startProxyService(); // use the same port for subsequent restarts proxyConfig.setServicePort(proxyService.getListenPort()); @@ -88,7 +95,7 @@ protected void setup() throws Exception { private void startProxyService() throws Exception { proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig))) { + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication) { @Override protected LookupProxyHandler newLookupProxyHandler(ProxyConnection proxyConnection) { return new TestLookupProxyHandler(this, proxyConnection); @@ -107,6 +114,9 @@ protected void cleanup() throws Exception { if (proxyService != null) { proxyService.close(); } + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } if (socatContainer != null) { socatContainer.close(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java index e1e49f9e8c5f2..e101eb4ff7a2b 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java @@ -38,6 +38,8 @@ import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -74,6 +76,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { protected ProxyService proxyService; protected ProxyConfiguration proxyConfig = new ProxyConfiguration(); + protected Authentication proxyClientAuthentication; @Data @ToString @@ -94,7 +97,7 @@ protected void setup() throws Exception { initializeProxyConfig(); proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) .createConfigurationMetadataStore(); @@ -102,12 +105,16 @@ protected void setup() throws Exception { proxyService.start(); } - protected void initializeProxyConfig() { + protected void initializeProxyConfig() throws Exception { proxyConfig.setServicePort(Optional.ofNullable(0)); proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); proxyConfig.setClusterName(configClusterName); + + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); } @Override @@ -116,6 +123,9 @@ protected void cleanup() throws Exception { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java index 4e300d39741c3..0f0dc30b62096 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java @@ -27,6 +27,8 @@ import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -45,6 +47,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @Override @BeforeClass @@ -63,8 +66,12 @@ protected void setup() throws Exception { proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); proxyConfig.setClusterName(configClusterName); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) .createConfigurationMetadataStore(); @@ -78,6 +85,9 @@ protected void cleanup() throws Exception { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java index 16f610d6d0a3a..42b5ae178d3b0 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java @@ -27,6 +27,8 @@ import org.apache.pulsar.broker.auth.MockOIDCIdentityProvider; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.mockito.Mockito; @@ -38,6 +40,7 @@ public class ProxyTlsWithAuthTest extends MockedPulsarServiceBaseTest { private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; private MockOIDCIdentityProvider server; @@ -75,8 +78,12 @@ protected void setup() throws Exception { " \"privateKey\":\"file://" + tempFile.getAbsolutePath() + "\"}"); proxyConfig.setClusterName(configClusterName); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) .createConfigurationMetadataStore(); @@ -89,6 +96,9 @@ protected void setup() throws Exception { protected void cleanup() throws Exception { internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } server.stop(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java index cf9ad5831ec0a..92a54aa12fda2 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java @@ -34,6 +34,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -73,6 +74,7 @@ public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase { private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @BeforeMethod @Override @@ -138,7 +140,10 @@ protected void setup() throws Exception { AuthenticationService authenticationService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig)); - proxyService = Mockito.spy(new ProxyService(proxyConfig, authenticationService)); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication)); proxyService.start(); } @@ -148,6 +153,9 @@ protected void setup() throws Exception { protected void cleanup() throws Exception { super.internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } /** diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java index bc96c7ea51041..51f42ea077165 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java @@ -38,6 +38,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -87,6 +88,7 @@ public class ProxyWithAuthorizationTest extends ProducerConsumerBase { private ProxyService proxyService; private WebServer webServer; private final ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @DataProvider(name = "hostnameVerification") public Object[][] hostnameVerificationCodecProvider() { @@ -230,7 +232,10 @@ protected void setup() throws Exception { AuthenticationService authService = new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)); - proxyService = Mockito.spy(new ProxyService(proxyConfig, authService)); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, authService, proxyClientAuthentication)); proxyService.setGracefulShutdown(false); webServer = new WebServer(proxyConfig, authService); } @@ -241,11 +246,14 @@ protected void cleanup() throws Exception { super.internalCleanup(); proxyService.close(); webServer.stop(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } private void startProxy() throws Exception { proxyService.start(); - ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, proxyService, null); + ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, proxyService, null, proxyClientAuthentication); webServer.start(); } @@ -459,10 +467,15 @@ public void tlsCiphersAndProtocols(Set tlsCiphers, Set tlsProtoc proxyConfig.setTlsProtocols(tlsProtocols); proxyConfig.setTlsCiphers(tlsCiphers); + @Cleanup + final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + @Cleanup ProxyService proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); proxyService.setGracefulShutdown(false); try { proxyService.start(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java index d3c05fec721b0..3567c8264f1a3 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java @@ -49,6 +49,8 @@ import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; @@ -75,6 +77,7 @@ public class ProxyWithExtensibleLoadManagerTest extends MultiBrokerBaseTest { private static final int TEST_TIMEOUT_MS = 30_000; + private Authentication proxyClientAuthentication; private ProxyService proxyService; @Override @@ -150,8 +153,11 @@ private String getDstBrokerLookupUrl(TopicName topicName) throws Exception { @BeforeMethod(alwaysRun = true) public void proxySetup() throws Exception { var proxyConfig = initializeProxyConfig(); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) .createConfigurationMetadataStore(); @@ -163,6 +169,9 @@ public void proxyCleanup() throws Exception { if (proxyService != null) { proxyService.close(); } + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test(timeOut = TEST_TIMEOUT_MS) diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java index 5fb3e04682421..63929ee72e446 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java @@ -39,6 +39,7 @@ import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; @@ -83,6 +84,7 @@ public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase { private ProxyService proxyService; private WebServer webServer; private final ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @BeforeMethod @Override @@ -130,7 +132,10 @@ protected void setup() throws Exception { AuthenticationService authService = new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)); - proxyService = Mockito.spy(new ProxyService(proxyConfig, authService)); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, authService, proxyClientAuthentication)); webServer = new WebServer(proxyConfig, authService); } @@ -140,11 +145,14 @@ protected void cleanup() throws Exception { super.internalCleanup(); proxyService.close(); webServer.stop(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } private void startProxy() throws Exception { proxyService.start(); - ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, proxyService, null); + ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, proxyService, null, proxyClientAuthentication); webServer.start(); } @@ -425,7 +433,7 @@ void testGetStatus() throws Exception { PulsarConfigurationLoader.convertFrom(proxyConfig)); final WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, proxyService, - registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)), proxyClientAuthentication); webServer.start(); @Cleanup final Client client = javax.ws.rs.client.ClientBuilder @@ -450,7 +458,7 @@ void testGetMetrics() throws Exception { proxyConfig.setAuthenticateMetricsEndpoint(false); WebServer webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, proxyService, - registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)), proxyClientAuthentication); webServer.start(); @Cleanup Client client = javax.ws.rs.client.ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class)); @@ -463,7 +471,7 @@ void testGetMetrics() throws Exception { proxyConfig.setAuthenticateMetricsEndpoint(true); webServer = new WebServer(proxyConfig, authService); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, proxyService, - registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); + registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)), proxyClientAuthentication); webServer.start(); try { Response r = client.target(webServer.getServiceUri()).path("/metrics").request().get(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java index 9d9490e74b5ad..885064b8e7404 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java @@ -33,6 +33,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -57,6 +58,7 @@ public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase { private static final String CLUSTER_NAME = "without-service-discovery"; private ProxyService proxyService; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; @BeforeMethod @@ -122,9 +124,13 @@ protected void setup() throws Exception { proxyConfig.setAuthenticationProviders(providers); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication)); proxyService.start(); } @@ -134,6 +140,9 @@ protected void setup() throws Exception { protected void cleanup() throws Exception { super.internalCleanup(); proxyService.close(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } /** diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java index 57522186c8f16..71025ed484f7c 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java @@ -32,6 +32,8 @@ import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.ClusterData; @@ -47,6 +49,7 @@ public class SuperUserAuthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest { private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; private WebServer webServer; private BrokerDiscoveryProvider discoveryProvider; private PulsarResources resource; @@ -94,6 +97,10 @@ protected void setup() throws Exception { proxyConfig.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH); proxyConfig.setAuthenticationProviders(ImmutableSet.of(AuthenticationProviderTls.class.getName())); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)), registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))); webServer = new WebServer(proxyConfig, new AuthenticationService( @@ -102,7 +109,7 @@ protected void setup() throws Exception { LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null); doReturn(report).when(discoveryProvider).nextBroker(); - ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider)); + ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication)); webServer.addServlet("/admin", servletHolder); webServer.addServlet("/lookup", servletHolder); @@ -114,6 +121,9 @@ protected void setup() throws Exception { @Override protected void cleanup() throws Exception { webServer.stop(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } super.internalCleanup(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java index fe8b1f45385e4..0b597b933544a 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java @@ -35,6 +35,8 @@ import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.configuration.VipStatus; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -49,6 +51,7 @@ public class UnauthedAdminProxyHandlerTest extends MockedPulsarServiceBaseTest { private final String STATUS_FILE_PATH = "./src/test/resources/vip_status.html"; private ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Authentication proxyClientAuthentication; private WebServer webServer; private BrokerDiscoveryProvider discoveryProvider; private AdminProxyWrapper adminProxyHandler; @@ -77,13 +80,17 @@ protected void setup() throws Exception { proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); proxyConfig.setClusterName(configClusterName); + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + proxyClientAuthentication.start(); + webServer = new WebServer(proxyConfig, new AuthenticationService( PulsarConfigurationLoader.convertFrom(proxyConfig))); resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)), registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))); discoveryProvider = spy(registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource))); - adminProxyHandler = new AdminProxyWrapper(proxyConfig, discoveryProvider); + adminProxyHandler = new AdminProxyWrapper(proxyConfig, discoveryProvider, proxyClientAuthentication); ServletHolder servletHolder = new ServletHolder(adminProxyHandler); webServer.addServlet("/admin", servletHolder); webServer.addServlet("/lookup", servletHolder); @@ -101,6 +108,9 @@ protected void setup() throws Exception { protected void cleanup() throws Exception { internalCleanup(); webServer.stop(); + if (proxyClientAuthentication != null) { + proxyClientAuthentication.close(); + } } @Test @@ -128,8 +138,8 @@ public void testVipStatus() throws Exception { static class AdminProxyWrapper extends AdminProxyHandler { String rewrittenUrl; - AdminProxyWrapper(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider) { - super(config, discoveryProvider); + AdminProxyWrapper(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider, Authentication proxyClientAuthentication) { + super(config, discoveryProvider, proxyClientAuthentication); } @Override From 04df95bb44c959f79ca8c596aac11299b6c55819 Mon Sep 17 00:00:00 2001 From: Yuri Mizushima Date: Thu, 1 Aug 2024 17:59:44 +0900 Subject: [PATCH 3/3] test: add tests for org.apache.pulsar.proxy.server.ProxyServiceStarter#start --- .../proxy/server/ProxyServiceStarter.java | 1 + .../proxy/server/ProxyServiceStarterTest.java | 91 +++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index 7de1b65c08e56..a5504cac100a4 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -110,6 +110,7 @@ public class ProxyServiceStarter { private ProxyConfiguration config; + @Getter private Authentication proxyClientAuthentication; @Getter diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java index 0b9b6f17d1254..d96d2cd1f6e9c 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java @@ -20,16 +20,22 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; import java.util.Base64; +import java.util.Map; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Future; +import java.util.function.Consumer; import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.websocket.data.ProducerMessage; import org.eclipse.jetty.client.HttpClient; @@ -160,4 +166,89 @@ public String getResponse() throws InterruptedException { } } + @Test + public void testProxyClientAuthentication() throws Exception { + final Consumer initConfig = (proxyConfig) -> { + proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); + proxyConfig.setBrokerWebServiceURL(pulsar.getWebServiceAddress()); + proxyConfig.setWebServicePort(Optional.of(0)); + proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setWebSocketServiceEnabled(true); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); + proxyConfig.setClusterName(configClusterName); + }; + + + + ProxyServiceStarter serviceStarter = new ProxyServiceStarter(ARGS, null, true); + initConfig.accept(serviceStarter.getConfig()); + // ProxyServiceStarter will throw an exception when Authentication#start is failed + serviceStarter.getConfig().setBrokerClientAuthenticationPlugin(ExceptionAuthentication1.class.getName()); + try { + serviceStarter.start(); + fail("ProxyServiceStarter should throw an exception when Authentication#start is failed"); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("ExceptionAuthentication1#start")); + assertTrue(serviceStarter.getProxyClientAuthentication() instanceof ExceptionAuthentication1); + } + + serviceStarter = new ProxyServiceStarter(ARGS, null, true); + initConfig.accept(serviceStarter.getConfig()); + // ProxyServiceStarter will throw an exception when Authentication#start and Authentication#close are failed + serviceStarter.getConfig().setBrokerClientAuthenticationPlugin(ExceptionAuthentication2.class.getName()); + try { + serviceStarter.start(); + fail("ProxyServiceStarter should throw an exception when Authentication#start and Authentication#close are failed"); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("ExceptionAuthentication2#start")); + assertTrue(serviceStarter.getProxyClientAuthentication() instanceof ExceptionAuthentication2); + } + } + + public static class ExceptionAuthentication1 implements Authentication { + + @Override + public String getAuthMethodName() { + return "org.apache.pulsar.proxy.server.ProxyConfigurationTest.ExceptionAuthentication1"; + } + + @Override + public void configure(Map authParams) { + // no-op + } + + @Override + public void start() throws PulsarClientException { + throw new PulsarClientException("ExceptionAuthentication1#start"); + } + + @Override + public void close() throws IOException { + // no-op + } + } + + public static class ExceptionAuthentication2 implements Authentication { + + @Override + public String getAuthMethodName() { + return "org.apache.pulsar.proxy.server.ProxyConfigurationTest.ExceptionAuthentication2"; + } + + @Override + public void configure(Map authParams) { + // no-op + } + + @Override + public void start() throws PulsarClientException { + throw new PulsarClientException("ExceptionAuthentication2#start"); + } + + @Override + public void close() throws IOException { + throw new IOException("ExceptionAuthentication2#close"); + } + } + }