From 2a5d564288df9665082a9f01286010db5beef946 Mon Sep 17 00:00:00 2001 From: Ron Dagostino Date: Thu, 20 Sep 2018 23:11:36 -0400 Subject: [PATCH 01/12] KIP-368: Allow SASL Connections to Periodically Re-Authenticate The adoption of KIP-255: OAuth Authentication via SASL/OAUTHBEARER in release 2.0.0 creates the possibility of using information in the bearer token to make authorization decisions. Unfortunately, however, Kafka connections are long-lived, so there is no ability to change the bearer token associated with a particular connection. Allowing SASL connections to periodically re-authenticate would resolve this. In addition to this motivation there are two others that are security-related. First, to eliminate access to Kafka for connected clients, the current requirement is to remove all authorizations (i.e. remove all ACLs). This is necessary because of the long-lived nature of the connections. It is operationally simpler to shut off access at the point of authentication, and with the release of KIP-86: Configurable SASL Callback Handlers it is going to become more and more likely that installations will authenticate users against external directories (e.g. via LDAP). The ability to stop Kafka access by simply disabling an account in an LDAP directory (for example) is desirable. The second motivating factor for re-authentication related to security is that the use of short-lived tokens is a common OAuth security recommendation, but issuing a short-lived token to a Kafka client (or a broker when OAUTHBEARER is the inter-broker protocol) currently has no benefit because once a client is connected to a broker the client is never challenged again and the connection may remain intact beyond the token expiration time (and may remain intact indefinitely under perfect circumstances). 
This KIP proposes adding the ability for clients (and brokers when OAUTHBEARER is the inter-broker protocol) to re-authenticate their connections to brokers and for brokers to close connections that continue to use expired sessions. Signed-off-by: Ron Dagostino --- checkstyle/suppressions.xml | 2 +- .../org/apache/kafka/clients/ClientUtils.java | 5 +- .../kafka/clients/admin/KafkaAdminClient.java | 2 +- .../kafka/clients/consumer/KafkaConsumer.java | 2 +- .../kafka/clients/producer/KafkaProducer.java | 2 +- .../internals/BrokerSecurityConfigs.java | 6 + .../kafka/common/network/Authenticator.java | 102 +++++ .../kafka/common/network/ChannelBuilders.java | 17 +- .../kafka/common/network/KafkaChannel.java | 250 ++++++++++- .../network/PlaintextChannelBuilder.java | 5 +- .../network/ReauthenticationContext.java | 94 +++++ .../common/network/SaslChannelBuilder.java | 53 ++- .../apache/kafka/common/network/Selector.java | 59 ++- .../common/network/SslChannelBuilder.java | 5 +- .../requests/SaslAuthenticateRequest.java | 6 +- .../requests/SaslAuthenticateResponse.java | 23 +- .../SaslClientAuthenticator.java | 232 +++++++++-- .../authenticator/SaslInternalConfigs.java | 41 ++ .../SaslServerAuthenticator.java | 292 ++++++++++--- .../internals/OAuthBearerSaslServer.java | 4 +- .../scram/internals/ScramSaslServer.java | 7 +- .../internals/ScramServerCallbackHandler.java | 4 + .../DelegationTokenCredentialCallback.java | 9 + .../common/network/NetworkTestUtils.java | 10 + .../kafka/common/network/NioEchoServer.java | 162 ++++++-- .../network/SaslChannelBuilderTest.java | 3 +- .../common/network/SslTransportLayerTest.java | 4 +- .../kafka/common/protocol/ApiKeysTest.java | 4 +- .../common/requests/RequestResponseTest.java | 26 +- .../common/security/TestSecurityConfig.java | 2 + .../SaslAuthenticatorFailureDelayTest.java | 2 +- .../authenticator/SaslAuthenticatorTest.java | 387 ++++++++++++++++-- .../SaslServerAuthenticatorTest.java | 3 +- 
.../internals/OAuthBearerSaslServerTest.java | 11 + .../org/apache/kafka/test/TestCondition.java | 1 + .../java/org/apache/kafka/test/TestUtils.java | 30 +- .../distributed/WorkerGroupMember.java | 2 +- .../main/scala/kafka/admin/AdminClient.scala | 2 +- .../controller/ControllerChannelManager.scala | 1 + .../TransactionMarkerChannelManager.scala | 1 + .../scala/kafka/network/SocketServer.scala | 55 ++- .../main/scala/kafka/server/KafkaConfig.scala | 8 + .../main/scala/kafka/server/KafkaServer.scala | 1 + .../server/ReplicaFetcherBlockingSend.scala | 1 + .../kafka/tools/ReplicaVerificationTool.scala | 2 +- .../kafka/api/EndToEndAuthorizationTest.scala | 35 ++ .../api/SaslEndToEndAuthorizationTest.scala | 1 + .../server/GssapiAuthenticationTest.scala | 2 +- .../scala/unit/kafka/KafkaConfigTest.scala | 16 + .../unit/kafka/server/KafkaConfigTest.scala | 1 + docs/ops.html | 45 ++ .../workload/ConnectionStressWorker.java | 11 +- 52 files changed, 1840 insertions(+), 211 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/network/ReauthenticationContext.java create mode 100644 clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslInternalConfigs.java diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 5046c7617ff03..f3ab7ecfa8363 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -51,7 +51,7 @@ files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData).java"/> + files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator).java"/> diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index dce5f3fb65cda..fe83c5c54e904 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.network.ChannelBuilders; import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,11 +103,11 @@ public static void closeQuietly(Closeable c, String name, AtomicReference resolve(String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException { diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 15021bc1db19c..33a478687143e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -350,7 +350,7 @@ static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcesso reporters.add(new JmxReporter(JMX_PREFIX)); metrics = new Metrics(metricConfig, reporters, time); String metricGrpPrefix = "admin-client"; - channelBuilder = ClientUtils.createChannelBuilder(config); + channelBuilder = ClientUtils.createChannelBuilder(config, time); selector = new Selector(config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext); networkClient = new NetworkClient( diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 4061373c45021..714cd94d6b2f6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -715,7 +715,7 @@ private KafkaConsumer(ConsumerConfig config, this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), 0); String metricGrpPrefix = 
"consumer"; ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer"); - ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config); + ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time); IsolationLevel isolationLevel = IsolationLevel.valueOf( config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT)); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index c68a014619a2d..cefe098cde986 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -436,7 +436,7 @@ public KafkaProducer(Properties properties, Serializer keySerializer, Seriali Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) { int maxInflightRequests = configureInflightRequests(producerConfig, transactionManager != null); int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); - ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig); + ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time); ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics); Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics); KafkaClient client = kafkaClient != null ? 
kafkaClient : new NetworkClient( diff --git a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java index e3a8a774a5172..98f6467a48429 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java @@ -35,6 +35,7 @@ public class BrokerSecurityConfigs { public static final String SASL_ENABLED_MECHANISMS_CONFIG = "sasl.enabled.mechanisms"; public static final String SASL_SERVER_CALLBACK_HANDLER_CLASS = "sasl.server.callback.handler.class"; public static final String SSL_PRINCIPAL_MAPPING_RULES_CONFIG = "ssl.principal.mapping.rules"; + public static final String CONNECTIONS_MAX_REAUTH_MS = "connections.max.reauth.ms"; public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the " + "KafkaPrincipalBuilder interface, which is used to build the KafkaPrincipal object used during " + @@ -84,4 +85,9 @@ public class BrokerSecurityConfigs { + "listener prefix and SASL mechanism name in lower-case. For example, " + "listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.example.CustomPlainCallbackHandler."; + public static final String CONNECTIONS_MAX_REAUTH_MS_DOC = "When explicitly set to a positive number (the default is 0, not a positive number), " + + "a session lifetime that will not exceed the configured value will be communicated to v2.2.0 or later clients when they authenticate. " + + "The broker will disconnect any such connection that is not re-authenticated within the session lifetime and that is then subsequently " + + "used for any purpose other than re-authentication. Configuration names can optionally be prefixed with listener prefix and SASL " + + "mechanism name in lower-case. 
For example, listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000"; } diff --git a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java index 33c2e9085168d..bcb011e830bfd 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java @@ -21,6 +21,8 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Collections; +import java.util.List; /** * Authentication for Channel @@ -54,4 +56,104 @@ default void handleAuthenticationFailure() throws IOException { * returns true if authentication is complete otherwise returns false; */ boolean complete(); + + /** + * Begins re-authentication. Uses transportLayer to read or write tokens as is + * done for {@link #authenticate()}. For security protocols PLAINTEXT and SSL, + * this is a no-op since re-authentication does not apply/is not supported, + * respectively. For SASL_PLAINTEXT and SASL_SSL, this performs a SASL + * authentication. Any in-flight responses from prior requests can/will be read + * and collected for later processing as required. There must not be partially + * written requests; any request queued for writing (for which zero bytes have + * been written) remains queued until after re-authentication succeeds. + * + * @param reauthenticationContext + * the context in which this re-authentication is occurring. This + * instance is responsible for closing the previous Authenticator + * returned by + * {@link ReauthenticationContext#previousAuthenticator()}. 
+ * @throws AuthenticationException + * if authentication fails due to invalid credentials or other + * security configuration errors + * @throws IOException + * if read/write fails due to an I/O error + */ + default void reauthenticate(ReauthenticationContext reauthenticationContext) throws IOException { + // empty + } + + /** + * Return the session expiration time, if any, otherwise null. The value is in + * nanoseconds as per {@code System.nanoTime()} and is therefore only useful + * when compared to such a value -- its absolute value is meaningless. This + * value may be non-null only on the server-side. It represents the time after + * which, in the absence of re-authentication, the broker will close the session + * if it receives a request unrelated to authentication. We store nanoseconds + * here to avoid having to invoke the more expensive {@code milliseconds()} call + * on the broker for every request. + * + * @return the session expiration time, if any, otherwise null + */ + default Long serverSessionExpirationTimeNanos() { + return null; + } + + /** + * Return the time on or after which a client should re-authenticate this + * session, if any, otherwise null. The value is in nanoseconds as per + * {@code System.nanoTime()} and is therefore only useful when compared to such + * a value -- its absolute value is meaningless. This value may be non-null + * only on the client-side. It will be a random time between 85% and 95% of the + * full session lifetime to account for latency between client and server and to + * avoid re-authentication storms that could be caused by many sessions + * re-authenticating simultaneously.
+ * + * @return the time on or after which a client should re-authenticate this + * session, if any, otherwise null + */ + default Long clientSessionReauthenticationTimeNanos() { + return null; + } + + /** + * Return the number of milliseconds that elapsed while re-authenticating this + * session from the perspective of this instance, if applicable, otherwise null. + * The server-side perspective will yield a lower value than the client-side + * perspective of the same re-authentication because the client-side observes an + * additional network round-trip. + * + * @return the number of milliseconds that elapsed while re-authenticating this + * session from the perspective of this instance, if applicable, + * otherwise null + */ + default Long reauthenticationLatencyMs() { + return null; + } + + /** + * Return the (always non-null but possibly empty) client-side + * {@link NetworkReceive} responses that arrived during re-authentication that + * are unrelated to re-authentication, if any. These correspond to requests sent + * prior to the beginning of re-authentication; the requests were made when the + * channel was successfully authenticated, and the responses arrived during the + * re-authentication process. 
+ * + * @return the (always non-null but possibly empty) client-side + * {@link NetworkReceive} responses that arrived during + * re-authentication that are unrelated to re-authentication, if any + */ + default List getAndClearResponsesReceivedDuringReauthentication() { + return Collections.emptyList(); + } + + /** + * Return true if this is a server-side authenticator and the connected client + * has indicated that it supports re-authentication, otherwise false + * + * @return true if this is a server-side authenticator and the connected client + * has indicated that it supports re-authentication, otherwise false + */ + default boolean connectedClientSupportsReauthentication() { + return false; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java index b3040f3ef73c7..4257c957cd661 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.apache.kafka.common.security.ssl.SslPrincipalMapper; import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import java.util.Collections; @@ -36,7 +37,6 @@ import java.util.Map; public class ChannelBuilders { - private ChannelBuilders() { } /** @@ -55,6 +55,7 @@ public static ChannelBuilder clientChannelBuilder(SecurityProtocol securityProto AbstractConfig config, ListenerName listenerName, String clientSaslMechanism, + Time time, boolean saslHandshakeRequestEnable) { if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) { @@ -64,7 +65,7 @@ public static ChannelBuilder clientChannelBuilder(SecurityProtocol securityProto throw new 
IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`"); } return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, false, clientSaslMechanism, - saslHandshakeRequestEnable, null, null); + saslHandshakeRequestEnable, null, null, time); } /** @@ -79,9 +80,10 @@ public static ChannelBuilder serverChannelBuilder(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config, CredentialCache credentialCache, - DelegationTokenCache tokenCache) { + DelegationTokenCache tokenCache, + Time time) { return create(securityProtocol, Mode.SERVER, JaasContext.Type.SERVER, config, listenerName, - isInterBrokerListener, null, true, credentialCache, tokenCache); + isInterBrokerListener, null, true, credentialCache, tokenCache, time); } private static ChannelBuilder create(SecurityProtocol securityProtocol, @@ -93,7 +95,8 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol, String clientSaslMechanism, boolean saslHandshakeRequestEnable, CredentialCache credentialCache, - DelegationTokenCache tokenCache) { + DelegationTokenCache tokenCache, + Time time) { Map configs; if (listenerName == null) configs = config.values(); @@ -111,6 +114,7 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol, requireNonNullMode(mode, securityProtocol); Map jaasContexts; if (mode == Mode.SERVER) { + @SuppressWarnings("unchecked") List enabledMechanisms = (List) configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG); jaasContexts = new HashMap<>(enabledMechanisms.size()); for (String mechanism : enabledMechanisms) @@ -129,7 +133,8 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol, clientSaslMechanism, saslHandshakeRequestEnable, credentialCache, - tokenCache); + tokenCache, + time); break; case PLAINTEXT: channelBuilder = new PlaintextChannelBuilder(listenerName); diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java index 6a12ef2331091..969b60f6ab3e4 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java +++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java @@ -27,9 +27,45 @@ import java.net.Socket; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import java.util.List; import java.util.Objects; - +import java.util.function.Supplier; + +/** + * A Kafka connection either existing on a client (which could be a broker in an + * inter-broker scenario) and representing the channel to a remote broker or the + * reverse (existing on a broker and representing the channel to a remote + * client, which could be a broker in an inter-broker scenario). + *
+ * <p>
+ * Each instance has the following:
+ * <ul>
+ * <li>a unique ID identifying it in the {@code KafkaClient} instance via which
+ * the connection was made on the client-side or in the instance where it was
+ * accepted on the server-side</li>
+ * <li>a reference to the underlying {@link TransportLayer} to allow reading and
+ * writing</li>
+ * <li>an {@link Authenticator} that performs the authentication (or
+ * re-authentication, if that feature is enabled and it applies to this
+ * connection) by reading and writing directly from/to the same
+ * {@link TransportLayer}.</li>
+ * <li>a {@link MemoryPool} into which responses are read (typically the JVM
+ * heap for clients, though smaller pools can be used for brokers and for
+ * testing out-of-memory scenarios)</li>
+ * <li>a {@link NetworkReceive} representing the current incomplete/in-progress
+ * request (from the server-side perspective) or response (from the client-side
+ * perspective) being read, if applicable; or a non-null value that has had no
+ * data read into it yet or a null value if there is no in-progress
+ * request/response (either could be the case)</li>
+ * <li>a {@link Send} representing the current request (from the client-side
+ * perspective) or response (from the server-side perspective) that is either
+ * waiting to be sent or partially sent, if applicable, or null</li>
+ * <li>a {@link ChannelMuteState} to document if the channel has been muted due
+ * to memory pressure or other reasons</li>
+ * </ul>
+ */ public class KafkaChannel { + private static final long MIN_REAUTH_INTERVAL_ONE_SECOND_NANOS = 1000 * 1000 * 1000; + /** * Mute States for KafkaChannel: *