Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,11 @@ void testAuthentication() throws Exception {
proxyConfig.setForwardAuthorizationCredentials(true);
AuthenticationService authenticationService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
ProxyService proxyService = new ProxyService(proxyConfig, authenticationService);
@Cleanup
final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
proxyConfig.getBrokerClientAuthenticationParameters());
proxyClientAuthentication.start();
ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication);

proxyService.start();
final String proxyServiceUrl = "pulsar://localhost:" + proxyService.getListenPort().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import javax.net.ssl.SSLContext;
Expand All @@ -40,7 +39,6 @@
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.KeyStoreParams;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.SecurityUtility;
Expand Down Expand Up @@ -87,12 +85,15 @@ class AdminProxyHandler extends ProxyServlet {

private final ProxyConfiguration config;
private final BrokerDiscoveryProvider discoveryProvider;
private final Authentication proxyClientAuthentication;
private final String brokerWebServiceUrl;
private final String functionWorkerWebServiceUrl;

AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider) {
AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider,
Authentication proxyClientAuthentication) {
this.config = config;
this.discoveryProvider = discoveryProvider;
this.proxyClientAuthentication = proxyClientAuthentication;
this.brokerWebServiceUrl = config.isTlsEnabledWithBroker() ? config.getBrokerWebServiceURLTLS()
: config.getBrokerWebServiceURL();
this.functionWorkerWebServiceUrl = config.isTlsEnabledWithBroker() ? config.getFunctionWorkerWebServiceURLTLS()
Expand Down Expand Up @@ -256,22 +257,13 @@ protected ContentProvider proxyRequestContent(HttpServletRequest request,
@Override
protected HttpClient newHttpClient() {
try {
Authentication auth = AuthenticationFactory.create(
config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters()
);

Objects.requireNonNull(auth, "No supported auth found for proxy");

auth.start();

if (config.isTlsEnabledWithBroker()) {
try {
X509Certificate[] trustCertificates = SecurityUtility
.loadCertificatesFromPemFile(config.getBrokerClientTrustCertsFilePath());

SSLContext sslCtx;
AuthenticationDataProvider authData = auth.getAuthData();
AuthenticationDataProvider authData = proxyClientAuthentication.getAuthData();
if (config.isBrokerClientTlsEnabledWithKeyStore()) {
KeyStoreParams params = authData.hasDataForTls() ? authData.getTlsKeyStoreParams() : null;
sslCtx = KeyStoreSSLContext.createClientSslContext(
Expand Down Expand Up @@ -311,11 +303,6 @@ protected HttpClient newHttpClient() {
return new JettyHttpClient(contextFactory);
} catch (Exception e) {
LOG.error("new jetty http client exception ", e);
try {
auth.close();
} catch (IOException ioe) {
LOG.error("Failed to close the authentication service", ioe);
}
throw new PulsarClientException.InvalidConfigurationException(e.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.AuthData;
Expand Down Expand Up @@ -114,8 +113,7 @@ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection)

if (!isEmpty(config.getBrokerClientAuthenticationPlugin())) {
try {
authData = AuthenticationFactory.create(config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters()).getAuthData();
authData = authentication.getAuthData();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.netty.DnsResolverUtil;
Expand Down Expand Up @@ -158,7 +156,8 @@ public class ProxyService implements Closeable {
private boolean gracefulShutdown = true;

public ProxyService(ProxyConfiguration proxyConfig,
AuthenticationService authenticationService) throws Exception {
AuthenticationService authenticationService,
Authentication proxyClientAuthentication) throws Exception {
requireNonNull(proxyConfig);
this.proxyConfig = proxyConfig;
this.clientCnxs = Sets.newConcurrentHashSet();
Expand Down Expand Up @@ -207,12 +206,7 @@ public ProxyService(ProxyConfiguration proxyConfig,
});
}, 60, TimeUnit.SECONDS);
this.proxyAdditionalServlets = AdditionalServlets.load(proxyConfig);
if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
proxyConfig.getBrokerClientAuthenticationParameters());
} else {
proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
}
this.proxyClientAuthentication = proxyClientAuthentication;
this.connectionController = new ConnectionController.DefaultConnectionController(
proxyConfig.getMaxConcurrentInboundConnections(),
proxyConfig.getMaxConcurrentInboundConnectionsPerIp());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import io.prometheus.client.Gauge;
import io.prometheus.client.Gauge.Child;
import io.prometheus.client.hotspot.DefaultExports;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import lombok.Getter;
Expand All @@ -44,6 +46,10 @@
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -104,6 +110,9 @@ public class ProxyServiceStarter {

private ProxyConfiguration config;

@Getter
private Authentication proxyClientAuthentication;

@Getter
private ProxyService proxyService;

Expand Down Expand Up @@ -244,8 +253,27 @@ public static void main(String[] args) throws Exception {
public void start() throws Exception {
AuthenticationService authenticationService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(config));

if (config.getBrokerClientAuthenticationPlugin() != null) {
proxyClientAuthentication = AuthenticationFactory.create(config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters());
Objects.requireNonNull(proxyClientAuthentication, "No supported auth found for proxy");
try {
proxyClientAuthentication.start();
} catch (Exception e) {
try {
proxyClientAuthentication.close();
} catch (IOException ioe) {
log.error("Failed to close the authentication service", ioe);
}
throw new PulsarClientException.InvalidConfigurationException(e.getMessage());
}
} else {
proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
}

// create proxy service
proxyService = new ProxyService(config, authenticationService);
proxyService = new ProxyService(config, authenticationService, proxyClientAuthentication);
// create a web-service
server = new WebServer(config, authenticationService);

Expand Down Expand Up @@ -293,7 +321,8 @@ public double get() {
}

AtomicReference<WebSocketService> webSocketServiceRef = new AtomicReference<>();
addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider(), webSocketServiceRef);
addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider(), webSocketServiceRef,
proxyClientAuthentication);
webSocketService = webSocketServiceRef.get();

// start web-service
Expand All @@ -311,6 +340,9 @@ public void close() {
if (webSocketService != null) {
webSocketService.close();
}
if (proxyClientAuthentication != null) {
proxyClientAuthentication.close();
}
} catch (Exception e) {
log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
} finally {
Expand All @@ -323,15 +355,17 @@ public void close() {
public static void addWebServerHandlers(WebServer server,
ProxyConfiguration config,
ProxyService service,
BrokerDiscoveryProvider discoveryProvider) throws Exception {
addWebServerHandlers(server, config, service, discoveryProvider, null);
BrokerDiscoveryProvider discoveryProvider,
Authentication proxyClientAuthentication) throws Exception {
addWebServerHandlers(server, config, service, discoveryProvider, null, proxyClientAuthentication);
}

public static void addWebServerHandlers(WebServer server,
ProxyConfiguration config,
ProxyService service,
BrokerDiscoveryProvider discoveryProvider,
AtomicReference<WebSocketService> webSocketServiceRef) throws Exception {
AtomicReference<WebSocketService> webSocketServiceRef,
Authentication proxyClientAuthentication) throws Exception {
// We can make 'status.html' publicly accessible without authentication since
// it does not contain any sensitive data.
server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath(),
Expand All @@ -348,7 +382,8 @@ public static void addWebServerHandlers(WebServer server,
}
}

AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider);
AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider,
proxyClientAuthentication);
ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
server.addServlet("/admin", servletHolder);
server.addServlet("/lookup", servletHolder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.commons.io.IOUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
Expand Down Expand Up @@ -121,6 +123,7 @@ public void close() {
private ProxyService proxyService;
private boolean useSeparateThreadPoolForProxyExtensions;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
private Authentication proxyClientAuthentication;

public SimpleProxyExtensionTestBase(boolean useSeparateThreadPoolForProxyExtensions) {
this.useSeparateThreadPoolForProxyExtensions = useSeparateThreadPoolForProxyExtensions;
Expand All @@ -142,8 +145,12 @@ protected void setup() throws Exception {
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
proxyConfig.setClusterName(configClusterName);

proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
proxyConfig.getBrokerClientAuthenticationParameters());
proxyClientAuthentication.start();

proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication));
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();
Expand Down Expand Up @@ -174,6 +181,9 @@ public void testBootstrapProtocolHandler() throws Exception {
protected void cleanup() throws Exception {
super.internalCleanup();
proxyService.close();
if (proxyClientAuthentication != null) {
proxyClientAuthentication.close();
}

if (tempDirectory != null) {
FileUtils.deleteDirectory(tempDirectory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand All @@ -47,6 +49,8 @@ public class AdminProxyHandlerKeystoreTLSTest extends MockedPulsarServiceBaseTes

private final ProxyConfiguration proxyConfig = new ProxyConfiguration();

private Authentication proxyClientAuthentication;

private WebServer webServer;

private BrokerDiscoveryProvider discoveryProvider;
Expand Down Expand Up @@ -103,14 +107,18 @@ protected void setup() throws Exception {
KEYSTORE_TYPE, BROKER_KEYSTORE_FILE_PATH, BROKER_KEYSTORE_PW));
proxyConfig.setClusterName(configClusterName);

proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
proxyConfig.getBrokerClientAuthenticationParameters());
proxyClientAuthentication.start();

resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)),
registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
webServer = new WebServer(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig)));
discoveryProvider = spy(registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null);
doReturn(report).when(discoveryProvider).nextBroker();
ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider));
ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication));
webServer.addServlet("/admin", servletHolder);
webServer.addServlet("/lookup", servletHolder);
webServer.start();
Expand All @@ -120,6 +128,9 @@ protected void setup() throws Exception {
@Override
protected void cleanup() throws Exception {
webServer.stop();
if (proxyClientAuthentication != null) {
proxyClientAuthentication.close();
}
super.internalCleanup();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.client.api.Authentication;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.testng.Assert;
Expand All @@ -46,7 +47,7 @@ public void setupMocks() throws ServletException {
// given
HttpClient httpClient = mock(HttpClient.class);
adminProxyHandler = new AdminProxyHandler(mock(ProxyConfiguration.class),
mock(BrokerDiscoveryProvider.class)) {
mock(BrokerDiscoveryProvider.class), mock(Authentication.class)) {
@Override
protected HttpClient createHttpClient() throws ServletException {
return httpClient;
Expand Down
Loading