errorCounts() {
return errorCounts(error);
@@ -96,6 +115,8 @@ public Struct toStruct(short version) {
struct.set(ERROR_CODE, error.code());
struct.set(ERROR_MESSAGE, errorMessage);
struct.set(SASL_AUTH_BYTES_KEY_NAME, saslAuthBytes);
+ if (version > 0)
+ struct.set(SESSION_LIFETIME_MS, sessionLifetimeMs);
return struct;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index fb74f5faa95e4..02a4261761991 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -25,6 +25,7 @@
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.network.ReauthenticationContext;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.network.TransportLayer;
@@ -43,6 +44,7 @@
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.kerberos.KerberosError;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,27 +59,49 @@
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
import java.util.Set;
public class SaslClientAuthenticator implements Authenticator {
-
+ /**
+ * The internal state transitions for initial authentication of a channel are
+ * declared in order, starting with {@link #SEND_APIVERSIONS_REQUEST} and ending
+ * in either {@link #COMPLETE} or {@link #FAILED}.
+ *
+ * Re-authentication of a channel starts with the state
+ * {@link #REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE} and then flows to
+ * {@link #REAUTH_SEND_HANDSHAKE_REQUEST} followed by
+ * {@link #REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE} and then
+ * {@link #REAUTH_INITIAL}; after that the flow joins the authentication flow
+ * at the {@link #INTERMEDIATE} state and ends at either {@link #COMPLETE} or
+ * {@link #FAILED}.
+ */
public enum SaslState {
- SEND_APIVERSIONS_REQUEST, // Initial state: client sends ApiVersionsRequest in this state
- RECEIVE_APIVERSIONS_RESPONSE, // Awaiting ApiVersionsResponse from server
- SEND_HANDSHAKE_REQUEST, // Received ApiVersionsResponse, send SaslHandshake request
- RECEIVE_HANDSHAKE_RESPONSE, // Awaiting SaslHandshake request from server
- INITIAL, // Initial state starting SASL token exchange for configured mechanism, send first token
- INTERMEDIATE, // Intermediate state during SASL token exchange, process challenges and send responses
- CLIENT_COMPLETE, // Sent response to last challenge. If using SaslAuthenticate, wait for authentication status from server, else COMPLETE
- COMPLETE, // Authentication sequence complete. If using SaslAuthenticate, this state implies successful authentication.
- FAILED // Failed authentication due to an error at some stage
+ SEND_APIVERSIONS_REQUEST, // Initial state for authentication: client sends ApiVersionsRequest in this state when authenticating
+ RECEIVE_APIVERSIONS_RESPONSE, // Awaiting ApiVersionsResponse from server
+ SEND_HANDSHAKE_REQUEST, // Received ApiVersionsResponse, send SaslHandshake request
+ RECEIVE_HANDSHAKE_RESPONSE, // Awaiting SaslHandshake response from server when authenticating
+ INITIAL, // Initial authentication state starting SASL token exchange for configured mechanism, send first token
+ INTERMEDIATE, // Intermediate state during SASL token exchange, process challenges and send responses
+ CLIENT_COMPLETE, // Sent response to last challenge. If using SaslAuthenticate, wait for authentication status from server, else COMPLETE
+ COMPLETE, // Authentication sequence complete. If using SaslAuthenticate, this state implies successful authentication.
+ FAILED, // Failed authentication due to an error at some stage
+ REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE, // Initial state for re-authentication: process ApiVersionsResponse from original authentication
+ REAUTH_SEND_HANDSHAKE_REQUEST, // Processed original ApiVersionsResponse, send SaslHandshake request as part of re-authentication
+ REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE, // Awaiting SaslHandshake response from server when re-authenticating, and may receive other, in-flight responses sent prior to start of re-authentication as well
+ REAUTH_INITIAL, // Initial re-authentication state starting SASL token exchange for configured mechanism, send first token
}
private static final Logger LOG = LoggerFactory.getLogger(SaslClientAuthenticator.class);
private static final short DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER = -1;
+ private static final Random RNG = new Random();
private final Subject subject;
private final String servicePrincipal;
@@ -89,6 +113,8 @@ public enum SaslState {
private final Map configs;
private final String clientPrincipalName;
private final AuthenticateCallbackHandler callbackHandler;
+ private final Time time;
+ private final ReauthInfo reauthInfo;
// buffers used in `authenticate`
private NetworkReceive netInBuffer;
@@ -113,7 +139,8 @@ public SaslClientAuthenticator(Map configs,
String host,
String mechanism,
boolean handshakeRequestEnable,
- TransportLayer transportLayer) {
+ TransportLayer transportLayer,
+ Time time) {
this.node = node;
this.subject = subject;
this.callbackHandler = callbackHandler;
@@ -124,6 +151,8 @@ public SaslClientAuthenticator(Map configs,
this.transportLayer = transportLayer;
this.configs = configs;
this.saslAuthenticateVersion = DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER;
+ this.time = time;
+ this.reauthInfo = new ReauthInfo();
try {
setSaslState(handshakeRequestEnable ? SaslState.SEND_APIVERSIONS_REQUEST : SaslState.INITIAL);
@@ -163,7 +192,6 @@ private SaslClient createSaslClient() {
* followed by N bytes representing the opaque payload.
*/
public void authenticate() throws IOException {
- short saslHandshakeVersion = 0;
if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps())
return;
@@ -179,16 +207,13 @@ public void authenticate() throws IOException {
if (apiVersionsResponse == null)
break;
else {
- saslHandshakeVersion = apiVersionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion;
- ApiVersion authenticateVersion = apiVersionsResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id);
- if (authenticateVersion != null)
- saslAuthenticateVersion((short) Math.min(authenticateVersion.maxVersion, ApiKeys.SASL_AUTHENTICATE.latestVersion()));
+ saslAuthenticateVersion(apiVersionsResponse);
+ reauthInfo.apiVersionsResponseReceivedFromBroker = apiVersionsResponse;
setSaslState(SaslState.SEND_HANDSHAKE_REQUEST);
// Fall through to send handshake request with the latest supported version
}
case SEND_HANDSHAKE_REQUEST:
- SaslHandshakeRequest handshakeRequest = createSaslHandshakeRequest(saslHandshakeVersion);
- send(handshakeRequest.toSend(node, nextRequestHeader(ApiKeys.SASL_HANDSHAKE, handshakeRequest.version())));
+ sendHandshakeRequest(reauthInfo.apiVersionsResponseReceivedFromBroker);
setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE);
break;
case RECEIVE_HANDSHAKE_RESPONSE:
@@ -201,7 +226,32 @@ public void authenticate() throws IOException {
// Fall through and start SASL authentication using the configured client mechanism
}
case INITIAL:
- sendSaslClientToken(new byte[0], true);
+ sendInitialToken();
+ setSaslState(SaslState.INTERMEDIATE);
+ break;
+ case REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE:
+ saslAuthenticateVersion(reauthInfo.apiVersionsResponseFromOriginalAuthentication);
+ setSaslState(SaslState.REAUTH_SEND_HANDSHAKE_REQUEST); // Will set immediately
+ // Fall through to send handshake request with the latest supported version
+ case REAUTH_SEND_HANDSHAKE_REQUEST:
+ sendHandshakeRequest(reauthInfo.apiVersionsResponseFromOriginalAuthentication);
+ setSaslState(SaslState.REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE);
+ break;
+ case REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE:
+ handshakeResponse = (SaslHandshakeResponse) receiveKafkaResponse();
+ if (handshakeResponse == null)
+ break;
+ handleSaslHandshakeResponse(handshakeResponse);
+ setSaslState(SaslState.REAUTH_INITIAL); // Will set immediately
+ /*
+ * Fall through and start SASL authentication using the configured client
+ * mechanism. Note that we have to either fall through or add a loop to enter
+ * the switch statement again. We will fall through to avoid adding the loop and
+ * therefore minimize the changes to authentication-related code due to the
+ * changes related to re-authentication.
+ */
+ case REAUTH_INITIAL:
+ sendInitialToken();
setSaslState(SaslState.INTERMEDIATE);
break;
case INTERMEDIATE:
@@ -229,6 +279,46 @@ public void authenticate() throws IOException {
}
}
+ private void sendHandshakeRequest(ApiVersionsResponse apiVersionsResponse) throws IOException {
+ SaslHandshakeRequest handshakeRequest = createSaslHandshakeRequest(
+ apiVersionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion);
+ send(handshakeRequest.toSend(node, nextRequestHeader(ApiKeys.SASL_HANDSHAKE, handshakeRequest.version())));
+ }
+
+ private void sendInitialToken() throws IOException {
+ sendSaslClientToken(new byte[0], true);
+ }
+
+ @Override
+ public void reauthenticate(ReauthenticationContext reauthenticationContext) throws IOException {
+ SaslClientAuthenticator previousSaslClientAuthenticator = (SaslClientAuthenticator) Objects
+ .requireNonNull(reauthenticationContext).previousAuthenticator();
+ ApiVersionsResponse apiVersionsResponseFromOriginalAuthentication = previousSaslClientAuthenticator.reauthInfo
+ .apiVersionsResponse();
+ previousSaslClientAuthenticator.close();
+ reauthInfo.reauthenticating(apiVersionsResponseFromOriginalAuthentication,
+ reauthenticationContext.reauthenticationBeginNanos());
+ NetworkReceive netInBufferFromChannel = reauthenticationContext.networkReceive();
+ netInBuffer = netInBufferFromChannel;
+ setSaslState(SaslState.REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE); // Will set immediately
+ authenticate();
+ }
+
+ @Override
+ public List getAndClearResponsesReceivedDuringReauthentication() {
+ return reauthInfo.getAndClearResponsesReceivedDuringReauthentication();
+ }
+
+ @Override
+ public Long clientSessionReauthenticationTimeNanos() {
+ return reauthInfo.clientSessionReauthenticationTimeNanos;
+ }
+
+ @Override
+ public Long reauthenticationLatencyMs() {
+ return reauthInfo.reauthenticationLatencyMs();
+ }
+
private RequestHeader nextRequestHeader(ApiKeys apiKey, short version) {
String clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
currentRequestHeader = new RequestHeader(apiKey, version, clientId, correlationId++);
@@ -241,8 +331,11 @@ protected SaslHandshakeRequest createSaslHandshakeRequest(short version) {
}
// Visible to override for testing
- protected void saslAuthenticateVersion(short version) {
- this.saslAuthenticateVersion = version;
+ protected void saslAuthenticateVersion(ApiVersionsResponse apiVersionsResponse) {
+ ApiVersion authenticateVersion = apiVersionsResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id);
+ if (authenticateVersion != null)
+ this.saslAuthenticateVersion = (short) Math.min(authenticateVersion.maxVersion,
+ ApiKeys.SASL_AUTHENTICATE.latestVersion());
}
private void setSaslState(SaslState saslState) {
@@ -252,8 +345,17 @@ private void setSaslState(SaslState saslState) {
this.pendingSaslState = null;
this.saslState = saslState;
LOG.debug("Set SASL client state to {}", saslState);
- if (saslState == SaslState.COMPLETE)
- transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
+ if (saslState == SaslState.COMPLETE) {
+ reauthInfo.setAuthenticationEndAndSessionReauthenticationTimes(time.nanoseconds());
+ if (!reauthInfo.reauthenticating())
+ transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
+ else
+ /*
+ * Re-authentication is triggered by a write, so we have to make sure that
+ * pending write is actually sent.
+ */
+ transportLayer.addInterestOps(SelectionKey.OP_WRITE);
+ }
}
}
@@ -337,6 +439,9 @@ private byte[] receiveToken() throws IOException {
String errMsg = response.errorMessage();
throw errMsg == null ? error.exception() : error.exception(errMsg);
}
+ long sessionLifetimeMs = response.sessionLifetimeMs();
+ if (sessionLifetimeMs > 0L)
+ reauthInfo.positiveSessionLifetimeMs = sessionLifetimeMs;
return Utils.readBytes(response.saslAuthBytes());
} else
return null;
@@ -384,6 +489,9 @@ private boolean flushNetOutBuffer() throws IOException {
}
private AbstractResponse receiveKafkaResponse() throws IOException {
+ if (netInBuffer == null)
+ netInBuffer = new NetworkReceive(node);
+ NetworkReceive receive = netInBuffer;
try {
byte[] responseBytes = receiveResponseOrToken();
if (responseBytes == null)
@@ -394,6 +502,19 @@ private AbstractResponse receiveKafkaResponse() throws IOException {
return response;
}
} catch (SchemaException | IllegalArgumentException e) {
+ /*
+ * Account for the fact that during re-authentication there may be responses
+ * arriving for requests that were sent in the past.
+ */
+ if (reauthInfo.reauthenticating()) {
+ /*
+ * It didn't match the current request header, so it must be unrelated to
+ * re-authentication. Save it so it can be processed later.
+ */
+ receive.payload().rewind();
+ reauthInfo.pendingAuthenticatedReceives.add(receive);
+ return null;
+ }
LOG.debug("Invalid SASL mechanism response, server may be expecting only GSSAPI tokens");
setSaslState(SaslState.FAILED);
throw new IllegalSaslStateException("Invalid SASL mechanism response, server may be expecting a different protocol", e);
@@ -436,4 +557,81 @@ static final String firstPrincipal(Subject subject) {
}
}
+ /**
+ * Information related to re-authentication
+ */
+ private static class ReauthInfo {
+ public ApiVersionsResponse apiVersionsResponseFromOriginalAuthentication;
+ public long reauthenticationBeginNanos;
+ public List pendingAuthenticatedReceives = new ArrayList<>();
+ public ApiVersionsResponse apiVersionsResponseReceivedFromBroker;
+ public Long positiveSessionLifetimeMs;
+ public long authenticationEndNanos;
+ public Long clientSessionReauthenticationTimeNanos;
+
+ public void reauthenticating(ApiVersionsResponse apiVersionsResponseFromOriginalAuthentication,
+ long reauthenticationBeginNanos) {
+ this.apiVersionsResponseFromOriginalAuthentication = Objects
+ .requireNonNull(apiVersionsResponseFromOriginalAuthentication);
+ this.reauthenticationBeginNanos = reauthenticationBeginNanos;
+ }
+
+ public boolean reauthenticating() {
+ return apiVersionsResponseFromOriginalAuthentication != null;
+ }
+
+ public ApiVersionsResponse apiVersionsResponse() {
+ return reauthenticating() ? apiVersionsResponseFromOriginalAuthentication
+ : apiVersionsResponseReceivedFromBroker;
+ }
+
+ /**
+ * Return the (always non-null but possibly empty) NetworkReceive responses that
+ * arrived during re-authentication that are unrelated to re-authentication, if
+ * any. These correspond to requests sent prior to the beginning of
+ * re-authentication; the requests were made when the channel was successfully
+ * authenticated, and the responses arrived during the re-authentication
+ * process.
+ *
+ * @return the (always non-null but possibly empty) NetworkReceive responses
+ * that arrived during re-authentication that are unrelated to
+ * re-authentication, if any
+ */
+ public List getAndClearResponsesReceivedDuringReauthentication() {
+ if (pendingAuthenticatedReceives.isEmpty())
+ return Collections.emptyList();
+ List retval = pendingAuthenticatedReceives;
+ pendingAuthenticatedReceives = new ArrayList<>();
+ return retval;
+ }
+
+ public void setAuthenticationEndAndSessionReauthenticationTimes(long nowNanos) {
+ authenticationEndNanos = nowNanos;
+ long sessionLifetimeMsToUse = 0;
+ if (positiveSessionLifetimeMs != null) {
+ // pick a random percentage between 85% and 95% for session re-authentication
+ double pctWindowFactorToTakeNetworkLatencyAndClockDriftIntoAccount = 0.85;
+ double pctWindowJitterToAvoidReauthenticationStormAcrossManyChannelsSimultaneously = 0.10;
+ double pctToUse = pctWindowFactorToTakeNetworkLatencyAndClockDriftIntoAccount + RNG.nextDouble()
+ * pctWindowJitterToAvoidReauthenticationStormAcrossManyChannelsSimultaneously;
+ sessionLifetimeMsToUse = (long) (positiveSessionLifetimeMs.longValue() * pctToUse);
+ clientSessionReauthenticationTimeNanos = authenticationEndNanos + 1000 * 1000 * sessionLifetimeMsToUse;
+ LOG.debug(
+ "Finished {} with session expiration in {} ms and session re-authentication on or after {} ms",
+ authenticationOrReauthenticationText(), positiveSessionLifetimeMs, sessionLifetimeMsToUse);
+ } else
+ LOG.debug("Finished {} with no session expiration and no session re-authentication",
+ authenticationOrReauthenticationText());
+ }
+
+ public Long reauthenticationLatencyMs() {
+ return reauthenticating()
+ ? Long.valueOf(Math.round((authenticationEndNanos - reauthenticationBeginNanos) / 1000.0 / 1000.0))
+ : null;
+ }
+
+ private String authenticationOrReauthenticationText() {
+ return reauthenticating() ? "re-authentication" : "authentication";
+ }
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslInternalConfigs.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslInternalConfigs.java
new file mode 100644
index 0000000000000..c1793ebc3192b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslInternalConfigs.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.authenticator;
+
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+
+public class SaslInternalConfigs {
+ /**
+ * The server (broker) specifies a positive session length in milliseconds to a
+ * SASL client when {@link BrokerSecurityConfigs#CONNECTIONS_MAX_REAUTH_MS} is
+ * positive as per KIP
+ * 368: Allow SASL Connections to Periodically Re-Authenticate. The session
+ * length is the minimum of the configured value and any session length implied
+ * by the credential presented during authentication. The lifetime defined by
+ * the credential, in terms of milliseconds since the epoch, is available via a
+ * negotiated property on the SASL Server instance, and that value can be
+ * converted to a session length by subtracting the time at which authentication
+ * occurred. This variable defines the negotiated property key that is used to
+ * communicate the credential lifetime in milliseconds since the epoch.
+ */
+ public static final String CREDENTIAL_LIFETIME_MS_SASL_NEGOTIATED_PROPERTY_KEY = "CREDENTIAL.LIFETIME.MS";
+
+ private SaslInternalConfigs() {
+ // empty
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 4db1971b3776a..2d2ca2520b180 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -30,6 +30,7 @@
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.network.ReauthenticationContext;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -54,6 +55,7 @@
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSCredential;
@@ -75,25 +77,40 @@
import java.nio.channels.SelectionKey;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
+import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
public class SaslServerAuthenticator implements Authenticator {
-
// GSSAPI limits requests to 64K, but we allow a bit extra for custom SASL mechanisms
static final int MAX_RECEIVE_SIZE = 524288;
private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class);
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
+ /**
+ * The internal state transitions for initial authentication of a channel on the
+ * server side are declared in order, starting with {@link #INITIAL_REQUEST} and
+ * ending in either {@link #COMPLETE} or {@link #FAILED}.
+ *
+ * Re-authentication of a channel on the server side starts with the state
+ * {@link #REAUTH_PROCESS_HANDSHAKE}. It may then flow to
+ * {@link #REAUTH_BAD_MECHANISM} before a transition to {@link #FAILED} if
+ * re-authentication is attempted with a mechanism different than the original
+ * one; otherwise it joins the authentication flow at the {@link #AUTHENTICATE}
+ * state and likewise ends at either {@link #COMPLETE} or {@link #FAILED}.
+ */
private enum SaslState {
- INITIAL_REQUEST, // May be GSSAPI token, SaslHandshake or ApiVersions
+ INITIAL_REQUEST, // May be GSSAPI token, SaslHandshake or ApiVersions for authentication
HANDSHAKE_OR_VERSIONS_REQUEST, // May be SaslHandshake or ApiVersions
HANDSHAKE_REQUEST, // After an ApiVersions request, next request must be SaslHandshake
AUTHENTICATE, // Authentication tokens (SaslHandshake v1 and above indicate SaslAuthenticate headers)
COMPLETE, // Authentication completed successfully
- FAILED // Authentication failed
+ FAILED, // Authentication failed
+ REAUTH_PROCESS_HANDSHAKE, // Initial state for re-authentication, processes SASL handshake request
+ REAUTH_BAD_MECHANISM, // When re-authentication requested with wrong mechanism, generate exception
}
private final SecurityProtocol securityProtocol;
@@ -105,6 +122,9 @@ private enum SaslState {
private final Map configs;
private final KafkaPrincipalBuilder principalBuilder;
private final Map callbackHandlers;
+ private final Map connectionsMaxReauthMsByMechanism;
+ private final Time time;
+ private final ReauthInfo reauthInfo;
// Current SASL state
private SaslState saslState = SaslState.INITIAL_REQUEST;
@@ -129,7 +149,9 @@ public SaslServerAuthenticator(Map configs,
KerberosShortNamer kerberosNameParser,
ListenerName listenerName,
SecurityProtocol securityProtocol,
- TransportLayer transportLayer) {
+ TransportLayer transportLayer,
+ Map connectionsMaxReauthMsByMechanism,
+ Time time) {
this.callbackHandlers = callbackHandlers;
this.connectionId = connectionId;
this.subjects = subjects;
@@ -137,6 +159,9 @@ public SaslServerAuthenticator(Map configs,
this.securityProtocol = securityProtocol;
this.enableKafkaSaslAuthenticateHeaders = false;
this.transportLayer = transportLayer;
+ this.connectionsMaxReauthMsByMechanism = connectionsMaxReauthMsByMechanism;
+ this.time = time;
+ this.reauthInfo = new ReauthInfo();
this.configs = configs;
@SuppressWarnings("unchecked")
@@ -149,6 +174,8 @@ public SaslServerAuthenticator(Map configs,
throw new IllegalArgumentException("Callback handler not specified for SASL mechanism " + mechanism);
if (!subjects.containsKey(mechanism))
throw new IllegalArgumentException("Subject cannot be null for SASL mechanism " + mechanism);
+ LOG.debug("{} for mechanism={}: {}", BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS, mechanism,
+ connectionsMaxReauthMsByMechanism.get(mechanism));
}
// Note that the old principal builder does not support SASL, so we do not need to pass the
@@ -224,52 +251,58 @@ private SaslServer createSaslKerberosServer(final AuthenticateCallbackHandler sa
*/
@Override
public void authenticate() throws IOException {
- if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps())
- return;
-
- if (saslServer != null && saslServer.isComplete()) {
- setSaslState(SaslState.COMPLETE);
- return;
- }
-
- if (netInBuffer == null) netInBuffer = new NetworkReceive(MAX_RECEIVE_SIZE, connectionId);
-
- netInBuffer.readFrom(transportLayer);
-
- if (netInBuffer.complete()) {
+ if (saslState != SaslState.REAUTH_PROCESS_HANDSHAKE) {
+ if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps())
+ return;
+
+ if (saslServer != null && saslServer.isComplete()) {
+ setSaslState(SaslState.COMPLETE);
+ return;
+ }
+
+ // allocate on heap (as opposed to any socket server memory pool)
+ if (netInBuffer == null) netInBuffer = new NetworkReceive(MAX_RECEIVE_SIZE, connectionId);
+
+ netInBuffer.readFrom(transportLayer);
+ if (!netInBuffer.complete())
+ return;
netInBuffer.payload().rewind();
- byte[] clientToken = new byte[netInBuffer.payload().remaining()];
- netInBuffer.payload().get(clientToken, 0, clientToken.length);
- netInBuffer = null; // reset the networkReceive as we read all the data.
- try {
- switch (saslState) {
- case HANDSHAKE_OR_VERSIONS_REQUEST:
- case HANDSHAKE_REQUEST:
- handleKafkaRequest(clientToken);
- break;
- case INITIAL_REQUEST:
- if (handleKafkaRequest(clientToken))
- break;
- // For default GSSAPI, fall through to authenticate using the client token as the first GSSAPI packet.
- // This is required for interoperability with 0.9.0.x clients which do not send handshake request
- case AUTHENTICATE:
- handleSaslToken(clientToken);
- // When the authentication exchange is complete and no more tokens are expected from the client,
- // update SASL state. Current SASL state will be updated when outgoing writes to the client complete.
- if (saslServer.isComplete())
- setSaslState(SaslState.COMPLETE);
- break;
- default:
+ }
+ byte[] clientToken = new byte[netInBuffer.payload().remaining()];
+ netInBuffer.payload().get(clientToken, 0, clientToken.length);
+ netInBuffer = null; // reset the networkReceive as we read all the data.
+ try {
+ switch (saslState) {
+ case REAUTH_PROCESS_HANDSHAKE:
+ case HANDSHAKE_OR_VERSIONS_REQUEST:
+ case HANDSHAKE_REQUEST:
+ handleKafkaRequest(clientToken);
+ break;
+ case REAUTH_BAD_MECHANISM:
+ throw new SaslAuthenticationException(reauthInfo.badMechanismErrorMessage);
+ case INITIAL_REQUEST:
+ if (handleKafkaRequest(clientToken))
break;
- }
- } catch (AuthenticationException e) {
- // Exception will be propagated after response is sent to client
- setSaslState(SaslState.FAILED, e);
- } catch (Exception e) {
- // In the case of IOExceptions and other unexpected exceptions, fail immediately
- saslState = SaslState.FAILED;
- throw e;
+ // For default GSSAPI, fall through to authenticate using the client token as the first GSSAPI packet.
+ // This is required for interoperability with 0.9.0.x clients which do not send handshake request
+ case AUTHENTICATE:
+ handleSaslToken(clientToken);
+ // When the authentication exchange is complete and no more tokens are expected from the client,
+ // update SASL state. Current SASL state will be updated when outgoing writes to the client complete.
+ if (saslServer.isComplete())
+ setSaslState(SaslState.COMPLETE);
+ break;
+ default:
+ break;
}
+ } catch (AuthenticationException e) {
+ // Exception will be propagated after response is sent to client
+ setSaslState(SaslState.FAILED, e);
+ } catch (Exception e) {
+ // In the case of IOExceptions and other unexpected exceptions, fail immediately
+ saslState = SaslState.FAILED;
+ LOG.debug("Failed during {}: {}", reauthInfo.authenticationOrReauthenticationText(), e.getMessage());
+ throw e;
}
}
@@ -301,6 +334,38 @@ public void close() throws IOException {
saslServer.dispose();
}
+ @Override
+ public void reauthenticate(ReauthenticationContext reauthenticationContext) throws IOException {
+ NetworkReceive saslHandshakeReceive = reauthenticationContext.networkReceive();
+ if (saslHandshakeReceive == null)
+ throw new IllegalArgumentException(
+ "Invalid saslHandshakeReceive in server-side re-authentication context: null");
+ SaslServerAuthenticator previousSaslServerAuthenticator = (SaslServerAuthenticator) reauthenticationContext.previousAuthenticator();
+ reauthInfo.reauthenticating(previousSaslServerAuthenticator.saslMechanism,
+ previousSaslServerAuthenticator.principal(), reauthenticationContext.reauthenticationBeginNanos());
+ previousSaslServerAuthenticator.close();
+ netInBuffer = saslHandshakeReceive;
+ LOG.debug("Beginning re-authentication: {}", this);
+ netInBuffer.payload().rewind();
+ setSaslState(SaslState.REAUTH_PROCESS_HANDSHAKE);
+ authenticate();
+ }
+
+ @Override
+ public Long serverSessionExpirationTimeNanos() {
+ return reauthInfo.sessionExpirationTimeNanos;
+ }
+
+ @Override
+ public Long reauthenticationLatencyMs() {
+ return reauthInfo.reauthenticationLatencyMs();
+ }
+
+ @Override
+ public boolean connectedClientSupportsReauthentication() {
+ return reauthInfo.connectedClientSupportsReauthentication;
+ }
+
private void setSaslState(SaslState saslState) {
setSaslState(saslState, null);
}
@@ -311,7 +376,7 @@ private void setSaslState(SaslState saslState, AuthenticationException exception
pendingException = exception;
} else {
this.saslState = saslState;
- LOG.debug("Set SASL server state to {}", saslState);
+ LOG.debug("Set SASL server state to {} during {}", saslState, reauthInfo.authenticationOrReauthenticationText());
this.pendingSaslState = null;
this.pendingException = null;
if (exception != null)
@@ -347,6 +412,8 @@ private InetAddress clientAddress() {
private void handleSaslToken(byte[] clientToken) throws IOException {
if (!enableKafkaSaslAuthenticateHeaders) {
byte[] response = saslServer.evaluateResponse(clientToken);
+ if (reauthInfo.reauthenticating() && saslServer.isComplete())
+ reauthInfo.ensurePrincipalUnchanged(principal());
if (response != null) {
netOutBuffer = new NetworkSend(connectionId, ByteBuffer.wrap(response));
flushNetOutBufferAndUpdateInterestOps();
@@ -369,13 +436,24 @@ private void handleSaslToken(byte[] clientToken) throws IOException {
// This should not normally occur since clients typically check supported versions using ApiVersionsRequest
throw new UnsupportedVersionException("Version " + version + " is not supported for apiKey " + apiKey);
}
+ /*
+ * The client sends multiple SASL_AUTHENTICATE requests, and the client is known
+ * to support the required version if any one of them indicates it supports that
+ * version.
+ */
+ if (!reauthInfo.connectedClientSupportsReauthentication)
+ reauthInfo.connectedClientSupportsReauthentication = version > 0;
SaslAuthenticateRequest saslAuthenticateRequest = (SaslAuthenticateRequest) requestAndSize.request;
try {
byte[] responseToken = saslServer.evaluateResponse(Utils.readBytes(saslAuthenticateRequest.saslAuthBytes()));
+ if (reauthInfo.reauthenticating() && saslServer.isComplete())
+ reauthInfo.ensurePrincipalUnchanged(principal());
// For versions with SASL_AUTHENTICATE header, send a response to SASL_AUTHENTICATE request even if token is empty.
ByteBuffer responseBuf = responseToken == null ? EMPTY_BUFFER : ByteBuffer.wrap(responseToken);
- sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.NONE, null, responseBuf));
+ long sessionLifetimeMs = !saslServer.isComplete() ? 0L
+ : reauthInfo.calcCompletionTimesAndReturnSessionLifetimeMs();
+ sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.NONE, null, responseBuf, sessionLifetimeMs));
} catch (SaslAuthenticationException e) {
buildResponseOnAuthenticateFailure(requestContext,
new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()));
@@ -386,7 +464,10 @@ private void handleSaslToken(byte[] clientToken) throws IOException {
// Handle retriable Kerberos exceptions as I/O exceptions rather than authentication exceptions
throw e;
} else {
- String errorMessage = "Authentication failed due to invalid credentials with SASL mechanism " + saslMechanism;
+ String errorMessage = "Authentication failed during "
+ + reauthInfo.authenticationOrReauthenticationText()
+ + " due to invalid credentials with SASL mechanism " + saslMechanism + ": "
+ + e.getMessage();
sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED,
errorMessage));
throw new SaslAuthenticationException(errorMessage, e);
@@ -414,7 +495,7 @@ private boolean handleKafkaRequest(byte[] requestBytes) throws IOException, Auth
if (apiKey != ApiKeys.API_VERSIONS && apiKey != ApiKeys.SASL_HANDSHAKE)
throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
- LOG.debug("Handling Kafka request {}", apiKey);
+ LOG.debug("Handling Kafka request {} during {}", apiKey, reauthInfo.authenticationOrReauthenticationText());
RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(),
@@ -446,7 +527,8 @@ private boolean handleKafkaRequest(byte[] requestBytes) throws IOException, Auth
} else
throw e;
}
- if (clientMechanism != null) {
+ if (clientMechanism != null && (!reauthInfo.reauthenticating()
+ || reauthInfo.saslMechanismUnchanged(clientMechanism))) {
createSaslServer(clientMechanism);
setSaslState(SaslState.AUTHENTICATE);
}
@@ -517,4 +599,116 @@ private void sendKafkaResponse(Send send) throws IOException {
netOutBuffer = send;
flushNetOutBufferAndUpdateInterestOps();
}
+
+ /**
+ * Information related to re-authentication
+ */
+ private class ReauthInfo {
+ public String previousSaslMechanism;
+ public KafkaPrincipal previousKafkaPrincipal;
+ public long reauthenticationBeginNanos;
+ public Long sessionExpirationTimeNanos;
+ public boolean connectedClientSupportsReauthentication;
+ public long authenticationEndNanos;
+ public String badMechanismErrorMessage;
+
+ public void reauthenticating(String previousSaslMechanism, KafkaPrincipal previousKafkaPrincipal,
+ long reauthenticationBeginNanos) {
+ this.previousSaslMechanism = Objects.requireNonNull(previousSaslMechanism);
+ this.previousKafkaPrincipal = Objects.requireNonNull(previousKafkaPrincipal);
+ this.reauthenticationBeginNanos = reauthenticationBeginNanos;
+ }
+
+ public boolean reauthenticating() {
+ return previousSaslMechanism != null;
+ }
+
+ public String authenticationOrReauthenticationText() {
+ return reauthenticating() ? "re-authentication" : "authentication";
+ }
+
+ public void ensurePrincipalUnchanged(KafkaPrincipal reauthenticatedKafkaPrincipal) throws SaslAuthenticationException {
+ if (!previousKafkaPrincipal.equals(reauthenticatedKafkaPrincipal)) {
+ throw new SaslAuthenticationException(String.format(
+ "Cannot change principals during re-authentication from %s.%s: %s.%s",
+ previousKafkaPrincipal.getPrincipalType(), previousKafkaPrincipal.getName(),
+ reauthenticatedKafkaPrincipal.getPrincipalType(), reauthenticatedKafkaPrincipal.getName()));
+ }
+ }
+
+ /*
+ * We define the REAUTH_BAD_MECHANISM state because the failed re-authentication
+ * metric does not get updated if we send back an error immediately upon the
+ * start of re-authentication.
+ */
+ public boolean saslMechanismUnchanged(String clientMechanism) {
+ if (previousSaslMechanism.equals(clientMechanism))
+ return true;
+ badMechanismErrorMessage = String.format(
+ "SASL mechanism '%s' requested by client is not supported for re-authentication of mechanism '%s'",
+ clientMechanism, previousSaslMechanism);
+ LOG.debug(badMechanismErrorMessage);
+ setSaslState(SaslState.REAUTH_BAD_MECHANISM);
+ return false;
+ }
+
+ private long calcCompletionTimesAndReturnSessionLifetimeMs() {
+ long retvalSessionLifetimeMs = 0L;
+ long authenticationEndMs = time.milliseconds();
+ authenticationEndNanos = time.nanoseconds();
+ Long credentialExpirationMs = (Long) saslServer
+ .getNegotiatedProperty(SaslInternalConfigs.CREDENTIAL_LIFETIME_MS_SASL_NEGOTIATED_PROPERTY_KEY);
+ Long connectionsMaxReauthMs = connectionsMaxReauthMsByMechanism.get(saslMechanism);
+ if (credentialExpirationMs != null || connectionsMaxReauthMs != null) {
+ if (credentialExpirationMs == null)
+ retvalSessionLifetimeMs = zeroIfNegative(connectionsMaxReauthMs.longValue());
+ else if (connectionsMaxReauthMs == null)
+ retvalSessionLifetimeMs = zeroIfNegative(credentialExpirationMs.longValue() - authenticationEndMs);
+ else
+ retvalSessionLifetimeMs = zeroIfNegative(
+ Math.min(credentialExpirationMs.longValue() - authenticationEndMs,
+ connectionsMaxReauthMs.longValue()));
+ if (retvalSessionLifetimeMs > 0L)
+ sessionExpirationTimeNanos = Long
+ .valueOf(authenticationEndNanos + 1000 * 1000 * retvalSessionLifetimeMs);
+ }
+ if (credentialExpirationMs != null) {
+ if (sessionExpirationTimeNanos != null)
+ LOG.debug(
+ "Authentication complete; session max lifetime from broker config={} ms, credential expiration={} ({} ms); session expiration = {} ({} ms), sending {} ms to client",
+ connectionsMaxReauthMs, new Date(credentialExpirationMs),
+ Long.valueOf(credentialExpirationMs.longValue() - authenticationEndMs),
+ new Date(authenticationEndMs + retvalSessionLifetimeMs), retvalSessionLifetimeMs,
+ retvalSessionLifetimeMs);
+ else
+ LOG.debug(
+ "Authentication complete; session max lifetime from broker config={} ms, credential expiration={} ({} ms); no session expiration, sending 0 ms to client",
+ connectionsMaxReauthMs, new Date(credentialExpirationMs),
+ Long.valueOf(credentialExpirationMs.longValue() - authenticationEndMs));
+ } else {
+ if (sessionExpirationTimeNanos != null)
+ LOG.debug(
+ "Authentication complete; session max lifetime from broker config={} ms, no credential expiration; session expiration = {} ({} ms), sending {} ms to client",
+ connectionsMaxReauthMs, new Date(authenticationEndMs + retvalSessionLifetimeMs),
+ retvalSessionLifetimeMs, retvalSessionLifetimeMs);
+ else
+ LOG.debug(
+ "Authentication complete; session max lifetime from broker config={} ms, no credential expiration; no session expiration, sending 0 ms to client",
+ connectionsMaxReauthMs);
+ }
+ return retvalSessionLifetimeMs;
+ }
+
+ public Long reauthenticationLatencyMs() {
+ if (!reauthenticating())
+ return null;
+ // record at least 1 ms if there is some latency
+ long latencyNanos = authenticationEndNanos - reauthenticationBeginNanos;
+ return latencyNanos == 0L ? 0L : Math.max(1L, Long.valueOf(Math.round(latencyNanos / 1000.0 / 1000.0)));
+ }
+
+ private long zeroIfNegative(long value) {
+ return Math.max(0L, value);
+ }
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java
index db332b4153517..8735f49d07e15 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServer.java
@@ -32,6 +32,7 @@
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.security.auth.SaslExtensions;
+import org.apache.kafka.common.security.authenticator.SaslInternalConfigs;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
@@ -118,7 +119,8 @@ public Object getNegotiatedProperty(String propName) {
throw new IllegalStateException("Authentication exchange has not completed");
if (NEGOTIATED_PROPERTY_KEY_TOKEN.equals(propName))
return tokenForNegotiatedProperty;
-
+ if (SaslInternalConfigs.CREDENTIAL_LIFETIME_MS_SASL_NEGOTIATED_PROPERTY_KEY.equals(propName))
+ return tokenForNegotiatedProperty.lifetimeMs();
return extensions.map().get(propName);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java
index 217dab751632e..f6286a60f15f6 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramSaslServer.java
@@ -33,6 +33,7 @@
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
+import org.apache.kafka.common.security.authenticator.SaslInternalConfigs;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.ScramCredentialCallback;
import org.apache.kafka.common.security.scram.ScramLoginModule;
@@ -74,6 +75,7 @@ enum State {
private ScramExtensions scramExtensions;
private ScramCredential scramCredential;
private String authorizationId;
+ private Long tokenExpiryTimestamp;
public ScramSaslServer(ScramMechanism mechanism, Map props, CallbackHandler callbackHandler) throws NoSuchAlgorithmException {
this.mechanism = mechanism;
@@ -115,10 +117,12 @@ public byte[] evaluateResponse(byte[] response) throws SaslException, SaslAuthen
if (tokenCallback.tokenOwner() == null)
throw new SaslException("Token Authentication failed: Invalid tokenId : " + username);
this.authorizationId = tokenCallback.tokenOwner();
+ this.tokenExpiryTimestamp = tokenCallback.tokenExpiryTimestamp();
} else {
credentialCallback = new ScramCredentialCallback();
callbackHandler.handle(new Callback[]{nameCallback, credentialCallback});
this.authorizationId = username;
+ this.tokenExpiryTimestamp = null;
}
this.scramCredential = credentialCallback.scramCredential();
if (scramCredential == null)
@@ -181,7 +185,8 @@ public String getMechanismName() {
public Object getNegotiatedProperty(String propName) {
if (!isComplete())
throw new IllegalStateException("Authentication exchange has not completed");
-
+ if (SaslInternalConfigs.CREDENTIAL_LIFETIME_MS_SASL_NEGOTIATED_PROPERTY_KEY.equals(propName))
+ return tokenExpiryTimestamp; // will be null if token not used
if (SUPPORTED_EXTENSIONS.contains(propName))
return scramExtensions.map().get(propName);
else
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramServerCallbackHandler.java
index 63c2b17743543..1af38e9d41801 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramServerCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramServerCallbackHandler.java
@@ -28,6 +28,7 @@
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.ScramCredentialCallback;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCredentialCallback;
@@ -58,6 +59,9 @@ else if (callback instanceof DelegationTokenCredentialCallback) {
DelegationTokenCredentialCallback tokenCallback = (DelegationTokenCredentialCallback) callback;
tokenCallback.scramCredential(tokenCache.credential(saslMechanism, username));
tokenCallback.tokenOwner(tokenCache.owner(username));
+ TokenInformation tokenInfo = tokenCache.token(username);
+ if (tokenInfo != null)
+ tokenCallback.tokenExpiryTimestamp(tokenInfo.expiryTimestamp());
} else if (callback instanceof ScramCredentialCallback) {
ScramCredentialCallback sc = (ScramCredentialCallback) callback;
sc.scramCredential(credentialCache.get(username));
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internals/DelegationTokenCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internals/DelegationTokenCredentialCallback.java
index 44409fe2d9c21..5d9eee986a1bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internals/DelegationTokenCredentialCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internals/DelegationTokenCredentialCallback.java
@@ -20,6 +20,7 @@
public class DelegationTokenCredentialCallback extends ScramCredentialCallback {
private String tokenOwner;
+ private Long tokenExpiryTimestamp;
public void tokenOwner(String tokenOwner) {
this.tokenOwner = tokenOwner;
@@ -28,4 +29,12 @@ public void tokenOwner(String tokenOwner) {
public String tokenOwner() {
return tokenOwner;
}
+
+ public void tokenExpiryTimestamp(Long tokenExpiryTimestamp) {
+ this.tokenExpiryTimestamp = tokenExpiryTimestamp;
+ }
+
+ public Long tokenExpiryTimestamp() {
+ return tokenExpiryTimestamp;
+ }
}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
index e6c0eda6f4168..578020ec16a64 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
@@ -28,6 +28,7 @@
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.security.authenticator.CredentialCache;
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
@@ -50,6 +51,15 @@ public static NioEchoServer createEchoServer(ListenerName listenerName, Security
return server;
}
+ public static NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol,
+ AbstractConfig serverConfig, CredentialCache credentialCache,
+ int failedAuthenticationDelayMs, Time time, DelegationTokenCache tokenCache) throws Exception {
+ NioEchoServer server = new NioEchoServer(listenerName, securityProtocol, serverConfig, "localhost",
+ null, credentialCache, failedAuthenticationDelayMs, time, tokenCache);
+ server.start();
+ return server;
+ }
+
public static Selector createSelector(ChannelBuilder channelBuilder, Time time) {
return new Selector(5000, new Metrics(), time, "MetricGroup", channelBuilder, new LogContext());
}
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 68b3f9dc1124e..fc02fe6c784e1 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -22,13 +22,13 @@
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import java.io.IOException;
@@ -40,9 +40,12 @@
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
@@ -52,6 +55,19 @@
*
*/
public class NioEchoServer extends Thread {
+ public enum MetricType {
+ TOTAL, RATE, AVG, MAX;
+
+ private final String metricNameSuffix;
+
+ private MetricType() {
+ metricNameSuffix = "-" + name().toLowerCase(Locale.ROOT);
+ }
+
+ public String metricNameSuffix() {
+ return metricNameSuffix;
+ }
+ }
private static final double EPS = 0.0001;
@@ -67,7 +83,8 @@ public class NioEchoServer extends Thread {
private volatile int numSent = 0;
private volatile boolean closeKafkaChannels;
private final DelegationTokenCache tokenCache;
-
+ private final Time time;
+
public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config,
String serverHost, ChannelBuilder channelBuilder, CredentialCache credentialCache, Time time) throws Exception {
this(listenerName, securityProtocol, config, serverHost, channelBuilder, credentialCache, 100, time);
@@ -76,6 +93,13 @@ public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtoco
public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config,
String serverHost, ChannelBuilder channelBuilder, CredentialCache credentialCache,
int failedAuthenticationDelayMs, Time time) throws Exception {
+ this(listenerName, securityProtocol, config, serverHost, channelBuilder, credentialCache, failedAuthenticationDelayMs, time,
+ new DelegationTokenCache(ScramMechanism.mechanismNames()));
+ }
+
+ public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config,
+ String serverHost, ChannelBuilder channelBuilder, CredentialCache credentialCache,
+ int failedAuthenticationDelayMs, Time time, DelegationTokenCache tokenCache) throws Exception {
super("echoserver");
setDaemon(true);
serverSocketChannel = ServerSocketChannel.open();
@@ -85,7 +109,7 @@ public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtoco
this.socketChannels = Collections.synchronizedList(new ArrayList());
this.newChannels = Collections.synchronizedList(new ArrayList());
this.credentialCache = credentialCache;
- this.tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames());
+ this.tokenCache = tokenCache;
if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
for (String mechanism : ScramMechanism.mechanismNames()) {
if (credentialCache.cache(mechanism, ScramCredential.class) == null)
@@ -93,10 +117,11 @@ public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtoco
}
}
if (channelBuilder == null)
- channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, false, securityProtocol, config, credentialCache, tokenCache);
+ channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, false, securityProtocol, config, credentialCache, tokenCache, time);
this.metrics = new Metrics();
this.selector = new Selector(10000, failedAuthenticationDelayMs, metrics, time, "MetricGroup", channelBuilder, new LogContext());
acceptorThread = new AcceptorThread();
+ this.time = time;
}
public int port() {
@@ -111,7 +136,6 @@ public DelegationTokenCache tokenCache() {
return tokenCache;
}
- @SuppressWarnings("deprecation")
public double metricValue(String name) {
for (Map.Entry entry : metrics.metrics().entrySet()) {
if (entry.getKey().name().equals(name))
@@ -122,29 +146,52 @@ public double metricValue(String name) {
public void verifyAuthenticationMetrics(int successfulAuthentications, final int failedAuthentications)
throws InterruptedException {
- waitForMetric("successful-authentication", successfulAuthentications);
- waitForMetric("failed-authentication", failedAuthentications);
+ waitForMetrics("successful-authentication", successfulAuthentications,
+ EnumSet.of(MetricType.TOTAL, MetricType.RATE));
+ waitForMetrics("failed-authentication", failedAuthentications, EnumSet.of(MetricType.TOTAL, MetricType.RATE));
+ }
+
+ public void verifyReauthenticationMetrics(int successfulReauthentications, final int failedReauthentications)
+ throws InterruptedException {
+ waitForMetrics("successful-reauthentication", successfulReauthentications,
+ EnumSet.of(MetricType.TOTAL, MetricType.RATE));
+ waitForMetrics("failed-reauthentication", failedReauthentications,
+ EnumSet.of(MetricType.TOTAL, MetricType.RATE));
+ waitForMetrics("successful-authentication-no-reauth", 0, EnumSet.of(MetricType.TOTAL));
+ waitForMetrics("reauthentication-latency", Math.signum(successfulReauthentications),
+ EnumSet.of(MetricType.MAX, MetricType.AVG));
+ }
+
+ public void verifyAuthenticationNoReauthMetric(int successfulAuthenticationNoReauths) throws InterruptedException {
+ waitForMetrics("successful-authentication-no-reauth", successfulAuthenticationNoReauths,
+ EnumSet.of(MetricType.TOTAL));
}
public void waitForMetric(String name, final double expectedValue) throws InterruptedException {
- final String totalName = name + "-total";
- final String rateName = name + "-rate";
- if (expectedValue == 0.0) {
- assertEquals(expectedValue, metricValue(totalName), EPS);
- assertEquals(expectedValue, metricValue(rateName), EPS);
- } else {
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return Math.abs(metricValue(totalName) - expectedValue) <= EPS;
- }
- }, "Metric not updated " + totalName);
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return metricValue(rateName) > 0.0;
- }
- }, "Metric not updated " + rateName);
+ waitForMetrics(name, expectedValue, EnumSet.of(MetricType.TOTAL, MetricType.RATE));
+ }
+
+ public void waitForMetrics(String namePrefix, final double expectedValue, Set metricTypes)
+ throws InterruptedException {
+ long maxAggregateWaitMs = 15000;
+ long startMs = time.milliseconds();
+ for (MetricType metricType : metricTypes) {
+ long currentElapsedMs = time.milliseconds() - startMs;
+ long thisMaxWaitMs = maxAggregateWaitMs - currentElapsedMs;
+ String metricName = namePrefix + metricType.metricNameSuffix();
+ if (expectedValue == 0.0)
+ assertEquals(
+ "Metric not updated " + metricName + " expected:<" + expectedValue + "> but was:<"
+ + metricValue(metricName) + ">",
+ metricType == MetricType.MAX ? Double.NEGATIVE_INFINITY : 0d, metricValue(metricName), EPS);
+ else if (metricType == MetricType.TOTAL)
+ TestUtils.waitForCondition(() -> Math.abs(metricValue(metricName) - expectedValue) <= EPS,
+ thisMaxWaitMs, () -> "Metric not updated " + metricName + " expected:<" + expectedValue
+ + "> but was:<" + metricValue(metricName) + ">");
+ else
+ TestUtils.waitForCondition(() -> metricValue(metricName) > 0.0, thisMaxWaitMs,
+ () -> "Metric not updated " + metricName + " expected:<a positive number> but was:<"
+ + metricValue(metricName) + ">");
}
}
@@ -170,15 +217,17 @@ public void run() {
List completedReceives = selector.completedReceives();
for (NetworkReceive rcv : completedReceives) {
KafkaChannel channel = channel(rcv.source());
- String channelId = channel.id();
- selector.mute(channelId);
- NetworkSend send = new NetworkSend(rcv.source(), rcv.payload());
- if (outputChannel == null)
- selector.send(send);
- else {
- for (ByteBuffer buffer : send.buffers)
- outputChannel.write(buffer);
- selector.unmute(channelId);
+ if (!maybeBeginServerReauthentication(channel, rcv, time)) {
+ String channelId = channel.id();
+ selector.mute(channelId);
+ NetworkSend send = new NetworkSend(rcv.source(), rcv.payload());
+ if (outputChannel == null)
+ selector.send(send);
+ else {
+ for (ByteBuffer buffer : send.buffers)
+ outputChannel.write(buffer);
+ selector.unmute(channelId);
+ }
}
}
for (Send send : selector.completedSends()) {
@@ -195,6 +244,17 @@ public int numSent() {
return numSent;
}
+ private static boolean maybeBeginServerReauthentication(KafkaChannel channel, NetworkReceive networkReceive, Time time) {
+ try {
+ if (TestUtils.apiKeyFrom(networkReceive) == ApiKeys.SASL_HANDSHAKE) {
+ return channel.maybeBeginServerReauthentication(networkReceive, () -> time.nanoseconds());
+ }
+ } catch (Exception e) {
+ // ignore
+ }
+ return false;
+ }
+
private String id(SocketChannel channel) {
return channel.socket().getLocalAddress().getHostAddress() + ":" + channel.socket().getLocalPort() + "-" +
channel.socket().getInetAddress().getHostAddress() + ":" + channel.socket().getPort();
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
index 26cc544cd6d8b..dedd1d452538b 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
@@ -22,6 +22,7 @@
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.authenticator.TestJaasConfig;
import org.apache.kafka.common.security.plain.PlainLoginModule;
+import org.apache.kafka.common.utils.Time;
import org.junit.Test;
import java.util.Collections;
@@ -74,7 +75,7 @@ private SaslChannelBuilder createChannelBuilder(SecurityProtocol securityProtoco
JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig, null);
Map jaasContexts = Collections.singletonMap("PLAIN", jaasContext);
return new SaslChannelBuilder(Mode.CLIENT, jaasContexts, securityProtocol, new ListenerName("PLAIN"),
- false, "PLAIN", true, null, null);
+ false, "PLAIN", true, null, null, Time.SYSTEM);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 1b8a5fd724515..80b24b8d106d3 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -893,7 +893,7 @@ public void testServerKeystoreDynamicUpdate() throws Exception {
TestSecurityConfig config = new TestSecurityConfig(sslServerConfigs);
ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder(listenerName,
- false, securityProtocol, config, null, null);
+ false, securityProtocol, config, null, null, time);
server = new NioEchoServer(listenerName, securityProtocol, config,
"localhost", serverChannelBuilder, null, time);
server.start();
@@ -953,7 +953,7 @@ public void testServerTruststoreDynamicUpdate() throws Exception {
TestSecurityConfig config = new TestSecurityConfig(sslServerConfigs);
ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder(listenerName,
- false, securityProtocol, config, null, null);
+ false, securityProtocol, config, null, null, time);
server = new NioEchoServer(listenerName, securityProtocol, config,
"localhost", serverChannelBuilder, null, time);
server.start();
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
index fc88e9ebbc82f..78b59a5b542d4 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
@@ -49,8 +49,8 @@ public void schemaVersionOutOfRange() {
*
* - Cluster actions used only for inter-broker are throttled only if unauthorized
*
- SASL_HANDSHAKE and SASL_AUTHENTICATE are not throttled when used for authentication
- * when a connection is established. At any other time, this request returns an error
- * response that may be throttled.
+ * when a connection is established or for re-authentication thereafter; these requests
+ * return an error response that may be throttled if they are sent otherwise.
*
*/
@Test
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index b7f39ef9d6722..e9b36a0d13fec 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -167,6 +167,10 @@ public void testSerialization() throws Exception {
checkRequest(createSaslHandshakeRequest());
checkErrorResponse(createSaslHandshakeRequest(), new UnknownServerException());
checkResponse(createSaslHandshakeResponse(), 0);
+ checkRequest(createSaslAuthenticateRequest());
+ checkErrorResponse(createSaslAuthenticateRequest(), new UnknownServerException());
+ checkResponse(createSaslAuthenticateResponse(), 0);
+ checkResponse(createSaslAuthenticateResponse(), 1);
checkRequest(createApiVersionRequest());
checkErrorResponse(createApiVersionRequest(), new UnknownServerException());
checkResponse(createApiVersionResponse(), 0);
@@ -345,9 +349,19 @@ private void checkErrorResponse(AbstractRequest req, Throwable e) throws Excepti
private void checkRequest(AbstractRequest req) throws Exception {
// Check that we can serialize, deserialize and serialize again
// We don't check for equality or hashCode because it is likely to fail for any request containing a HashMap
+ checkRequest(req, false);
+ }
+
+ private void checkRequest(AbstractRequest req, boolean checkEqualityAndHashCode) throws Exception {
+ // Check that we can serialize, deserialize and serialize again
+ // Check for equality and hashCode only if indicated
Struct struct = req.toStruct();
AbstractRequest deserialized = (AbstractRequest) deserialize(req, struct, req.version());
- deserialized.toStruct();
+ Struct struct2 = deserialized.toStruct();
+ if (checkEqualityAndHashCode) {
+ assertEquals(struct, struct2);
+ assertEquals(struct.hashCode(), struct2.hashCode());
+ }
}
private void checkResponse(AbstractResponse response, int version) throws Exception {
@@ -355,7 +369,7 @@ private void checkResponse(AbstractResponse response, int version) throws Except
// We don't check for equality or hashCode because it is likely to fail for any response containing a HashMap
Struct struct = response.toStruct((short) version);
AbstractResponse deserialized = (AbstractResponse) deserialize(response, struct, (short) version);
- deserialized.toStruct((short) version);
+ Struct struct2 = deserialized.toStruct((short) version);
}
private AbstractRequestResponse deserialize(AbstractRequestResponse req, Struct struct, short version) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
@@ -975,6 +989,14 @@ private SaslHandshakeResponse createSaslHandshakeResponse() {
return new SaslHandshakeResponse(Errors.NONE, singletonList("GSSAPI"));
}
+ private SaslAuthenticateRequest createSaslAuthenticateRequest() {
+ return new SaslAuthenticateRequest(ByteBuffer.wrap(new byte[0]));
+ }
+
+ private SaslAuthenticateResponse createSaslAuthenticateResponse() {
+ return new SaslAuthenticateResponse(Errors.NONE, null, ByteBuffer.wrap(new byte[0]), Long.MAX_VALUE);
+ }
+
private ApiVersionsRequest createApiVersionRequest() {
return new ApiVersionsRequest.Builder().build();
}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
index 81a883cebf98f..07cbb7856dded 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
@@ -36,6 +36,8 @@ public class TestSecurityConfig extends AbstractConfig {
Importance.MEDIUM, BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_DOC)
.define(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS,
null, Importance.MEDIUM, BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+ .define(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS, Type.LONG, 0L, Importance.MEDIUM,
+ BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC)
.withClientSslSupport()
.withClientSaslSupport();
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java
index c8e6edb4a442a..773fffbb287f5 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java
@@ -184,7 +184,7 @@ private void createSelector(SecurityProtocol securityProtocol, Map options = new HashMap<>();
+ String tokenId = "token1";
+ String tokenHmac = "abcdefghijkl";
+ options.put("username", tokenId); // tokenId
+ options.put("password", tokenHmac); // token hmac
+ options.put(ScramLoginModule.TOKEN_AUTH_CONFIG, "true"); // enable token authentication
+ jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options);
+
+ // ensure re-authentication based on token expiry rather than a default value
+ saslServerConfigs.put(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS, Long.MAX_VALUE);
+ /*
+ * create a token cache that adjusts the token expiration dynamically so that
+ * the first time the expiry is read during authentication we use it to define a
+ * session expiration time that we can then sleep through; then the second time
+ * the value is read (during re-authentication) it will be in the future.
+ */
+ Function tokenLifetime = callNum -> 10 * callNum * CONNECTIONS_MAX_REAUTH_MS_VALUE;
+ DelegationTokenCache tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames()) {
+ int callNum = 0;
+
+ @Override
+ public TokenInformation token(String tokenId) {
+ TokenInformation baseTokenInfo = super.token(tokenId);
+ long thisLifetimeMs = System.currentTimeMillis() + tokenLifetime.apply(++callNum).longValue();
+ TokenInformation retvalTokenInfo = new TokenInformation(baseTokenInfo.tokenId(), baseTokenInfo.owner(),
+ baseTokenInfo.renewers(), baseTokenInfo.issueTimestamp(), thisLifetimeMs, thisLifetimeMs);
+ return retvalTokenInfo;
+ }
+ };
+ server = createEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol, tokenCache);
+
+ KafkaPrincipal owner = SecurityUtils.parseKafkaPrincipal("User:Owner");
+ KafkaPrincipal renewer = SecurityUtils.parseKafkaPrincipal("User:Renewer1");
+ TokenInformation tokenInfo = new TokenInformation(tokenId, owner, Collections.singleton(renewer),
+ System.currentTimeMillis(), System.currentTimeMillis(), System.currentTimeMillis());
+ server.tokenCache().addToken(tokenId, tokenInfo);
+ updateTokenCredentialCache(tokenId, tokenHmac);
+ // initial authentication must succeed
+ createClientConnection(securityProtocol, "0");
+ checkClientConnection("0");
+ // ensure metrics are as expected before trying to re-authenticate
+ server.verifyAuthenticationMetrics(1, 0);
+ server.verifyReauthenticationMetrics(0, 0);
+ /*
+ * Now re-authenticate and ensure it succeeds. We have to sleep long enough so
+ * that the current delegation token will be expired when the next write occurs;
+ * this will trigger a re-authentication. Then the second time the delegation
+ * token is read and transmitted to the server it will again have an expiration
+ * date in the future.
+ */
+ delay(tokenLifetime.apply(1));
+ checkClientConnection("0");
+ server.verifyReauthenticationMetrics(1, 0);
}
/**
@@ -916,6 +1028,7 @@ public void testDisabledMechanism() throws Exception {
server = createEchoServer(securityProtocol);
createAndCheckClientConnectionFailure(securityProtocol, node);
server.verifyAuthenticationMetrics(0, 1);
+ server.verifyReauthenticationMetrics(0, 0);
}
/**
@@ -931,6 +1044,7 @@ public void testInvalidMechanism() throws Exception {
server = createEchoServer(securityProtocol);
createAndCheckClientConnectionFailure(securityProtocol, node);
server.verifyAuthenticationMetrics(0, 1);
+ server.verifyReauthenticationMetrics(0, 0);
}
/**
@@ -1207,7 +1321,164 @@ public void testValidSaslOauthBearerMechanism() throws Exception {
server = createEchoServer(securityProtocol);
createAndCheckClientConnection(securityProtocol, node);
}
+
+ /**
+ * Re-authentication must fail if principal changes
+ */
+ @Test
+ public void testCannotReauthenticateWithDifferentPrincipal() throws Exception {
+ String node = "0";
+ SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+ saslClientConfigs.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
+ AlternateLoginCallbackHandler.class.getName());
+ configureMechanisms(OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
+ Arrays.asList(OAuthBearerLoginModule.OAUTHBEARER_MECHANISM));
+ server = createEchoServer(securityProtocol);
+ // initial authentication must succeed
+ createClientConnection(securityProtocol, node);
+ checkClientConnection(node);
+ // ensure metrics are as expected before trying to re-authenticate
+ server.verifyAuthenticationMetrics(1, 0);
+ server.verifyReauthenticationMetrics(0, 0);
+ /*
+ * Now re-authenticate with a different principal and ensure it fails. We first
+ * have to sleep long enough for the background refresh thread to replace the
+ * original token with a new one.
+ */
+ delay(1000L);
+ try {
+ checkClientConnection(node);
+ fail("Re-authentication with a different principal should have failed but did not");
+ } catch (AssertionError e) {
+ // ignore, expected
+ server.verifyReauthenticationMetrics(0, 1);
+ }
+ }
+
+ /**
+ * Re-authentication must fail if mechanism changes
+ */
+ @Test
+ public void testCannotReauthenticateWithDifferentMechanism() throws Exception {
+ String node = "0";
+ SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+ configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5", "PLAIN"));
+ configureDigestMd5ServerCallback(securityProtocol);
+ server = createEchoServer(securityProtocol);
+
+ String saslMechanism = (String) saslClientConfigs.get(SaslConfigs.SASL_MECHANISM);
+ Map configs = new TestSecurityConfig(saslClientConfigs).values();
+ this.channelBuilder = new AlternateSaslChannelBuilder(Mode.CLIENT,
+ Collections.singletonMap(saslMechanism, JaasContext.loadClientContext(configs)), securityProtocol, null,
+ false, saslMechanism, true, credentialCache, null, time);
+ this.channelBuilder.configure(configs);
+ // initial authentication must succeed
+ this.selector = NetworkTestUtils.createSelector(channelBuilder, time);
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+ checkClientConnection(node);
+ // ensure metrics are as expected before trying to re-authenticate
+ server.verifyAuthenticationMetrics(1, 0);
+ server.verifyReauthenticationMetrics(0, 0);
+ /*
+ * Now re-authenticate with a different mechanism and ensure it fails. We have
+ * to sleep long enough so that the next write will trigger a re-authentication.
+ */
+ delay((long) (CONNECTIONS_MAX_REAUTH_MS_VALUE * 1.1));
+ try {
+ checkClientConnection(node);
+ fail("Re-authentication with a different mechanism should have failed but did not");
+ } catch (AssertionError e) {
+ // ignore, expected
+ server.verifyAuthenticationMetrics(1, 0);
+ server.verifyReauthenticationMetrics(0, 1);
+ }
+ }
+
+ /**
+ * Second re-authentication must fail if it is sooner than one second after the first
+ */
+ @Test
+ public void testCannotReauthenticateAgainFasterThanOneSecond() throws Exception {
+ String node = "0";
+ SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+ configureMechanisms(OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
+ Arrays.asList(OAuthBearerLoginModule.OAUTHBEARER_MECHANISM));
+ server = createEchoServer(securityProtocol);
+ try {
+ createClientConnection(securityProtocol, node);
+ checkClientConnection(node);
+ server.verifyAuthenticationMetrics(1, 0);
+ server.verifyReauthenticationMetrics(0, 0);
+ /*
+ * Now sleep long enough so that the next write will cause re-authentication,
+ * which we expect to succeed.
+ */
+ delay((long) (CONNECTIONS_MAX_REAUTH_MS_VALUE * 1.1));
+ checkClientConnection(node);
+ server.verifyAuthenticationMetrics(1, 0);
+ server.verifyReauthenticationMetrics(1, 0);
+ /*
+ * Now sleep long enough so that the next write will cause re-authentication,
+ * but this time we expect re-authentication to not occur since it has been too
+ * soon. The checkClientConnection() call should return an error saying it
+ * expected the one byte-plus-node response but got the SaslHandshakeRequest
+ * instead
+ */
+ delay((long) (CONNECTIONS_MAX_REAUTH_MS_VALUE * 1.1));
+ NetworkTestUtils.checkClientConnection(selector, node, 1, 1);
+ fail("Expected a failure when trying to re-authenticate too quickly, but that did not occur");
+ } catch (AssertionError e) {
+ String expectedResponseTextRegex = "\\w-" + node;
+ String receivedResponseTextRegex = ".*" + OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
+ assertTrue(
+ "Should have received the SaslHandshakeRequest bytes back since we re-authenticated too quickly, but instead we got our generated message echoed back, implying re-auth succeeded when it should not have",
+ e.getMessage().matches(
+ ".*\\<\\[" + expectedResponseTextRegex + "]>.*\\<\\[" + receivedResponseTextRegex + "]>"));
+ server.verifyReauthenticationMetrics(1, 0); // unchanged
+ } finally {
+ selector.close();
+ selector = null;
+ }
+ }
+ /**
+ * Tests good path SASL/PLAIN client and server channels using SSL transport layer.
+ * Repeatedly tests successful re-authentication over several seconds.
+ */
+ @Test
+ public void testRepeatedValidSaslPlainOverSsl() throws Exception {
+ String node = "0";
+ SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
+ configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
+ /*
+ * Make sure 85% of this value is at least 1 second otherwise it is possible for
+ * the client to start re-authenticating but the server does not start due to
+ * the 1-second minimum. If this happens the SASL HANDSHAKE request that was
+ * injected to start re-authentication will be echoed back to the client instead
+ * of the data that the client explicitly sent, and then the client will not
+ * recognize that data and will throw an assertion error.
+ */
+ saslServerConfigs.put(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS,
+ new Double(1.1 * 1000L / 0.85).longValue());
+
+ server = createEchoServer(securityProtocol);
+ createClientConnection(securityProtocol, node);
+ checkClientConnection(node);
+ server.verifyAuthenticationMetrics(1, 0);
+ server.verifyReauthenticationMetrics(0, 0);
+ double successfulReauthentications = 0;
+ int desiredNumReauthentications = 5;
+ long startMs = Time.SYSTEM.milliseconds();
+ long timeoutMs = startMs + 1000 * 15; // stop after 15 seconds
+ while (successfulReauthentications < desiredNumReauthentications
+ && Time.SYSTEM.milliseconds() < timeoutMs) {
+ checkClientConnection(node);
+ successfulReauthentications = server.metricValue("successful-reauthentication-total");
+ }
+ server.verifyReauthenticationMetrics(desiredNumReauthentications, 0);
+ }
+
/**
* Tests OAUTHBEARER client channels without tokens for the server.
*/
@@ -1313,15 +1584,16 @@ private NioEchoServer startServerWithoutSaslAuthenticateHeader(final SecurityPro
if (isScram)
ScramCredentialUtils.createCache(credentialCache, Arrays.asList(saslMechanism));
SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContexts,
- securityProtocol, listenerName, false, saslMechanism, true, credentialCache, null) {
+ securityProtocol, listenerName, false, saslMechanism, true, credentialCache, null, time) {
@Override
protected SaslServerAuthenticator buildServerAuthenticator(Map configs,
Map callbackHandlers,
String id,
TransportLayer transportLayer,
- Map subjects) throws IOException {
- return new SaslServerAuthenticator(configs, callbackHandlers, id, subjects, null, listenerName, securityProtocol, transportLayer) {
+ Map subjects,
+ Map connectionsMaxReauthMsByMechanism) {
+ return new SaslServerAuthenticator(configs, callbackHandlers, id, subjects, null, listenerName, securityProtocol, transportLayer, connectionsMaxReauthMsByMechanism, time) {
@Override
protected ApiVersionsResponse apiVersionsResponse() {
@@ -1359,7 +1631,7 @@ private void createClientConnectionWithoutSaslAuthenticateHeader(final SecurityP
final Map jaasContexts = Collections.singletonMap(saslMechanism, jaasContext);
SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContexts,
- securityProtocol, listenerName, false, saslMechanism, true, null, null) {
+ securityProtocol, listenerName, false, saslMechanism, true, null, null, time) {
@Override
protected SaslClientAuthenticator buildClientAuthenticator(Map configs,
@@ -1368,16 +1640,16 @@ protected SaslClientAuthenticator buildClientAuthenticator(Map config
String serverHost,
String servicePrincipal,
TransportLayer transportLayer,
- Subject subject) throws IOException {
+ Subject subject) {
return new SaslClientAuthenticator(configs, callbackHandler, id, subject,
- servicePrincipal, serverHost, saslMechanism, true, transportLayer) {
+ servicePrincipal, serverHost, saslMechanism, true, transportLayer, time) {
@Override
protected SaslHandshakeRequest createSaslHandshakeRequest(short version) {
return new SaslHandshakeRequest.Builder(saslMechanism).build((short) 0);
}
@Override
- protected void saslAuthenticateVersion(short version) {
+ protected void saslAuthenticateVersion(ApiVersionsResponse apiVersionsResponse) {
// Don't set version so that headers are disabled
}
};
@@ -1467,6 +1739,7 @@ private void authenticateUsingSaslPlainAndCheckConnection(String node, boolean e
private TestJaasConfig configureMechanisms(String clientMechanism, List serverMechanisms) {
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, clientMechanism);
saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, serverMechanisms);
+ saslServerConfigs.put(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS, CONNECTIONS_MAX_REAUTH_MS_VALUE);
if (serverMechanisms.contains("DIGEST-MD5")) {
saslServerConfigs.put("digest-md5." + BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS,
TestDigestLoginModule.DigestServerCallbackHandler.class.getName());
@@ -1488,7 +1761,7 @@ private void createSelector(SecurityProtocol securityProtocol, Map) saslServerConfigs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG)) {
ScramMechanism scramMechanism = ScramMechanism.forMechanismName(mechanism);
@@ -1615,6 +1940,12 @@ private void updateTokenCredentialCache(String username, String password) throws
}
}
+ private static void delay(long delayMillis) throws InterruptedException {
+ final long startTime = System.currentTimeMillis();
+ while ((System.currentTimeMillis() - startTime) < delayMillis)
+ Thread.sleep(CONNECTIONS_MAX_REAUTH_MS_VALUE / 5);
+ }
+
public static class TestClientCallbackHandler implements AuthenticateCallbackHandler {
static final String USERNAME = "TestClientCallbackHandler-user";
@@ -1731,4 +2062,114 @@ public void initialize(Subject subject, CallbackHandler callbackHandler, Map 0)
+ for (Callback callback : callbacks) {
+ if (callback instanceof OAuthBearerTokenCallback) {
+ OAuthBearerTokenCallback oauthBearerTokenCallback = (OAuthBearerTokenCallback) callback;
+ OAuthBearerToken token = oauthBearerTokenCallback.token();
+ if (token != null) {
+ String changedPrincipalNameToUse = token.principalName()
+ + String.valueOf(++numInvocations);
+ String headerJson = "{" + claimOrHeaderJsonText("alg", "none") + "}";
+ /*
+ * Use a short lifetime so the background refresh thread replaces it before we
+ * re-authenticate
+ */
+ String lifetimeSecondsValueToUse = "1";
+ String claimsJson;
+ try {
+ claimsJson = String.format("{%s,%s,%s}",
+ expClaimText(Long.parseLong(lifetimeSecondsValueToUse)),
+ claimOrHeaderJsonText("iat", time.milliseconds() / 1000.0),
+ claimOrHeaderJsonText("sub", changedPrincipalNameToUse));
+ } catch (NumberFormatException e) {
+ throw new OAuthBearerConfigException(e.getMessage());
+ }
+ try {
+ Encoder urlEncoderNoPadding = Base64.getUrlEncoder().withoutPadding();
+ OAuthBearerUnsecuredJws jws = new OAuthBearerUnsecuredJws(String.format("%s.%s.",
+ urlEncoderNoPadding.encodeToString(headerJson.getBytes(StandardCharsets.UTF_8)),
+ urlEncoderNoPadding
+ .encodeToString(claimsJson.getBytes(StandardCharsets.UTF_8))),
+ "sub", "scope");
+ oauthBearerTokenCallback.token(jws);
+ } catch (OAuthBearerIllegalTokenException e) {
+ // occurs if the principal claim doesn't exist or has an empty value
+ throw new OAuthBearerConfigException(e.getMessage(), e);
+ }
+ }
+ }
+ }
+ }
+
+ private static String claimOrHeaderJsonText(String claimName, String claimValue) {
+ return QUOTE + claimName + QUOTE + ":" + QUOTE + claimValue + QUOTE;
+ }
+
+ private static String claimOrHeaderJsonText(String claimName, Number claimValue) {
+ return QUOTE + claimName + QUOTE + ":" + claimValue;
+ }
+
+ private static String expClaimText(long lifetimeSeconds) {
+ return claimOrHeaderJsonText("exp", time.milliseconds() / 1000.0 + lifetimeSeconds);
+ }
+
+ @Override
+ public void configure(Map configs, String saslMechanism,
+ List jaasConfigEntries) {
+ DELEGATE.configure(configs, saslMechanism, jaasConfigEntries);
+ }
+
+ @Override
+ public void close() {
+ DELEGATE.close();
+ }
+ }
+
+ /*
+ * Define a channel builder that starts with the DIGEST-MD5 mechanism and then
+ * switches to the PLAIN mechanism
+ */
+ private static class AlternateSaslChannelBuilder extends SaslChannelBuilder {
+ private int numInvocations = 0;
+
+ public AlternateSaslChannelBuilder(Mode mode, Map jaasContexts,
+ SecurityProtocol securityProtocol, ListenerName listenerName, boolean isInterBrokerListener,
+ String clientSaslMechanism, boolean handshakeRequestEnable, CredentialCache credentialCache,
+ DelegationTokenCache tokenCache, Time time) {
+ super(mode, jaasContexts, securityProtocol, listenerName, isInterBrokerListener, clientSaslMechanism,
+ handshakeRequestEnable, credentialCache, tokenCache, time);
+ }
+
+ @Override
+ protected SaslClientAuthenticator buildClientAuthenticator(Map configs,
+ AuthenticateCallbackHandler callbackHandler, String id, String serverHost, String servicePrincipal,
+ TransportLayer transportLayer, Subject subject) {
+ if (++numInvocations == 1)
+ return new SaslClientAuthenticator(configs, callbackHandler, id, subject, servicePrincipal, serverHost,
+ "DIGEST-MD5", true, transportLayer, time);
+ else
+ return new SaslClientAuthenticator(configs, callbackHandler, id, subject, servicePrincipal, serverHost,
+ "PLAIN", true, transportLayer, time) {
+ @Override
+ protected SaslHandshakeRequest createSaslHandshakeRequest(short version) {
+ return new SaslHandshakeRequest.Builder("PLAIN").build(version);
+ }
+ };
+ }
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index d62261ac3631a..51204f9764d16 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -28,6 +28,7 @@
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.plain.PlainLoginModule;
+import org.apache.kafka.common.utils.Time;
import org.junit.Test;
import javax.security.auth.Subject;
@@ -100,7 +101,7 @@ private SaslServerAuthenticator setupAuthenticator(Map configs, Trans
Map callbackHandlers = Collections.singletonMap(
mechanism, new SaslServerCallbackHandler());
return new SaslServerAuthenticator(configs, callbackHandlers, "node", subjects, null,
- new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer);
+ new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer, Collections.emptyMap(), Time.SYSTEM);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
index d6566085593fc..5d8b84c17b02c 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslServerTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.security.oauthbearer.internals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
@@ -36,6 +37,7 @@
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.SaslExtensions;
+import org.apache.kafka.common.security.authenticator.SaslInternalConfigs;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
@@ -103,6 +105,15 @@ public void noAuthorizationIdSpecified() throws Exception {
assertTrue("Next challenge is not empty", nextChallenge.length == 0);
}
+ @Test
+ public void negotiatedProperty() throws Exception {
+ saslServer.evaluateResponse(clientInitialResponse(USER));
+ OAuthBearerToken token = (OAuthBearerToken) saslServer.getNegotiatedProperty("OAUTHBEARER.token");
+ assertNotNull(token);
+ assertEquals(token.lifetimeMs(),
+ saslServer.getNegotiatedProperty(SaslInternalConfigs.CREDENTIAL_LIFETIME_MS_SASL_NEGOTIATED_PROPERTY_KEY));
+ }
+
/**
* SASL Extensions that are validated by the callback handler should be accessible through the {@code #getNegotiatedProperty()} method
*/
diff --git a/clients/src/test/java/org/apache/kafka/test/TestCondition.java b/clients/src/test/java/org/apache/kafka/test/TestCondition.java
index 9087ad41a9b2e..e7b78cf71e6a7 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestCondition.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestCondition.java
@@ -20,6 +20,7 @@
* Interface to wrap actions that are required to wait until a condition is met
* for testing purposes. Note that this is not intended to do any assertions.
*/
+@FunctionalInterface
public interface TestCondition {
boolean conditionMet();
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 3ab2bce8f73ad..afb342bb1512c 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -21,7 +21,10 @@
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +49,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.concurrent.Future;
+import java.util.function.Supplier;
import java.util.concurrent.ExecutionException;
import static java.util.Arrays.asList;
@@ -253,7 +257,14 @@ public static Properties consumerConfig(final String bootstrapServers, final Cla
* uses default value of 15 seconds for timeout
*/
public static void waitForCondition(final TestCondition testCondition, final String conditionDetails) throws InterruptedException {
- waitForCondition(testCondition, DEFAULT_MAX_WAIT_MS, conditionDetails);
+ waitForCondition(testCondition, DEFAULT_MAX_WAIT_MS, () -> conditionDetails);
+ }
+
+ /**
+ * uses default value of 15 seconds for timeout
+ */
+ public static void waitForCondition(final TestCondition testCondition, final Supplier conditionDetailsSupplier) throws InterruptedException {
+ waitForCondition(testCondition, DEFAULT_MAX_WAIT_MS, conditionDetailsSupplier);
}
/**
@@ -263,6 +274,16 @@ public static void waitForCondition(final TestCondition testCondition, final Str
* avoid transient failures due to slow or overloaded machines.
*/
public static void waitForCondition(final TestCondition testCondition, final long maxWaitMs, String conditionDetails) throws InterruptedException {
+ waitForCondition(testCondition, maxWaitMs, () -> conditionDetails);
+ }
+
+ /**
+ * Wait for condition to be met for at most {@code maxWaitMs} and throw assertion failure otherwise.
+ * This should be used instead of {@code Thread.sleep} whenever possible as it allows a longer timeout to be used
+ * without unnecessarily increasing test time (as the condition is checked frequently). The longer timeout is needed to
+ * avoid transient failures due to slow or overloaded machines.
+ */
+ public static void waitForCondition(final TestCondition testCondition, final long maxWaitMs, Supplier conditionDetailsSupplier) throws InterruptedException {
final long startTime = System.currentTimeMillis();
boolean testConditionMet;
@@ -274,7 +295,8 @@ public static void waitForCondition(final TestCondition testCondition, final lon
// could be avoided by making the implementations more robust, but we have a large number of such implementations
// and it's easier to simply avoid the issue altogether)
if (!testConditionMet) {
- conditionDetails = conditionDetails != null ? conditionDetails : "";
+ String conditionDetailsSupplied = conditionDetailsSupplier != null ? conditionDetailsSupplier.get() : null;
+ String conditionDetails = conditionDetailsSupplied != null ? conditionDetailsSupplied : "";
throw new AssertionError("Condition not met within timeout " + maxWaitMs + ". " + conditionDetails);
}
}
@@ -356,4 +378,8 @@ public static void assertFutureError(Future> future, Class extends Throwable
exceptionClass, cause.getClass());
}
}
+
+ public static ApiKeys apiKeyFrom(NetworkReceive networkReceive) {
+ return RequestHeader.parse(networkReceive.payload().duplicate()).apiKey();
+ }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 5725ff5f55421..cb62fb6c99d6b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -101,7 +101,7 @@ public WorkerGroupMember(DistributedConfig config,
config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), 0);
String metricGrpPrefix = "connect";
- ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
+ ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time);
NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),
this.metadata,
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index aaa09035b506a..5e0f7c87f5934 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -432,7 +432,7 @@ object AdminClient {
val time = Time.SYSTEM
val metrics = new Metrics(time)
val metadata = new Metadata(100L, 60 * 60 * 1000L, true)
- val channelBuilder = ClientUtils.createChannelBuilder(config)
+ val channelBuilder = ClientUtils.createChannelBuilder(config, time)
val requestTimeoutMs = config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG)
val retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 7002219efd259..85da8b8c0b2c8 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -116,6 +116,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
config,
config.interBrokerListenerName,
config.saslMechanismInterBrokerProtocol,
+ time,
config.saslInterBrokerHandshakeRequestEnable
)
val selector = new Selector(
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index bd25d94e91625..f49fa7b2a34d9 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -51,6 +51,7 @@ object TransactionMarkerChannelManager {
config,
config.interBrokerListenerName,
config.saslMechanismInterBrokerProtocol,
+ time,
config.saslInterBrokerHandshakeRequestEnable
)
val selector = new Selector(
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 1365f90f7636f..b5b3e4d0ec84e 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -23,6 +23,7 @@ import java.nio.channels._
import java.nio.channels.{Selector => NSelector}
import java.util.concurrent._
import java.util.concurrent.atomic._
+import java.util.function.Supplier
import com.yammer.metrics.core.Gauge
import kafka.cluster.{BrokerEndPoint, EndPoint}
@@ -35,8 +36,10 @@ import org.apache.kafka.common.{KafkaException, Reconfigurable}
import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.stats.Meter
+import org.apache.kafka.common.metrics.stats.Total
import org.apache.kafka.common.network.KafkaChannel.ChannelMuteEvent
import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
+import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time}
@@ -117,6 +120,19 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
def value = memoryPool.size() - memoryPool.availableMemory()
}
)
+ newGauge("ExpiredConnectionsKilledCount",
+ new Gauge[Double] {
+
+ def value = SocketServer.this.synchronized {
+ val expiredConnectionsKilledCountMetricNames = processors.values.asScala.map { p =>
+ metrics.metricName("expired-connections-killed-count", "socket-server-metrics", p.metricTags)
+ }
+ expiredConnectionsKilledCountMetricNames.map { metricName =>
+ Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double])
+ }.sum
+ }
+ }
+ )
info("Started " + acceptors.size + " acceptor threads")
}
@@ -548,6 +564,10 @@ private[kafka] class Processor(val id: Int,
// also includes the listener name)
Map(NetworkProcessorMetricTag -> id.toString)
)
+
+ val expiredConnectionsKilledCount = new Total()
+ private val expiredConnectionsKilledCountMetricName = metrics.metricName("expired-connections-killed-count", "socket-server-metrics", metricTags)
+ metrics.addMetric(expiredConnectionsKilledCountMetricName, expiredConnectionsKilledCount)
private val selector = createSelector(
ChannelBuilders.serverChannelBuilder(listenerName,
@@ -555,7 +575,8 @@ private[kafka] class Processor(val id: Int,
securityProtocol,
config,
credentialProvider.credentialCache,
- credentialProvider.tokenCache))
+ credentialProvider.tokenCache,
+ time))
// Visible to override for testing
protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = {
channelBuilder match {
@@ -685,6 +706,10 @@ private[kafka] class Processor(val id: Int,
}
}
+ private def nowNanosSupplier = new Supplier[java.lang.Long] {
+ override def get(): java.lang.Long = time.nanoseconds()
+ }
+
private def poll() {
try selector.poll(300)
catch {
@@ -701,14 +726,25 @@ private[kafka] class Processor(val id: Int,
openOrClosingChannel(receive.source) match {
case Some(channel) =>
val header = RequestHeader.parse(receive.payload)
- val connectionId = receive.source
- val context = new RequestContext(header, connectionId, channel.socketAddress,
- channel.principal, listenerName, securityProtocol)
- val req = new RequestChannel.Request(processor = id, context = context,
- startTimeNanos = time.nanoseconds, memoryPool, receive.payload, requestChannel.metrics)
- requestChannel.sendRequest(req)
- selector.mute(connectionId)
- handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
+ if (header.apiKey() == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, nowNanosSupplier))
+ trace(s"Begin re-authentication: $channel")
+ else {
+ val nowNanos = time.nanoseconds()
+ if (channel.serverAuthenticationSessionExpired(nowNanos)) {
+ channel.disconnect()
+ debug(s"Disconnected expired channel: $channel : $header")
+ expiredConnectionsKilledCount.record(null, 1, 0)
+ } else {
+ val connectionId = receive.source
+ val context = new RequestContext(header, connectionId, channel.socketAddress,
+ channel.principal, listenerName, securityProtocol)
+ val req = new RequestChannel.Request(processor = id, context = context,
+ startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
+ requestChannel.sendRequest(req)
+ selector.mute(connectionId)
+ handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
+ }
+ }
case None =>
// This should never happen since completed receives are processed immediately after `poll()`
throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")
@@ -883,6 +919,7 @@ private[kafka] class Processor(val id: Int,
override def shutdown(): Unit = {
super.shutdown()
removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
+ metrics.removeMetric(expiredConnectionsKilledCountMetricName)
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9bf41a1e0542b..13f555a20594c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -222,6 +222,9 @@ object Defaults {
val SslClientAuth = SslClientAuthNone
val SslPrincipalMappingRules = BrokerSecurityConfigs.DEFAULT_SSL_PRINCIPAL_MAPPING_RULES
+ /** ********* General Security configuration ***********/
+ val ConnectionsMaxReauthMsDefault = 0L
+
/** ********* Sasl configuration ***********/
val SaslMechanismInterBrokerProtocol = SaslConfigs.DEFAULT_SASL_MECHANISM
val SaslEnabledMechanisms = SaslConfigs.DEFAULT_SASL_ENABLED_MECHANISMS
@@ -422,6 +425,7 @@ object KafkaConfig {
/** ******** Common Security Configuration *************/
val PrincipalBuilderClassProp = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
+ val ConnectionsMaxReauthMsProp = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS
/** ********* SSL Configuration ****************/
val SslProtocolProp = SslConfigs.SSL_PROTOCOL_CONFIG
@@ -744,6 +748,7 @@ object KafkaConfig {
/** ******** Common Security Configuration *************/
val PrincipalBuilderClassDoc = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC
+ val ConnectionsMaxReauthMsDoc = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC
/** ********* SSL Configuration ****************/
val SslProtocolDoc = SslConfigs.SSL_PROTOCOL_DOC
@@ -983,6 +988,9 @@ object KafkaConfig {
.define(AlterLogDirsReplicationQuotaWindowSizeSecondsProp, INT, Defaults.AlterLogDirsReplicationQuotaWindowSizeSeconds, atLeast(1), LOW, AlterLogDirsReplicationQuotaWindowSizeSecondsDoc)
.define(ClientQuotaCallbackClassProp, CLASS, null, LOW, ClientQuotaCallbackClassDoc)
+ /** ********* General Security Configuration ****************/
+ .define(ConnectionsMaxReauthMsProp, LONG, Defaults.ConnectionsMaxReauthMsDefault, MEDIUM, ConnectionsMaxReauthMsDoc)
+
/** ********* SSL Configuration ****************/
.define(PrincipalBuilderClassProp, CLASS, null, MEDIUM, PrincipalBuilderClassDoc)
.define(SslProtocolProp, STRING, Defaults.SslProtocol, MEDIUM, SslProtocolDoc)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index bef0663c26fbc..5f9b3612c4fb8 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -421,6 +421,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
config,
config.interBrokerListenerName,
config.saslMechanismInterBrokerProtocol,
+ time,
config.saslInterBrokerHandshakeRequestEnable)
val selector = new Selector(
NetworkReceive.UNLIMITED,
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
index 4e642f3cda2ac..01a9b9f7b14b9 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
@@ -56,6 +56,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
brokerConfig,
brokerConfig.interBrokerListenerName,
brokerConfig.saslMechanismInterBrokerProtocol,
+ time,
brokerConfig.saslInterBrokerHandshakeRequestEnable
)
val selector = new Selector(
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 1f87d7acfec2f..9a5ac7bdeb527 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -450,7 +450,7 @@ private class ReplicaFetcherBlockingSend(sourceNode: Node,
private val socketTimeout: Int = consumerConfig.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)
private val networkClient = {
- val channelBuilder = org.apache.kafka.clients.ClientUtils.createChannelBuilder(consumerConfig)
+ val channelBuilder = org.apache.kafka.clients.ClientUtils.createChannelBuilder(consumerConfig, time)
val selector = new Selector(
NetworkReceive.UNLIMITED,
consumerConfig.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index c9da8347bb3c1..a1f2cff76913c 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -17,6 +17,9 @@
package kafka.api
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.{Gauge, Metric, MetricName}
+
import java.io.File
import java.util.ArrayList
import java.util.concurrent.ExecutionException
@@ -170,6 +173,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3")
this.serverConfig.setProperty(KafkaConfig.MinInSyncReplicasProp, "3")
this.serverConfig.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3")
+ this.serverConfig.setProperty(KafkaConfig.ConnectionsMaxReauthMsProp, "1500")
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group")
/**
@@ -204,6 +208,27 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
val consumer = createConsumer()
consumer.assign(List(tp).asJava)
consumeRecords(consumer, numRecords)
+ confirmReauthenticationMetrics
+ }
+
+ protected def confirmReauthenticationMetrics() : Unit = {
+ val expiredConnectionsKilledCountTotal = getGauge("ExpiredConnectionsKilledCount").value()
+ servers.foreach { s =>
+ val numExpiredKilled = TestUtils.totalMetricValue(s, "expired-connections-killed-count")
+ assertTrue("Should have been zero expired connections killed: " + numExpiredKilled + "(total=" + expiredConnectionsKilledCountTotal + ")", numExpiredKilled == 0)
+ }
+ assertEquals("Should have been zero expired connections killed total", 0, expiredConnectionsKilledCountTotal, 0.0)
+ servers.foreach { s =>
+ assertTrue("failed re-authentications not 0", TestUtils.totalMetricValue(s, "failed-reauthentication-total") == 0)
+ }
+ }
+
+ private def getGauge(metricName: String) = {
+ Metrics.defaultRegistry.allMetrics.asScala
+ .filterKeys(k => k.getName == metricName)
+ .headOption
+ .getOrElse { fail( "Unable to find metric " + metricName ) }
+ ._2.asInstanceOf[Gauge[Double]]
}
@Test
@@ -212,6 +237,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
val consumer = createConsumer()
consumer.subscribe(List(topic).asJava)
consumeRecords(consumer, numRecords)
+ confirmReauthenticationMetrics
}
@Test
@@ -222,6 +248,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
val consumer = createConsumer()
consumer.subscribe(List(topic).asJava)
consumeRecords(consumer, numRecords)
+ confirmReauthenticationMetrics
}
@Test
@@ -232,6 +259,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
val consumer = createConsumer()
consumer.subscribe(List(topic).asJava)
consumeRecords(consumer, numRecords)
+ confirmReauthenticationMetrics
}
@Test
@@ -242,6 +270,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
val consumer = createConsumer()
consumer.assign(List(tp2).asJava)
consumeRecords(consumer, numRecords, topic = tp2.topic)
+ confirmReauthenticationMetrics
}
private def setWildcardResourceAcls() {
@@ -280,6 +309,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
def testNoProduceWithoutDescribeAcl(): Unit = {
val producer = createProducer()
sendRecords(producer, numRecords, tp)
+ confirmReauthenticationMetrics
}
@Test
@@ -296,6 +326,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
case e: TopicAuthorizationException =>
assertEquals(Set(topic).asJava, e.unauthorizedTopics())
}
+ confirmReauthenticationMetrics
}
/**
@@ -309,6 +340,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
consumer.assign(List(tp).asJava)
// the exception is expected when the consumer attempts to lookup offsets
consumeRecords(consumer)
+ confirmReauthenticationMetrics
}
@Test(expected = classOf[TopicAuthorizationException])
@@ -351,6 +383,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
case e: TopicAuthorizationException =>
assertEquals(Set(topic).asJava, e.unauthorizedTopics())
}
+ confirmReauthenticationMetrics
}
@Test
@@ -366,6 +399,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
case e: TopicAuthorizationException =>
assertEquals(Set(topic).asJava, e.unauthorizedTopics())
}
+ confirmReauthenticationMetrics
}
private def noConsumeWithDescribeAclSetup(): Unit = {
@@ -401,6 +435,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
case e: GroupAuthorizationException =>
assertEquals(group, e.groupId())
}
+ confirmReauthenticationMetrics
}
protected final def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
index 572d9d3b52020..c8521f67ed868 100644
--- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -74,5 +74,6 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
case e: TopicAuthorizationException => assertTrue(e.unauthorizedTopics.contains(topic))
case e: GroupAuthorizationException => assertEquals(group, e.groupId)
}
+ confirmReauthenticationMetrics
}
}
diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index cbe8462b3fddd..07885cdbb0062 100644
--- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -188,7 +188,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
private def createSelector(): Selector = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(securityProtocol,
- JaasContext.Type.CLIENT, new TestSecurityConfig(clientConfig), null, kafkaClientSaslMechanism, true)
+ JaasContext.Type.CLIENT, new TestSecurityConfig(clientConfig), null, kafkaClientSaslMechanism, time, true)
NetworkTestUtils.createSelector(channelBuilder, time)
}
}
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index c8876327dd2d2..f07bf84f43854 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.internals.FatalExitError
import org.junit.{After, Before, Test}
import org.junit.Assert._
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
class KafkaTest {
@@ -108,6 +109,21 @@ class KafkaTest {
assertEquals(password, config.getPassword(KafkaConfig.SslTruststorePasswordProp).value)
}
+ @Test
+ def testConnectionsMaxReauthMsDefault(): Unit = {
+ val propertiesFile = prepareDefaultConfig()
+ val config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
+ assertEquals(0L, config.valuesWithPrefixOverride("sasl_ssl.oauthbearer.").get(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS).asInstanceOf[Long])
+ }
+
+ @Test
+ def testConnectionsMaxReauthMsExplicit(): Unit = {
+ val propertiesFile = prepareDefaultConfig()
+ val expected = 3600000
+ val config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", s"sasl_ssl.oauthbearer.connections.max.reauth.ms=${expected}")))
+ assertEquals(expected, config.valuesWithPrefixOverride("sasl_ssl.oauthbearer.").get(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS).asInstanceOf[Long])
+ }
+
def prepareDefaultConfig(): String = {
prepareConfig(Array("broker.id=1", "zookeeper.connect=somewhere"))
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index ba8e54b2eac42..9ca72561748b5 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -675,6 +675,7 @@ class KafkaConfigTest {
case KafkaConfig.RackProp => // ignore string
//SSL Configs
case KafkaConfig.PrincipalBuilderClassProp =>
+ case KafkaConfig.ConnectionsMaxReauthMsProp =>
case KafkaConfig.SslProtocolProp => // ignore string
case KafkaConfig.SslProviderProp => // ignore string
case KafkaConfig.SslEnabledProtocolsProp =>
diff --git a/docs/ops.html b/docs/ops.html
index 158602b96a16d..3b29dbb3f9b92 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -968,6 +968,16 @@
kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent |
between 0 and 1, ideally > 0.3 |
+
+ | The number of connections disconnected on a processor due to a client not re-authenticating and then using the connection beyond its expiration time for anything other than re-authentication |
+ kafka.server:type=socket-server-metrics,listener=[SASL_PLAINTEXT|SASL_SSL],networkProcessor=<#>,name=expired-connections-killed-count |
+ ideally 0 when re-authentication is enabled, implying there are no longer any older, pre-2.2.0 clients connecting to this (listener, processor) combination |
+
+
+ | The total number of connections disconnected, across all processors, due to a client not re-authenticating and then using the connection beyond its expiration time for anything other than re-authentication |
+ kafka.network:type=SocketServer,name=ExpiredConnectionsKilledCount |
+ ideally 0 when re-authentication is enabled, implying there are no longer any older, pre-2.2.0 clients connecting to this broker |
+
| The average fraction of time the request handler threads are idle |
kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent |
@@ -1152,6 +1162,41 @@ Common monitoring me
| Total connections that failed authentication. |
kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
+
+ | successful-reauthentication-rate |
+ Connections per second that were successfully re-authenticated using SASL. |
+ kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
+
+
+ | successful-reauthentication-total |
+ Total connections that were successfully re-authenticated using SASL. |
+ kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
+
+
+ | reauthentication-latency-max |
+ The maximum latency in ms observed due to re-authentication. |
+ kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
+
+
+ | reauthentication-latency-avg |
+ The average latency in ms observed due to re-authentication. |
+ kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
+
+
+ | failed-reauthentication-rate |
+ Connections per second that failed re-authentication. |
+ kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
+
+
+ | failed-reauthentication-total |
+ Total connections that failed re-authentication. |
+ kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
+
+
+ | successful-authentication-no-reauth-total |
+ Total connections that were successfully authenticated by older, pre-2.2.0 SASL clients that do not support re-authentication. May only be non-zero when re-authentication is enabled. |
+ kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
+
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
index 9f15696fe7173..d85effc2833cb 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
@@ -57,6 +57,7 @@
public class ConnectionStressWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(ConnectionStressWorker.class);
+ private static final Time TIME = Time.SYSTEM;
private static final int THROTTLE_PERIOD_MS = 100;
@@ -100,7 +101,7 @@ public void start(Platform platform, WorkerStatusTracker status,
this.status = status;
this.totalConnections = 0;
this.totalFailedConnections = 0;
- this.startTimeMs = Time.SYSTEM.milliseconds();
+ this.startTimeMs = TIME.milliseconds();
this.throttle = new ConnectStressThrottle(WorkerUtils.
perSecToPerPeriod(spec.targetConnectionsPerSec(), THROTTLE_PERIOD_MS));
this.nextReportTime = 0;
@@ -168,11 +169,11 @@ private boolean attemptConnection(AdminClientConfig conf,
try {
List nodes = updater.fetchNodes();
Node targetNode = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
- try (ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(conf)) {
+ try (ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(conf, TIME)) {
try (Metrics metrics = new Metrics()) {
LogContext logContext = new LogContext();
try (Selector selector = new Selector(conf.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
- metrics, Time.SYSTEM, "", channelBuilder, logContext)) {
+ metrics, TIME, "", channelBuilder, logContext)) {
try (NetworkClient client = new NetworkClient(selector,
updater,
"ConnectionStressWorker",
@@ -183,11 +184,11 @@ private boolean attemptConnection(AdminClientConfig conf,
4096,
1000,
ClientDnsLookup.forConfig(conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)),
- Time.SYSTEM,
+ TIME,
false,
new ApiVersions(),
logContext)) {
- NetworkClientUtils.awaitReady(client, targetNode, Time.SYSTEM, 100);
+ NetworkClientUtils.awaitReady(client, targetNode, TIME, 100);
}
}
}