From 99e297440b887a94e85c51ed1b22db4e22d7ec9a Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Tue, 7 Feb 2023 21:41:42 -0600 Subject: [PATCH 1/4] [fix][broker] Make authentication refresh threadsafe --- .../service/PulsarChannelInitializer.java | 14 --- .../pulsar/broker/service/ServerCnx.java | 106 +++++++++++------- .../pulsar/broker/service/ServerCnxTest.java | 14 ++- 3 files changed, 74 insertions(+), 60 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index a96625af4686c..54805756bfd41 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -114,10 +114,6 @@ public PulsarChannelInitializer(PulsarService pulsar, PulsarChannelOptions opts) this.sslCtxRefresher = null; } this.brokerConf = pulsar.getConfiguration(); - - pulsar.getExecutor().scheduleAtFixedRate(safeRun(this::refreshAuthenticationCredentials), - pulsar.getConfig().getAuthenticationRefreshCheckSeconds(), - pulsar.getConfig().getAuthenticationRefreshCheckSeconds(), TimeUnit.SECONDS); } @Override @@ -152,16 +148,6 @@ protected void initChannel(SocketChannel ch) throws Exception { connections.put(ch.remoteAddress(), cnx); } - private void refreshAuthenticationCredentials() { - connections.asMap().values().forEach(cnx -> { - try { - cnx.refreshAuthenticationCredentials(); - } catch (Throwable t) { - log.warn("[{}] Failed to refresh auth credentials", cnx.clientAddress()); - } - }); - } - @VisibleForTesting protected ServerCnx newServerCnx(PulsarService pulsar, String listenerName) throws Exception { return new ServerCnx(pulsar, listenerName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index d398dbba9b996..6ea637436a2e1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -39,6 +39,7 @@ import io.netty.handler.ssl.SslHandler; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.ScheduledFuture; import io.prometheus.client.Gauge; import java.io.IOException; import java.net.InetSocketAddress; @@ -199,6 +200,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { // Keep temporarily in order to verify after verifying proxy's authData private AuthData originalAuthDataCopy; private boolean pendingAuthChallengeResponse = false; + private ScheduledFuture authRefreshTask; // Max number of pending requests per connections. If multiple producers are sharing the same connection the flow // control done by a single producer might not be enough to prevent write spikes on the broker. @@ -334,6 +336,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { } cnxsPerThread.get().remove(this); + if (authRefreshTask != null) { + authRefreshTask.cancel(false); + } // Connection is gone, close the producers immediately producers.forEach((__, producerFuture) -> { @@ -693,14 +698,17 @@ ByteBuf createConsumerStatsResponse(Consumer consumer, long requestId) { // complete the connect and sent newConnected command private void completeConnect(int clientProtoVersion, String clientVersion) { - if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) { - if (!isValidRoleAndOriginalPrincipal()) { - state = State.Failed; - service.getPulsarStats().recordConnectionCreateFail(); - final ByteBuf msg = Commands.newError(-1, ServerError.AuthorizationError, "Invalid roles."); - NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg); - return; + if (service.isAuthenticationEnabled()) { + if (service.isAuthorizationEnabled()) { + if (!isValidRoleAndOriginalPrincipal()) { + state = State.Failed; + service.getPulsarStats().recordConnectionCreateFail(); + final ByteBuf msg = Commands.newError(-1, ServerError.AuthorizationError, "Invalid roles."); + NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg); + return; + } } + maybeScheduleAuthenticationCredentialsRefresh(); } writeAndFlush(Commands.newConnected(clientProtoVersion, maxMessageSize, enableSubscriptionPatternEvaluation)); state = State.Connected; @@ -799,7 +807,7 @@ public void authChallengeSuccessCallback(AuthData authChallenge, log.debug("[{}] Authentication in progress client by method {}.", remoteAddress, authMethod); } } - } catch (Exception e) { + } catch (Exception | AssertionError e) { authenticationFailed(e); } } @@ -826,7 +834,7 @@ private void authenticateOriginalData(int clientProtoVersion, String clientVersi remoteAddress, originalPrincipal); } completeConnect(clientProtoVersion, clientVersion); - } catch (Exception e) { + } catch (Exception | AssertionError e) { authenticationFailed(e); } } @@ -848,61 +856,75 @@ private void authenticationFailed(Throwable t) { NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg); } - public void refreshAuthenticationCredentials() { - AuthenticationState authState = this.originalAuthState != null ? originalAuthState : this.authState; - + /** + * Method to initialize the {@link #authRefreshTask} task. + */ + private void maybeScheduleAuthenticationCredentialsRefresh() { + assert ctx.executor().inEventLoop(); + assert authRefreshTask == null; if (authState == null) { // Authentication is disabled or there's no local state to refresh return; - } else if (getState() != State.Connected || !isActive) { + } + authRefreshTask = ctx.executor().scheduleAtFixedRate(this::refreshAuthenticationCredentials, + service.getPulsar().getConfig().getAuthenticationRefreshCheckSeconds(), + service.getPulsar().getConfig().getAuthenticationRefreshCheckSeconds(), + TimeUnit.SECONDS); + } + + private void refreshAuthenticationCredentials() { + assert ctx.executor().inEventLoop(); + AuthenticationState authState = this.originalAuthState != null ? originalAuthState : this.authState; + if (getState() != State.Connected || !isActive) { // Connection is either still being established or already closed. return; } else if (!authState.isExpired()) { // Credentials are still valid. Nothing to do at this point return; } else if (originalPrincipal != null && originalAuthState == null) { + // This case is only checked when the authState is expired because we've reached a point where + // authentication needs to be refreshed, but the protocol does not support it unless the proxy forwards + // the originalAuthData. log.info( "[{}] Cannot revalidate user credential when using proxy and" + " not forwarding the credentials. Closing connection", remoteAddress); + ctx.close(); return; } - ctx.executor().execute(SafeRun.safeRun(() -> { - log.info("[{}] Refreshing authentication credentials for originalPrincipal {} and authRole {}", - remoteAddress, originalPrincipal, this.authRole); - - if (!supportsAuthenticationRefresh()) { - log.warn("[{}] Closing connection because client doesn't support auth credentials refresh", - remoteAddress); - ctx.close(); - return; - } + if (!supportsAuthenticationRefresh()) { + log.warn("[{}] Closing connection because client doesn't support auth credentials refresh", + remoteAddress); + ctx.close(); + return; + } - if (pendingAuthChallengeResponse) { - log.warn("[{}] Closing connection after timeout on refreshing auth credentials", - remoteAddress); - ctx.close(); - return; - } + if (pendingAuthChallengeResponse) { + log.warn("[{}] Closing connection after timeout on refreshing auth credentials", + remoteAddress); + ctx.close(); + return; + } - try { - AuthData brokerData = authState.refreshAuthentication(); + log.info("[{}] Refreshing authentication credentials for originalPrincipal {} and authRole {}", + remoteAddress, originalPrincipal, this.authRole); + try { + AuthData brokerData = authState.refreshAuthentication(); - writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, - getRemoteEndpointProtocolVersion())); - if (log.isDebugEnabled()) { - log.debug("[{}] Sent auth challenge to client to refresh credentials with method: {}.", + writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, + getRemoteEndpointProtocolVersion())); + if (log.isDebugEnabled()) { + log.debug("[{}] Sent auth challenge to client to refresh credentials with method: {}.", remoteAddress, authMethod); - } + } - pendingAuthChallengeResponse = true; + pendingAuthChallengeResponse = true; - } catch (AuthenticationException e) { - log.warn("[{}] Failed to refresh authentication: {}", remoteAddress, e); - ctx.close(); - } - })); + } catch (AuthenticationException e) { + log.warn("[{}] Failed to refresh authentication: {}", remoteAddress, e); + ctx.close(); + } } private static final byte[] emptyArray = new byte[0]; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index b8032f9be3ad7..a6e71fdbb45ee 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -486,10 +486,13 @@ public void testAuthChallengePrincipalChangeFails() throws Exception { when(brokerService.getAuthenticationService()).thenReturn(authenticationService); when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); svcConfig.setAuthenticationEnabled(true); + svcConfig.setAuthenticationRefreshCheckSeconds(30); resetChannel(); assertTrue(channel.isActive()); assertEquals(serverCnx.getState(), State.Start); + // Don't want the keep alive task affecting which messages are handled + serverCnx.cancelKeepAliveTask(); ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.client", ""); channel.writeInbound(clientCommand); @@ -502,7 +505,7 @@ public void testAuthChallengePrincipalChangeFails() throws Exception { // Trigger the ServerCnx to check if authentication is expired (it is because of our special implementation) // and then force channel to run the task - serverCnx.refreshAuthenticationCredentials(); + channel.advanceTimeBy(30, TimeUnit.SECONDS); channel.runPendingTasks(); Object responseAuthChallenge1 = getResponse(); assertTrue(responseAuthChallenge1 instanceof CommandAuthChallenge); @@ -512,7 +515,7 @@ public void testAuthChallengePrincipalChangeFails() throws Exception { channel.writeInbound(authResponse1); // Trigger the ServerCnx to check if authentication is expired again - serverCnx.refreshAuthenticationCredentials(); + channel.advanceTimeBy(30, TimeUnit.SECONDS); assertTrue(channel.hasPendingTasks(), "This test assumes there are pending tasks to run."); channel.runPendingTasks(); Object responseAuthChallenge2 = getResponse(); @@ -538,10 +541,13 @@ public void testAuthChallengeOriginalPrincipalChangeFails() throws Exception { svcConfig.setAuthenticationEnabled(true); svcConfig.setAuthenticateOriginalAuthData(true); svcConfig.setProxyRoles(Collections.singleton("pass.proxy")); + svcConfig.setAuthenticationRefreshCheckSeconds(30); resetChannel(); assertTrue(channel.isActive()); assertEquals(serverCnx.getState(), State.Start); + // Don't want the keep alive task affecting which messages are handled + serverCnx.cancelKeepAliveTask(); ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null, null, "pass.client", "pass.client", authMethodName); @@ -558,7 +564,7 @@ public void testAuthChallengeOriginalPrincipalChangeFails() throws Exception { // Trigger the ServerCnx to check if authentication is expired (it is because of our special implementation) // and then force channel to run the task - serverCnx.refreshAuthenticationCredentials(); + channel.advanceTimeBy(30, TimeUnit.SECONDS); assertTrue(channel.hasPendingTasks(), "This test assumes there are pending tasks to run."); channel.runPendingTasks(); Object responseAuthChallenge1 = getResponse(); @@ -569,7 +575,7 @@ public void testAuthChallengeOriginalPrincipalChangeFails() throws Exception { channel.writeInbound(authResponse1); // Trigger the ServerCnx to check if authentication is expired again - serverCnx.refreshAuthenticationCredentials(); + channel.advanceTimeBy(30, TimeUnit.SECONDS); channel.runPendingTasks(); Object responseAuthChallenge2 = getResponse(); assertTrue(responseAuthChallenge2 instanceof CommandAuthChallenge); From 743d16863d709cb9e14a7fa05ea36ddb3e6fc648 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Mon, 13 Feb 2023 14:08:03 -0600 Subject: [PATCH 2/4] Improve logic state logic in refreshAuthenticationCredentials --- .../main/java/org/apache/pulsar/broker/service/ServerCnx.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 6ea637436a2e1..27120b3b81fc4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -875,8 +875,8 @@ private void maybeScheduleAuthenticationCredentialsRefresh() { private void refreshAuthenticationCredentials() { assert ctx.executor().inEventLoop(); AuthenticationState authState = this.originalAuthState != null ? originalAuthState : this.authState; - if (getState() != State.Connected || !isActive) { - // Connection is either still being established or already closed. + if (getState() == State.Failed) { + // Happens when an exception is thrown that causes this connection to close. return; } else if (!authState.isExpired()) { // Credentials are still valid. Nothing to do at this point From 5496c147854a7263d1d04d4aec8a97362832ec3c Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Mon, 13 Feb 2023 14:51:24 -0600 Subject: [PATCH 3/4] Remove unused imports --- .../apache/pulsar/broker/service/PulsarChannelInitializer.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index 54805756bfd41..1e8e133b9fdd7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.service; -import static org.apache.bookkeeper.util.SafeRunnable.safeRun; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.VisibleForTesting; @@ -31,7 +30,6 @@ import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslProvider; import java.net.SocketAddress; -import java.util.concurrent.TimeUnit; import lombok.Builder; import lombok.Data; import lombok.extern.slf4j.Slf4j; From 3425a88ca0be6d95ee73382764bd506746f2c28d Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Mon, 13 Feb 2023 14:54:02 -0600 Subject: [PATCH 4/4] Remove unused connections map from PulsarChannelInitializer --- .../broker/service/PulsarChannelInitializer.java | 14 -------------- .../broker/service/PersistentTopicE2ETest.java | 2 -- 2 files changed, 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index 1e8e133b9fdd7..5308b3c981eb4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.broker.service; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.VisibleForTesting; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; @@ -29,7 +27,6 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslProvider; -import java.net.SocketAddress; import lombok.Builder; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -55,15 +52,6 @@ public class PulsarChannelInitializer extends ChannelInitializer private final ServiceConfiguration brokerConf; private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder; - // This cache is used to maintain a list of active connections to iterate over them - // We keep weak references to have the cache to be auto cleaned up when the connections - // objects are GCed. - @VisibleForTesting - protected final Cache connections = Caffeine.newBuilder() - .weakKeys() - .weakValues() - .build(); - /** * @param pulsar * An instance of {@link PulsarService} @@ -142,8 +130,6 @@ protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("flowController", new FlowControlHandler()); ServerCnx cnx = newServerCnx(pulsar, listenerName); ch.pipeline().addLast("handler", cnx); - - connections.put(ch.remoteAddress(), cnx); } @VisibleForTesting diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 7506053b28d15..63f80911ae62b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -1948,8 +1948,6 @@ protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().remove("handler"); PersistentTopicE2ETest.ServerCnxForTest serverCnxForTest = new PersistentTopicE2ETest.ServerCnxForTest(this.pulsar, this.opts.getListenerName()); ch.pipeline().addAfter("flowController", "testHandler", serverCnxForTest); - //override parent - connections.put(ch.remoteAddress(), serverCnxForTest); } }