Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData).java"/>

<suppress checks="CyclomaticComplexity"
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler).java"/>
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator).java"/>

<suppress checks="JavaNCSS"
files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -102,11 +103,11 @@ public static void closeQuietly(Closeable c, String name, AtomicReference<Throwa
* @param config client configs
* @return configured ChannelBuilder based on the configs.
*/
public static ChannelBuilder createChannelBuilder(AbstractConfig config) {
public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time time) {
SecurityProtocol securityProtocol = SecurityProtocol.forName(config.getString(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
String clientSaslMechanism = config.getString(SaslConfigs.SASL_MECHANISM);
return ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, config, null,
clientSaslMechanism, true);
clientSaslMechanism, time, true);
}

static List<InetAddress> resolve(String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcesso
reporters.add(new JmxReporter(JMX_PREFIX));
metrics = new Metrics(metricConfig, reporters, time);
String metricGrpPrefix = "admin-client";
channelBuilder = ClientUtils.createChannelBuilder(config);
channelBuilder = ClientUtils.createChannelBuilder(config, time);
selector = new Selector(config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
metrics, time, metricGrpPrefix, channelBuilder, logContext);
networkClient = new NetworkClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ private KafkaConsumer(ConsumerConfig config,
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
String metricGrpPrefix = "consumer";
ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time);

IsolationLevel isolationLevel = IsolationLevel.valueOf(
config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metadata) {
int maxInflightRequests = configureInflightRequests(producerConfig, transactionManager != null);
int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig);
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time);
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class BrokerSecurityConfigs {
public static final String SASL_ENABLED_MECHANISMS_CONFIG = "sasl.enabled.mechanisms";
public static final String SASL_SERVER_CALLBACK_HANDLER_CLASS = "sasl.server.callback.handler.class";
public static final String SSL_PRINCIPAL_MAPPING_RULES_CONFIG = "ssl.principal.mapping.rules";
public static final String CONNECTIONS_MAX_REAUTH_MS = "connections.max.reauth.ms";
Comment thread
rondagostino marked this conversation as resolved.

public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the " +
"KafkaPrincipalBuilder interface, which is used to build the KafkaPrincipal object used during " +
Expand Down Expand Up @@ -84,4 +85,9 @@ public class BrokerSecurityConfigs {
+ "listener prefix and SASL mechanism name in lower-case. For example, "
+ "listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.example.CustomPlainCallbackHandler.";

public static final String CONNECTIONS_MAX_REAUTH_MS_DOC = "When explicitly set to a positive number (the default is 0, not a positive number), "
+ "a session lifetime that will not exceed the configured value will be communicated to v2.2.0 or later clients when they authenticate. "
+ "The broker will disconnect any such connection that is not re-authenticated within the session lifetime and that is then subsequently "
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused by this part of the sentence; can you elaborate?

 that is then subsequently used for any purpose other than re-authentication.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mkaminski1988 Here's the quote from the KIP related to this:

From a behavior perspective on the server (broker) side, when the expired-connection-kill feature is enabled with a positive value the broker will communicate a session time via SASL_AUTHENTICATE and will close a connection when the connection is used past the expiration time and the specific API request is not directly related to re-authentication (SaslHandshakeRequest and SaslAuthenticateRequest). In other words, if a connection sits idle, it will not be closed – something unrelated to re-authentication must traverse the connection before a disconnect will occur.

Does that clarify it?

+ "used for any purpose other than re-authentication. Configuration names can optionally be prefixed with listener prefix and SASL "
+ "mechanism name in lower-case. For example, listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000";
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
* Authentication for Channel
Expand Down Expand Up @@ -54,4 +56,104 @@ default void handleAuthenticationFailure() throws IOException {
* Returns true if authentication is complete; otherwise returns false.
*/
boolean complete();

/**
 * Begins re-authentication. Uses the channel's transport layer to read or write
 * tokens as is done for {@link #authenticate()}. For security protocols PLAINTEXT
 * and SSL, this is a no-op since re-authentication does not apply/is not
 * supported, respectively. For SASL_PLAINTEXT and SASL_SSL, this performs a SASL
 * authentication. Any in-flight responses from prior requests can/will be read
 * and collected for later processing as required. There must not be partially
 * written requests; any request queued for writing (for which zero bytes have
 * been written) remains queued until after re-authentication succeeds.
 *
 * @param reauthenticationContext
 *            the context in which this re-authentication is occurring. This
 *            instance is responsible for closing the previous Authenticator
 *            returned by
 *            {@link ReauthenticationContext#previousAuthenticator()}.
 * @throws AuthenticationException
 *             if authentication fails due to invalid credentials or other
 *             security configuration errors
 * @throws IOException
 *             if read/write fails due to an I/O error
 */
default void reauthenticate(ReauthenticationContext reauthenticationContext) throws IOException {
    // no-op by default; SASL implementations override this to perform re-authentication
}

/**
* Return the session expiration time, if any, otherwise null. The value is in
* nanoseconds as per {@code System.nanoTime()} and is therefore only useful
* when compared to such a value -- it's absolute value is meaningless. This
* value may be non-null only on the server-side. It represents the time after
* which, in the absence of re-authentication, the broker will close the session
* if it receives a request unrelated to authentication. We store nanoseconds
* here to avoid having to invoke the more expensive {@code milliseconds()} call
* on the broker for every request
*
* @return the session expiration time, if any, otherwise null
*/
default Long serverSessionExpirationTimeNanos() {
Comment thread
rondagostino marked this conversation as resolved.
return null;
}

/**
 * Return the time on or after which a client should re-authenticate this
 * session, if any, otherwise null. The value is in nanoseconds as per
 * {@code System.nanoTime()} and is therefore only useful when compared to such
 * a value -- its absolute value is meaningless. This value may be non-null
 * only on the client side. It will be a random time between 85% and 95% of the
 * full session lifetime to account for latency between client and server and to
 * avoid re-authentication storms that could be caused by many sessions
 * re-authenticating simultaneously.
 *
 * @return the time on or after which a client should re-authenticate this
 *         session, if any, otherwise null
 */
default Long clientSessionReauthenticationTimeNanos() {
    return null;
}

/**
 * Return the number of milliseconds that elapsed while re-authenticating this
 * session from the perspective of this instance, if applicable, otherwise null.
 * The server-side perspective will yield a lower value than the client-side
 * perspective of the same re-authentication because the client side observes an
 * additional network round trip.
 *
 * @return the number of milliseconds that elapsed while re-authenticating this
 *         session from the perspective of this instance, if applicable,
 *         otherwise null
 */
default Long reauthenticationLatencyMs() {
    // null by default: no re-authentication has occurred for this authenticator
    return null;
}

/**
 * Return the (always non-null but possibly empty) client-side
 * {@link NetworkReceive} responses that arrived during re-authentication that
 * are unrelated to re-authentication, if any. These correspond to requests sent
 * prior to the beginning of re-authentication; the requests were made when the
 * channel was successfully authenticated, and the responses arrived during the
 * re-authentication process. Callers take ownership of the returned list; the
 * default implementation holds no such responses.
 *
 * @return the (always non-null but possibly empty) client-side
 *         {@link NetworkReceive} responses that arrived during
 *         re-authentication that are unrelated to re-authentication, if any
 */
default List<NetworkReceive> getAndClearResponsesReceivedDuringReauthentication() {
    return Collections.emptyList();
}

/**
 * Return true if this is a server-side authenticator and the connected client
 * has indicated that it supports re-authentication, otherwise false.
 *
 * @return true if this is a server-side authenticator and the connected client
 *         has indicated that it supports re-authentication, otherwise false
 */
default boolean connectedClientSupportsReauthentication() {
    // false by default: client support must be explicitly detected by a server-side implementation
    return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;

import java.util.Collections;
Expand All @@ -36,7 +37,6 @@
import java.util.Map;

public class ChannelBuilders {

private ChannelBuilders() { }

/**
Expand All @@ -55,6 +55,7 @@ public static ChannelBuilder clientChannelBuilder(SecurityProtocol securityProto
AbstractConfig config,
ListenerName listenerName,
String clientSaslMechanism,
Time time,
boolean saslHandshakeRequestEnable) {

if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
Expand All @@ -64,7 +65,7 @@ public static ChannelBuilder clientChannelBuilder(SecurityProtocol securityProto
throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
}
return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, false, clientSaslMechanism,
saslHandshakeRequestEnable, null, null);
saslHandshakeRequestEnable, null, null, time);
}

/**
Expand All @@ -79,9 +80,10 @@ public static ChannelBuilder serverChannelBuilder(ListenerName listenerName,
SecurityProtocol securityProtocol,
AbstractConfig config,
CredentialCache credentialCache,
DelegationTokenCache tokenCache) {
DelegationTokenCache tokenCache,
Time time) {
return create(securityProtocol, Mode.SERVER, JaasContext.Type.SERVER, config, listenerName,
isInterBrokerListener, null, true, credentialCache, tokenCache);
isInterBrokerListener, null, true, credentialCache, tokenCache, time);
}

private static ChannelBuilder create(SecurityProtocol securityProtocol,
Expand All @@ -93,7 +95,8 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
String clientSaslMechanism,
boolean saslHandshakeRequestEnable,
CredentialCache credentialCache,
DelegationTokenCache tokenCache) {
DelegationTokenCache tokenCache,
Time time) {
Map<String, ?> configs;
if (listenerName == null)
configs = config.values();
Expand All @@ -111,6 +114,7 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
requireNonNullMode(mode, securityProtocol);
Map<String, JaasContext> jaasContexts;
if (mode == Mode.SERVER) {
@SuppressWarnings("unchecked")
List<String> enabledMechanisms = (List<String>) configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG);
jaasContexts = new HashMap<>(enabledMechanisms.size());
for (String mechanism : enabledMechanisms)
Expand All @@ -129,7 +133,8 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
clientSaslMechanism,
saslHandshakeRequestEnable,
credentialCache,
tokenCache);
tokenCache,
time);
break;
case PLAINTEXT:
channelBuilder = new PlaintextChannelBuilder(listenerName);
Expand Down
Loading