From 1b6868a2c5dd612f5a7a1c1748b3edbc48d234e9 Mon Sep 17 00:00:00 2001 From: Addison Higham Date: Sun, 26 Sep 2021 14:51:27 -0600 Subject: [PATCH 1/3] Fix Pulsar Proxy to re-use authentication instance Currently, the Pulsar Proxy creates a new PulsarClientImpl with a new instance of the client authentication plugin. For certain client auth implementations, this can cause issues. For example, if a client plugin needs to generate a token and then cache and re-use it (which is very common with typical Pulsar client usage) this pattern breaks, because the client auth plugin is tied to the lifecycle of the connection and not the more "singleton" usage of the Pulsar client. Arguably, we should instead figure out how to re-use the entire Pulsar client, but that likely has more complexity, instead this "quick fix" will get one of the most obvious cases solved. --- .../pulsar/proxy/server/ProxyConnection.java | 9 ++------- .../apache/pulsar/proxy/server/ProxyService.java | 14 ++++++++++++++ 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index dd814cf74e4a2..f1b780726489c 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -71,7 +71,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener sslHandlerSupplier; @@ -303,7 +302,6 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), try { // init authn this.clientConf = createClientConfiguration(); - this.clientAuthentication = clientConf.getAuthentication(); int protocolVersion = getProtocolVersionToAdvertise(connect); // authn not enabled, complete @@ -427,10 +425,7 @@ ClientConfigurationData createClientConfiguration() throws UnsupportedAuthentica ClientConfigurationData clientConf = new ClientConfigurationData(); clientConf.setServiceUrl(service.getServiceUrl()); ProxyConfiguration proxyConfig = service.getConfiguration(); - if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) { - clientConf.setAuthentication(AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), - proxyConfig.getBrokerClientAuthenticationParameters())); - } + clientConf.setAuthentication(this.getClientAuthentication()); if (proxyConfig.isTlsEnabledWithBroker()) { clientConf.setUseTls(true); if (proxyConfig.isBrokerClientTlsEnabledWithKeyStore()) { @@ -463,7 +458,7 @@ long newRequestId() { } public Authentication getClientAuthentication() { - return clientAuthentication; + return service.getProxyClientAuthenticationPlugin(); } @Override diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index fe6362df0b013..af5b2a8ec9409 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -53,6 +53,9 @@ import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.util.netty.EventLoopUtil; @@ -69,6 +72,7 @@ public class ProxyService implements Closeable { private final ProxyConfiguration proxyConfig; + private final Authentication proxyClientAuthentication; private final Timer timer; private String serviceUrl; private String serviceUrlTls; @@ -163,6 +167,12 @@ public ProxyService(ProxyConfiguration proxyConfig, }); }, 60, TimeUnit.SECONDS); this.proxyAdditionalServlets = AdditionalServlets.load(proxyConfig); + if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) { + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + } else { + proxyClientAuthentication = AuthenticationDisabled.INSTANCE; + } } public void start() throws Exception { @@ -367,5 +377,9 @@ public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataS proxyConfig.getZookeeperSessionTimeoutMs()); } + public Authentication getProxyClientAuthenticationPlugin() { + return this.proxyClientAuthentication; + } + private static final Logger LOG = LoggerFactory.getLogger(ProxyService.class); } From 0abf4fadf4061a2dbbc859920dc8e815bee17d82 Mon Sep 17 00:00:00 2001 From: Addison Higham Date: Wed, 6 Oct 2021 22:27:53 -0600 Subject: [PATCH 2/3] add test for ensuring all same auth instance --- .../pulsar/proxy/server/ProxyAuthenticationTest.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java index 2e74995ee1aa6..a72cdfc3da6b5 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java @@ -50,6 +50,7 @@ import org.apache.pulsar.common.policies.data.AuthAction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -240,6 +241,14 @@ void testAuthentication() throws Exception { // Sleep for 3 seconds - wait for client auth parans to expire Thread.sleep(3 * 1000); proxyClient.newProducer(Schema.BYTES).topic(topicName).create(); + + // Step 4: create another client and ensure that all client contexts share the same auth provider + @Cleanup + PulsarClient proxyClient2 = createPulsarClient(proxyServiceUrl, clientAuthParams, 1); + proxyClient2.newProducer(Schema.BYTES).topic(topicName).create(); + proxyService.getClientCnxs().stream().forEach((cnx) -> { + Assert.assertSame(cnx.authenticationProvider, proxyService.getAuthenticationService().getAuthenticationProvider("BasicAuthentication")); + }); } private void updateAdminClient() throws PulsarClientException { From 581e8af8ff4490d96c5be95e588ecbb714642f74 Mon Sep 17 00:00:00 2001 From: Addison Higham Date: Thu, 7 Oct 2021 00:04:45 -0600 Subject: [PATCH 3/3] Simplify ProxyAuthTest It isn't clear why this test was doing timeouts... it doesn't really seem to be testing anything as the auth token was not being refreshed and it appears the test was expected to pass (it almost looks like they were expected to fail?) This allows us to make this test faster and more reliable, as the timeouts don't really seem to be adding anything --- .../proxy/server/ProxyAuthenticationTest.java | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java index a72cdfc3da6b5..f6d53c8ec5357 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java @@ -145,9 +145,11 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat } JsonObject element = JsonParser.parseString(commandData).getAsJsonObject(); + log.info("Have log of {}", element); long expiryTimeInMillis = Long.parseLong(element.get("expiryTime").getAsString()); long currentTimeInMillis = System.currentTimeMillis(); if (expiryTimeInMillis < currentTimeInMillis) { + log.warn("Auth failed due to timeout"); throw new AuthenticationException("Authentication data has been expired"); } return element.get("entityType").getAsString(); @@ -199,10 +201,10 @@ void testAuthentication() throws Exception { String namespaceName = "my-property/my-ns"; String topicName = "persistent://my-property/my-ns/my-topic1"; String subscriptionName = "my-subscriber-name"; - // expires after 6 seconds - String clientAuthParams = "entityType:client,expiryTime:" + (System.currentTimeMillis() + 6 * 1000); - // expires after 3 seconds - String proxyAuthParams = "entityType:proxy,expiryTime:" + (System.currentTimeMillis() + 3 * 1000); + // expires after 60 seconds + String clientAuthParams = "entityType:client,expiryTime:" + (System.currentTimeMillis() + 60 * 1000); + // expires after 60 seconds + String proxyAuthParams = "entityType:proxy,expiryTime:" + (System.currentTimeMillis() + 60 * 1000); admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy", Sets.newHashSet(AuthAction.consume, AuthAction.produce)); @@ -231,21 +233,15 @@ void testAuthentication() throws Exception { proxyService.start(); final String proxyServiceUrl = proxyService.getServiceUrl(); - // Step 3: Pass correct client params + // Step 3: Pass correct client params and use multiple connections @Cleanup - PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 1); + PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 3); proxyClient.newProducer(Schema.BYTES).topic(topicName).create(); - // Sleep for 4 seconds - wait for proxy auth params to expire - Thread.sleep(4 * 1000); proxyClient.newProducer(Schema.BYTES).topic(topicName).create(); - // Sleep for 3 seconds - wait for client auth parans to expire - Thread.sleep(3 * 1000); proxyClient.newProducer(Schema.BYTES).topic(topicName).create(); - // Step 4: create another client and ensure that all client contexts share the same auth provider - @Cleanup - PulsarClient proxyClient2 = createPulsarClient(proxyServiceUrl, clientAuthParams, 1); - proxyClient2.newProducer(Schema.BYTES).topic(topicName).create(); + // Step 4: Ensure that all client contexts share the same auth provider + Assert.assertTrue(proxyService.getClientCnxs().size() >= 3, "expect at least 3 clients"); proxyService.getClientCnxs().stream().forEach((cnx) -> { Assert.assertSame(cnx.authenticationProvider, proxyService.getAuthenticationService().getAuthenticationProvider("BasicAuthentication")); });