diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java index e3b2eeb58cce6..0cf1d74f85ce6 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java @@ -36,7 +36,8 @@ public interface ChannelBuilder extends AutoCloseable, Configurable { * @param memoryPool memory pool from which to allocate buffers, or null for none * @return KafkaChannel */ - KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException; + KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, + MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException; /** * Closes ChannelBuilder diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelMetadataRegistry.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelMetadataRegistry.java new file mode 100644 index 0000000000000..a3453d881c986 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelMetadataRegistry.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.network; + +import java.io.Closeable; + +/** + * Metadata about a channel is provided in various places in the network stack. This + * registry is used as a common place to collect them. + */ +public interface ChannelMetadataRegistry extends Closeable { + + /** + * Register information about the SSL cipher we are using. + * Re-registering the information will overwrite the previous one. + */ + void registerCipherInformation(CipherInformation cipherInformation); + + /** + * Get the currently registered cipher information. + */ + CipherInformation cipherInformation(); + + /** + * Register information about the client software we are using. + * Depending on the clients, the ApiVersionsRequest could be received + * multiple times or not at all. Re-registering the information will + * overwrite the previous one. + */ + void registerClientInformation(ClientInformation clientInformation); + + /** + * Get the currently registered client information. + */ + ClientInformation clientInformation(); + + /** + * Unregister everything that has been registered and close the registry. + */ + void close(); +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/CipherInformation.java b/clients/src/main/java/org/apache/kafka/common/network/CipherInformation.java index a40469870480e..d65aeb981c009 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/CipherInformation.java +++ b/clients/src/main/java/org/apache/kafka/common/network/CipherInformation.java @@ -23,8 +23,8 @@ public class CipherInformation { private final String protocol; public CipherInformation(String cipher, String protocol) { - this.cipher = cipher; - this.protocol = protocol; + this.cipher = cipher == null || cipher.isEmpty() ? "unknown" : cipher; + this.protocol = protocol == null || protocol.isEmpty() ? 
"unknown" : protocol; } public String cipher() { diff --git a/clients/src/main/java/org/apache/kafka/common/network/ClientInformation.java b/clients/src/main/java/org/apache/kafka/common/network/ClientInformation.java new file mode 100644 index 0000000000000..cb99a8669e40f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/ClientInformation.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.network; + +import java.util.Objects; + +public class ClientInformation { + public static final String UNKNOWN_NAME_OR_VERSION = "unknown"; + public static final ClientInformation EMPTY = new ClientInformation(UNKNOWN_NAME_OR_VERSION, UNKNOWN_NAME_OR_VERSION); + + private final String softwareName; + private final String softwareVersion; + + public ClientInformation(String softwareName, String softwareVersion) { + this.softwareName = softwareName.isEmpty() ? UNKNOWN_NAME_OR_VERSION : softwareName; + this.softwareVersion = softwareVersion.isEmpty() ? 
UNKNOWN_NAME_OR_VERSION : softwareVersion; + } + + public String softwareName() { + return this.softwareName; + } + + public String softwareVersion() { + return this.softwareVersion; + } + + @Override + public String toString() { + return "ClientInformation(softwareName=" + softwareName + + ", softwareVersion=" + softwareVersion + ")"; + } + + @Override + public int hashCode() { + return Objects.hash(softwareName, softwareVersion); + } + + @Override + public boolean equals(Object o) { + if (o == null) { + return false; + } + if (!(o instanceof ClientInformation)) { + return false; + } + ClientInformation other = (ClientInformation) o; + return other.softwareName.equals(softwareName) && + other.softwareVersion.equals(softwareVersion); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java b/clients/src/main/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java new file mode 100644 index 0000000000000..ae9e9a83a0c2c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.network; + +public class DefaultChannelMetadataRegistry implements ChannelMetadataRegistry { + private CipherInformation cipherInformation; + private ClientInformation clientInformation; + + @Override + public void registerCipherInformation(final CipherInformation cipherInformation) { + // Always overwrite so that both the initial registration (field still null) + // and any re-registration take effect, per the interface contract. + this.cipherInformation = cipherInformation; + } + + @Override + public CipherInformation cipherInformation() { + return this.cipherInformation; + } + + @Override + public void registerClientInformation(final ClientInformation clientInformation) { + this.clientInformation = clientInformation; + } + + @Override + public ClientInformation clientInformation() { + return this.clientInformation; + } + + @Override + public void close() { + this.cipherInformation = null; + this.clientInformation = null; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java index 8651835eac3d7..59e08197fac99 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java +++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java @@ -29,7 +29,6 @@ import java.nio.channels.SocketChannel; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.function.Supplier; /** @@ -122,6 +121,7 @@ public enum ChannelMuteEvent { private long networkThreadTimeNanos; private final int maxReceiveSize; private final MemoryPool memoryPool; + private final ChannelMetadataRegistry metadataRegistry; private NetworkReceive receive; private Send send; // Track connection and mute state of channels to enable outstanding requests on channels to be @@ -134,7 +134,8 @@ public enum ChannelMuteEvent { private boolean midWrite; private long lastReauthenticationStartNanos; - public KafkaChannel(String id, TransportLayer transportLayer, Supplier authenticatorCreator, int 
maxReceiveSize, MemoryPool memoryPool) { + public KafkaChannel(String id, TransportLayer transportLayer, Supplier authenticatorCreator, + int maxReceiveSize, MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) { this.id = id; this.transportLayer = transportLayer; this.authenticatorCreator = authenticatorCreator; @@ -142,6 +143,7 @@ public KafkaChannel(String id, TransportLayer transportLayer, Supplier cipherInformation() { - return transportLayer.cipherInformation(); + public ChannelMetadataRegistry channelMetadataRegistry() { + return metadataRegistry; } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java index 3a9fd644972da..705e10f0cffa5 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java @@ -49,12 +49,13 @@ public void configure(Map configs) throws KafkaException { } @Override - public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException { + public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, + MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException { try { PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key); Supplier authenticatorCreator = () -> new PlaintextAuthenticator(configs, transportLayer, listenerName); return new KafkaChannel(id, transportLayer, authenticatorCreator, maxReceiveSize, - memoryPool != null ? memoryPool : MemoryPool.NONE); + memoryPool != null ? 
memoryPool : MemoryPool.NONE, metadataRegistry); } catch (Exception e) { log.warn("Failed to create channel due to ", e); throw new KafkaException(e); diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java index 78393ee68900c..845b1474f4e31 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java @@ -27,7 +27,6 @@ import java.nio.channels.SelectionKey; import java.security.Principal; -import java.util.Optional; import org.apache.kafka.common.security.auth.KafkaPrincipal; @@ -215,9 +214,4 @@ public boolean hasBytesBuffered() { public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException { return fileChannel.transferTo(position, count, socketChannel); } - - @Override - public Optional cipherInformation() { - return Optional.empty(); - } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java index a2c843108be10..51760022e322a 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java @@ -182,11 +182,12 @@ public ListenerName listenerName() { } @Override - public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException { + public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, + MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException { try { SocketChannel socketChannel = (SocketChannel) key.channel(); Socket socket = socketChannel.socket(); - TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel); + TransportLayer 
transportLayer = buildTransportLayer(id, key, socketChannel, metadataRegistry); Supplier authenticatorCreator; if (mode == Mode.SERVER) { authenticatorCreator = () -> buildServerAuthenticator(configs, @@ -194,7 +195,8 @@ public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize id, transportLayer, Collections.unmodifiableMap(subjects), - Collections.unmodifiableMap(connectionsMaxReauthMsByMechanism)); + Collections.unmodifiableMap(connectionsMaxReauthMsByMechanism), + metadataRegistry); } else { LoginManager loginManager = loginManagers.get(clientSaslMechanism); authenticatorCreator = () -> buildClientAuthenticator(configs, @@ -205,7 +207,8 @@ public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize transportLayer, subjects.get(clientSaslMechanism)); } - return new KafkaChannel(id, transportLayer, authenticatorCreator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE); + return new KafkaChannel(id, transportLayer, authenticatorCreator, maxReceiveSize, + memoryPool != null ? 
memoryPool : MemoryPool.NONE, metadataRegistry); } catch (Exception e) { log.info("Failed to create channel due to ", e); throw new KafkaException(e); @@ -222,10 +225,13 @@ public void close() { } // Visible to override for testing - protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException { + protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel, + ChannelMetadataRegistry metadataRegistry) throws IOException { if (this.securityProtocol == SecurityProtocol.SASL_SSL) { return SslTransportLayer.create(id, key, - sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(), socketChannel.socket().getPort())); + sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(), + socketChannel.socket().getPort()), + metadataRegistry); } else { return new PlaintextTransportLayer(key); } @@ -237,9 +243,11 @@ protected SaslServerAuthenticator buildServerAuthenticator(Map config String id, TransportLayer transportLayer, Map subjects, - Map connectionsMaxReauthMsByMechanism) { + Map connectionsMaxReauthMsByMechanism, + ChannelMetadataRegistry metadataRegistry) { return new SaslServerAuthenticator(configs, callbackHandlers, id, subjects, - kerberosShortNamer, listenerName, securityProtocol, transportLayer, connectionsMaxReauthMsByMechanism, time); + kerberosShortNamer, listenerName, securityProtocol, transportLayer, + connectionsMaxReauthMsByMechanism, metadataRegistry, time); } // Visible to override for testing diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index beb918471c9da..bd4f31ada5079 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -52,7 +52,6 @@ import java.util.LinkedHashMap; import java.util.List; import 
java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -314,6 +313,11 @@ public void register(String id, SocketChannel socketChannel) throws IOException ensureNotRegistered(id); registerChannel(id, socketChannel, SelectionKey.OP_READ); this.sensors.connectionCreated.record(); + // Default to empty client information as the ApiVersionsRequest is not + // mandatory. In this case, we still want to account for the connection. + ChannelMetadataRegistry metadataRegistry = this.channel(id).channelMetadataRegistry(); + if (metadataRegistry.clientInformation() == null) + metadataRegistry.registerClientInformation(ClientInformation.EMPTY); } private void ensureNotRegistered(String id) { @@ -334,7 +338,8 @@ protected SelectionKey registerChannel(String id, SocketChannel socketChannel, i private KafkaChannel buildAndAttachKafkaChannel(SocketChannel socketChannel, String id, SelectionKey key) throws IOException { try { - KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool); + KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool, + new SelectorChannelMetadataRegistry()); key.attach(channel); return channel; } catch (Exception e) { @@ -563,10 +568,6 @@ void pollSelectionKeys(Set selectionKeys, sensors.successfulAuthentication.record(1.0, readyTimeMs); if (!channel.connectedClientSupportsReauthentication()) sensors.successfulAuthenticationNoReauth.record(1.0, readyTimeMs); - Optional cipherInformation = channel.cipherInformation(); - if (cipherInformation.isPresent()) { - sensors.connectionsByCipher.increment(cipherInformation.get()); - } } log.debug("Successfully {}authenticated with {}", isReauthentication ? 
"re-" : "", channel.socketDescription()); @@ -926,10 +927,6 @@ private void doClose(KafkaChannel channel, boolean notifyDisconnect) { key.attach(null); } - Optional cipherInformation = channel.cipherInformation(); - if (cipherInformation.isPresent()) { - sensors.connectionsByCipher.decrement(cipherInformation.get()); - } this.sensors.connectionClosed.record(); this.stagedReceives.remove(channel); this.explicitlyMutedChannels.remove(channel); @@ -1072,6 +1069,58 @@ public int numStagedReceives(KafkaChannel channel) { return deque == null ? 0 : deque.size(); } + class SelectorChannelMetadataRegistry implements ChannelMetadataRegistry { + private CipherInformation cipherInformation; + private ClientInformation clientInformation; + + @Override + public void registerCipherInformation(final CipherInformation cipherInformation) { + if (this.cipherInformation != null) { + if (this.cipherInformation.equals(cipherInformation)) + return; + sensors.connectionsByCipher.decrement(this.cipherInformation); + } + + this.cipherInformation = cipherInformation; + sensors.connectionsByCipher.increment(cipherInformation); + } + + @Override + public CipherInformation cipherInformation() { + return cipherInformation; + } + + @Override + public void registerClientInformation(final ClientInformation clientInformation) { + if (this.clientInformation != null) { + if (this.clientInformation.equals(clientInformation)) + return; + sensors.connectionsByClient.decrement(this.clientInformation); + } + + this.clientInformation = clientInformation; + sensors.connectionsByClient.increment(clientInformation); + } + + @Override + public ClientInformation clientInformation() { + return clientInformation; + } + + @Override + public void close() { + if (this.cipherInformation != null) { + sensors.connectionsByCipher.decrement(this.cipherInformation); + this.cipherInformation = null; + } + + if (this.clientInformation != null) { + sensors.connectionsByClient.decrement(this.clientInformation); + 
this.clientInformation = null; + } + } + } + class SelectorMetrics implements AutoCloseable { private final Metrics metrics; private final String metricGrpPrefix; @@ -1094,6 +1143,7 @@ class SelectorMetrics implements AutoCloseable { public final Sensor selectTime; public final Sensor ioTime; public final IntGaugeSuite connectionsByCipher; + public final IntGaugeSuite connectionsByClient; /* Names of metrics that are not registered through sensors */ private final List topLevelMetricNames = new ArrayList<>(); @@ -1199,6 +1249,15 @@ public SelectorMetrics(Metrics metrics, String metricGrpPrefix, Map(log, "clients", metrics, + clientInformation -> { + Map tags = new LinkedHashMap<>(); + tags.put("clientSoftwareName", clientInformation.softwareName()); + tags.put("clientSoftwareVersion", clientInformation.softwareVersion()); + tags.putAll(metricTags); + return metrics.metricName("connections", metricGrpName, "The number of connections with this client and version.", tags); + }, 100); + metricName = metrics.metricName("connection-count", metricGrpName, "The current number of active connections.", metricTags); topLevelMetricNames.add(metricName); this.metrics.addMetric(metricName, (config, now) -> channels.size()); @@ -1322,6 +1381,7 @@ public void close() { for (Sensor sensor : sensors) metrics.removeSensor(sensor.name()); connectionsByCipher.close(); + connectionsByClient.close(); } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java index 9c6816124132e..ced8afe01225a 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java @@ -93,12 +93,15 @@ public ListenerName listenerName() { } @Override - public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException { + public 
KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, + MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException { try { - SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, peerHost(key)); - Supplier authenticatorCreator = () -> new SslAuthenticator(configs, transportLayer, listenerName, sslPrincipalMapper); + SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, + peerHost(key), metadataRegistry); + Supplier authenticatorCreator = () -> + new SslAuthenticator(configs, transportLayer, listenerName, sslPrincipalMapper); return new KafkaChannel(id, transportLayer, authenticatorCreator, maxReceiveSize, - memoryPool != null ? memoryPool : MemoryPool.NONE); + memoryPool != null ? memoryPool : MemoryPool.NONE, metadataRegistry); } catch (Exception e) { log.info("Failed to create channel due to ", e); throw new KafkaException(e); @@ -108,9 +111,11 @@ public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize @Override public void close() {} - protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, String host) throws IOException { + protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, + String host, ChannelMetadataRegistry metadataRegistry) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); - return SslTransportLayer.create(id, key, sslFactory.createSslEngine(host, socketChannel.socket().getPort())); + return SslTransportLayer.create(id, key, sslFactory.createSslEngine(host, socketChannel.socket().getPort()), + metadataRegistry); } /** diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java index 315b258114614..d8b0474c94339 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java +++ 
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java @@ -25,7 +25,6 @@ import java.nio.channels.CancelledKeyException; import java.security.Principal; -import java.util.Optional; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLEngineResult.HandshakeStatus; @@ -61,6 +60,7 @@ private enum State { private final SSLEngine sslEngine; private final SelectionKey key; private final SocketChannel socketChannel; + private final ChannelMetadataRegistry metadataRegistry; private final Logger log; private HandshakeStatus handshakeStatus; @@ -72,19 +72,21 @@ private enum State { private ByteBuffer appReadBuffer; private ByteBuffer fileChannelBuffer; private boolean hasBytesBuffered; - private CipherInformation cipherInformation; - public static SslTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException { - return new SslTransportLayer(channelId, key, sslEngine); + public static SslTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine, + ChannelMetadataRegistry metadataRegistry) throws IOException { + return new SslTransportLayer(channelId, key, sslEngine, metadataRegistry); } // Prefer `create`, only use this in tests - SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) { + SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, + ChannelMetadataRegistry metadataRegistry) { this.channelId = channelId; this.key = key; this.socketChannel = (SocketChannel) key.channel(); this.sslEngine = sslEngine; this.state = State.NOT_INITALIZED; + this.metadataRegistry = metadataRegistry; final LogContext logContext = new LogContext(String.format("[SslTransportLayer channelId=%s key=%s] ", channelId, key)); this.log = logContext.logger(getClass()); @@ -428,17 +430,8 @@ private void handshakeFinished() throws IOException { SSLSession session = sslEngine.getSession(); log.debug("SSL handshake completed successfully 
with peerHost '{}' peerPort {} peerPrincipal '{}' cipherSuite '{}'", session.getPeerHost(), session.getPeerPort(), peerPrincipal(), session.getCipherSuite()); - if (cipherInformation == null) { - String cipherSuiteName = session.getCipherSuite(); - if (cipherSuiteName == null || cipherSuiteName.isEmpty()) { - cipherSuiteName = "unknown"; - } - String protocolName = session.getProtocol(); - if (protocolName == null || protocolName.isEmpty()) { - protocolName = "unknown"; - } - cipherInformation = new CipherInformation(cipherSuiteName, protocolName); - } + metadataRegistry.registerCipherInformation( + new CipherInformation(session.getCipherSuite(), session.getProtocol())); } log.trace("SSLHandshake FINISHED channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {} ", @@ -973,9 +966,4 @@ public long transferFrom(FileChannel fileChannel, long position, long count) thr throw e; } } - - @Override - public Optional cipherInformation() { - return Optional.ofNullable(cipherInformation); - } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java index 10e957e6d5700..b196c5be96c65 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java @@ -31,7 +31,6 @@ import java.nio.channels.GatheringByteChannel; import java.security.Principal; -import java.util.Optional; import org.apache.kafka.common.errors.AuthenticationException; @@ -114,10 +113,4 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan * @see FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel) */ long transferFrom(FileChannel fileChannel, long position, long count) throws IOException; - - /** - * Get information about the ciphers that this transport layer is using. 
If this transport layer is not - * using SSL, or the SSL handshake has not been completed, returns empty. - */ - Optional cipherInformation(); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java index 8e21f8e40be5e..71fbc33615046 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.message.ApiVersionsRequestData; +import org.apache.kafka.common.network.ClientInformation; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; @@ -38,19 +39,22 @@ public class RequestContext implements AuthorizableRequestContext { public final KafkaPrincipal principal; public final ListenerName listenerName; public final SecurityProtocol securityProtocol; + public final ClientInformation clientInformation; public RequestContext(RequestHeader header, String connectionId, InetAddress clientAddress, KafkaPrincipal principal, ListenerName listenerName, - SecurityProtocol securityProtocol) { + SecurityProtocol securityProtocol, + ClientInformation clientInformation) { this.header = header; this.connectionId = connectionId; this.clientAddress = clientAddress; this.principal = principal; this.listenerName = listenerName; this.securityProtocol = securityProtocol; + this.clientInformation = clientInformation; } public RequestAndSize parseRequest(ByteBuffer buffer) { diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index afaf6b728a112..240f777453b66 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -29,6 +29,8 @@ import org.apache.kafka.common.message.SaslHandshakeResponseData; import org.apache.kafka.common.network.Authenticator; import org.apache.kafka.common.network.ChannelBuilders; +import org.apache.kafka.common.network.ChannelMetadataRegistry; +import org.apache.kafka.common.network.ClientInformation; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.NetworkSend; @@ -126,6 +128,7 @@ private enum SaslState { private final Map connectionsMaxReauthMsByMechanism; private final Time time; private final ReauthInfo reauthInfo; + private final ChannelMetadataRegistry metadataRegistry; // Current SASL state private SaslState saslState = SaslState.INITIAL_REQUEST; @@ -152,6 +155,7 @@ public SaslServerAuthenticator(Map configs, SecurityProtocol securityProtocol, TransportLayer transportLayer, Map connectionsMaxReauthMsByMechanism, + ChannelMetadataRegistry metadataRegistry, Time time) { this.callbackHandlers = callbackHandlers; this.connectionId = connectionId; @@ -163,6 +167,7 @@ public SaslServerAuthenticator(Map configs, this.connectionsMaxReauthMsByMechanism = connectionsMaxReauthMsByMechanism; this.time = time; this.reauthInfo = new ReauthInfo(); + this.metadataRegistry = metadataRegistry; this.configs = configs; @SuppressWarnings("unchecked") @@ -426,7 +431,7 @@ private void handleSaslToken(byte[] clientToken) throws IOException { ApiKeys apiKey = header.apiKey(); short version = header.apiVersion(); RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(), - KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol); + KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY); RequestAndSize 
requestAndSize = requestContext.parseRequest(requestBuffer); if (apiKey != ApiKeys.SASL_AUTHENTICATE) { IllegalSaslStateException e = new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL authentication."); @@ -512,7 +517,7 @@ private boolean handleKafkaRequest(byte[] requestBytes) throws IOException, Auth RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(), - KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol); + KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY); RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer); if (apiKey == ApiKeys.API_VERSIONS) handleApiVersionsRequest(requestContext, (ApiVersionsRequest) requestAndSize.request); @@ -585,6 +590,8 @@ private void handleApiVersionsRequest(RequestContext context, ApiVersionsRequest else if (!apiVersionsRequest.isValid()) sendKafkaResponse(context, apiVersionsRequest.getErrorResponse(0, Errors.INVALID_REQUEST.exception())); else { + metadataRegistry.registerClientInformation(new ClientInformation(apiVersionsRequest.data.clientSoftwareName(), + apiVersionsRequest.data.clientSoftwareVersion())); sendKafkaResponse(context, apiVersionsResponse()); setSaslState(SaslState.HANDSHAKE_REQUEST); } diff --git a/clients/src/test/java/org/apache/kafka/common/network/KafkaChannelTest.java b/clients/src/test/java/org/apache/kafka/common/network/KafkaChannelTest.java index d8950a0527eed..c2115b3f3526b 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/KafkaChannelTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/KafkaChannelTest.java @@ -38,8 +38,10 @@ public void testSending() throws IOException { Authenticator authenticator = Mockito.mock(Authenticator.class); TransportLayer transport = Mockito.mock(TransportLayer.class); MemoryPool pool = Mockito.mock(MemoryPool.class); + ChannelMetadataRegistry metadataRegistry = 
Mockito.mock(ChannelMetadataRegistry.class); - KafkaChannel channel = new KafkaChannel("0", transport, () -> authenticator, 1024, pool); + KafkaChannel channel = new KafkaChannel("0", transport, () -> authenticator, + 1024, pool, metadataRegistry); NetworkSend send = new NetworkSend("0", ByteBuffer.wrap(TestUtils.randomBytes(128))); channel.setSend(send); @@ -67,13 +69,15 @@ public void testReceiving() throws IOException { Authenticator authenticator = Mockito.mock(Authenticator.class); TransportLayer transport = Mockito.mock(TransportLayer.class); MemoryPool pool = Mockito.mock(MemoryPool.class); + ChannelMetadataRegistry metadataRegistry = Mockito.mock(ChannelMetadataRegistry.class); ArgumentCaptor sizeCaptor = ArgumentCaptor.forClass(Integer.class); Mockito.when(pool.tryAllocate(sizeCaptor.capture())).thenAnswer(invocation -> { return ByteBuffer.allocate(sizeCaptor.getValue()); }); - KafkaChannel channel = new KafkaChannel("0", transport, () -> authenticator, 1024, pool); + KafkaChannel channel = new KafkaChannel("0", transport, () -> authenticator, + 1024, pool, metadataRegistry); ArgumentCaptor bufferCaptor = ArgumentCaptor.forClass(ByteBuffer.class); Mockito.when(transport.read(bufferCaptor.capture())).thenAnswer(invocation -> { diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 116a7973cd1e8..70eb588299d06 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -371,7 +371,7 @@ public void registerFailure() throws Exception { ChannelBuilder channelBuilder = new PlaintextChannelBuilder(null) { @Override public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, - MemoryPool memoryPool) throws KafkaException { + MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException { throw new 
RuntimeException("Test exception"); } @Override @@ -746,6 +746,62 @@ public void testInboundConnectionsCountInConnectionCreationMetric() throws Excep assertEquals((double) conns, getMetric("connection-count").metricValue()); } + @Test + public void testConnectionsByClientMetric() throws Exception { + String node = "0"; + Map unknownNameAndVersion = softwareNameAndVersionTags( + ClientInformation.UNKNOWN_NAME_OR_VERSION, ClientInformation.UNKNOWN_NAME_OR_VERSION); + Map knownNameAndVersion = softwareNameAndVersionTags("A", "B"); + + try (ServerSocketChannel ss = ServerSocketChannel.open()) { + ss.bind(new InetSocketAddress(0)); + InetSocketAddress serverAddress = (InetSocketAddress) ss.getLocalAddress(); + + Thread sender = createSender(serverAddress, randomPayload(1)); + sender.start(); + SocketChannel channel = ss.accept(); + channel.configureBlocking(false); + + // Metric with unknown / unknown should be there + selector.register(node, channel); + assertEquals(1, + getMetric("connections", unknownNameAndVersion).metricValue()); + assertEquals(ClientInformation.EMPTY, + selector.channel(node).channelMetadataRegistry().clientInformation()); + + // Metric with unknown / unknown should not be there, metric with A / B should be there + ClientInformation clientInformation = new ClientInformation("A", "B"); + selector.channel(node).channelMetadataRegistry() + .registerClientInformation(clientInformation); + assertEquals(clientInformation, + selector.channel(node).channelMetadataRegistry().clientInformation()); + assertEquals(0, getMetric("connections", unknownNameAndVersion).metricValue()); + assertEquals(1, getMetric("connections", knownNameAndVersion).metricValue()); + + // Metric with A / B should not be there, + selector.close(node); + assertEquals(0, getMetric("connections", knownNameAndVersion).metricValue()); + } + } + + private Map softwareNameAndVersionTags(String clientSoftwareName, String clientSoftwareVersion) { + Map tags = new HashMap<>(2); + 
tags.put("clientSoftwareName", clientSoftwareName); + tags.put("clientSoftwareVersion", clientSoftwareVersion); + return tags; + } + + private KafkaMetric getMetric(String name, Map tags) throws Exception { + Optional> metric = metrics.metrics().entrySet().stream() + .filter(entry -> + entry.getKey().name().equals(name) && entry.getKey().tags().equals(tags)) + .findFirst(); + if (!metric.isPresent()) + throw new Exception(String.format("Could not find metric called %s with tags %s", name, tags.toString())); + + return metric.get().getValue(); + } + @SuppressWarnings("unchecked") @Test public void testLowestPriorityChannel() throws Exception { diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java index 0172661fe7b5c..78174d9cac38c 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java @@ -129,6 +129,7 @@ public void testConnectionWithCustomKeyManager() throws Exception { TestUtils.waitForCondition(() -> cipherMetrics(metrics).size() == 1, "Waiting for cipher metrics to be created."); assertEquals(Integer.valueOf(1), cipherMetrics(metrics).get(0).metricValue()); + assertNotNull(selector.channel(node).channelMetadataRegistry().cipherInformation()); selector.close(node); super.verifySelectorEmpty(selector); @@ -368,10 +369,11 @@ public TestSslChannelBuilder(Mode mode) { } @Override - protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, String host) throws IOException { + protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, + String host, ChannelMetadataRegistry metadataRegistry) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); SSLEngine sslEngine = sslFactory.createSslEngine(host, socketChannel.socket().getPort()); - 
TestSslTransportLayer transportLayer = new TestSslTransportLayer(id, key, sslEngine); + TestSslTransportLayer transportLayer = new TestSslTransportLayer(id, key, sslEngine, metadataRegistry); return transportLayer; } @@ -383,8 +385,9 @@ static class TestSslTransportLayer extends SslTransportLayer { static Map transportLayers = new HashMap<>(); boolean muteSocket = false; - public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException { - super(channelId, key, sslEngine); + public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, + ChannelMetadataRegistry metadataRegistry) throws IOException { + super(channelId, key, sslEngine, metadataRegistry); transportLayers.put(channelId, this); } diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index d25fa6124681c..653b85264438c 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -1114,7 +1114,8 @@ public void configureBufferSizes(Integer netReadBufSize, Integer netWriteBufSize } @Override - protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, String host) throws IOException { + protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, + String host, ChannelMetadataRegistry metadataRegistry) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); SSLEngine sslEngine = sslFactory.createSslEngine(host, socketChannel.socket().getPort()); TestSslTransportLayer transportLayer = newTransportLayer(id, key, sslEngine); @@ -1145,7 +1146,7 @@ class TestSslTransportLayer extends SslTransportLayer { private final AtomicInteger numDelayedFlushesRemaining; public TestSslTransportLayer(String channelId, 
SelectionKey key, SSLEngine sslEngine) throws IOException { - super(channelId, key, sslEngine); + super(channelId, key, sslEngine, new DefaultChannelMetadataRegistry()); this.netReadBufSize = new ResizeableBufferSize(netReadBufSizeOverride); this.netWriteBufSize = new ResizeableBufferSize(netWriteBufSizeOverride); this.appBufSize = new ResizeableBufferSize(appBufSizeOverride); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java index 7d066250d2f2b..fd8f1e552a09d 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.message.ApiVersionsResponseData; import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection; +import org.apache.kafka.common.network.ClientInformation; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; @@ -41,7 +42,7 @@ public void testSerdeUnsupportedApiVersionRequest() throws Exception { RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, Short.MAX_VALUE, "", correlationId); RequestContext context = new RequestContext(header, "0", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS, - new ListenerName("ssl"), SecurityProtocol.SASL_SSL); + new ListenerName("ssl"), SecurityProtocol.SASL_SSL, ClientInformation.EMPTY); assertEquals(0, context.apiVersion()); // Write some garbage to the request buffer. 
This should be ignored since we will treat diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index 6d95d2e91d857..0c4122c76925e 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -62,6 +62,7 @@ import org.apache.kafka.common.network.CertStores; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.ChannelBuilders; +import org.apache.kafka.common.network.ChannelMetadataRegistry; import org.apache.kafka.common.network.ChannelState; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.Mode; @@ -1747,8 +1748,10 @@ protected SaslServerAuthenticator buildServerAuthenticator(Map config String id, TransportLayer transportLayer, Map subjects, - Map connectionsMaxReauthMsByMechanism) { - return new SaslServerAuthenticator(configs, callbackHandlers, id, subjects, null, listenerName, securityProtocol, transportLayer, connectionsMaxReauthMsByMechanism, time) { + Map connectionsMaxReauthMsByMechanism, + ChannelMetadataRegistry metadataRegistry) { + return new SaslServerAuthenticator(configs, callbackHandlers, id, subjects, null, listenerName, + securityProtocol, transportLayer, connectionsMaxReauthMsByMechanism, metadataRegistry, time) { @Override protected ApiVersionsResponse apiVersionsResponse() { diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java index 51204f9764d16..2778d73e3e431 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java +++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java @@ -16,18 +16,24 @@ */ package org.apache.kafka.common.security.authenticator; +import java.net.InetAddress; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.errors.IllegalSaslStateException; +import org.apache.kafka.common.network.ChannelMetadataRegistry; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.DefaultChannelMetadataRegistry; import org.apache.kafka.common.network.InvalidReceiveException; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.TransportLayer; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.ApiVersionsRequest; import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.plain.PlainLoginModule; +import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.Time; import org.junit.Test; @@ -37,8 +43,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.mockito.Answers; import static org.apache.kafka.common.security.scram.internals.ScramMechanism.SCRAM_SHA_256; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -53,7 +61,8 @@ public void testOversizeRequest() throws IOException { TransportLayer transportLayer = mock(TransportLayer.class); Map configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Collections.singletonList(SCRAM_SHA_256.mechanismName())); - 
SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName()); + SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, + SCRAM_SHA_256.mechanismName(), new DefaultChannelMetadataRegistry()); when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> { invocation.getArgument(0).putInt(SaslServerAuthenticator.MAX_RECEIVE_SIZE + 1); @@ -68,7 +77,8 @@ public void testUnexpectedRequestType() throws IOException { TransportLayer transportLayer = mock(TransportLayer.class); Map configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Collections.singletonList(SCRAM_SHA_256.mechanismName())); - SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName()); + SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, + SCRAM_SHA_256.mechanismName(), new DefaultChannelMetadataRegistry()); final RequestHeader header = new RequestHeader(ApiKeys.METADATA, (short) 0, "clientId", 13243); final Struct headerStruct = header.toStruct(); @@ -92,7 +102,54 @@ public void testUnexpectedRequestType() throws IOException { verify(transportLayer, times(2)).read(any(ByteBuffer.class)); } - private SaslServerAuthenticator setupAuthenticator(Map configs, TransportLayer transportLayer, String mechanism) throws IOException { + @Test + public void testOldestApiVersionsRequest() throws IOException { + testApiVersionsRequest(ApiKeys.API_VERSIONS.oldestVersion(), + ClientInformation.UNKNOWN_NAME_OR_VERSION, ClientInformation.UNKNOWN_NAME_OR_VERSION); + } + + @Test + public void testLatestApiVersionsRequest() throws IOException { + testApiVersionsRequest(ApiKeys.API_VERSIONS.latestVersion(), + "apache-kafka-java", AppInfoParser.getVersion()); + } + + public void testApiVersionsRequest(short version, String expectedSoftwareName, + String expectedSoftwareVersion) throws IOException { + TransportLayer 
transportLayer = mock(TransportLayer.class, Answers.RETURNS_DEEP_STUBS); + Map configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, + Collections.singletonList(SCRAM_SHA_256.mechanismName())); + ChannelMetadataRegistry metadataRegistry = new DefaultChannelMetadataRegistry(); + SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, + SCRAM_SHA_256.mechanismName(), metadataRegistry); + + final RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, version, "clientId", 0); + final Struct headerStruct = header.toStruct(); + + final ApiVersionsRequest request = new ApiVersionsRequest.Builder().build(version); + final Struct requestStruct = request.data.toStruct(version); + + when(transportLayer.socketChannel().socket().getInetAddress()).thenReturn(InetAddress.getLoopbackAddress()); + + when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> { + invocation.getArgument(0).putInt(headerStruct.sizeOf() + requestStruct.sizeOf()); + return 4; + }).then(invocation -> { + headerStruct.writeTo(invocation.getArgument(0)); + requestStruct.writeTo(invocation.getArgument(0)); + return headerStruct.sizeOf() + requestStruct.sizeOf(); + }); + + authenticator.authenticate(); + + assertEquals(expectedSoftwareName, metadataRegistry.clientInformation().softwareName()); + assertEquals(expectedSoftwareVersion, metadataRegistry.clientInformation().softwareVersion()); + + verify(transportLayer, times(2)).read(any(ByteBuffer.class)); + } + + private SaslServerAuthenticator setupAuthenticator(Map configs, TransportLayer transportLayer, + String mechanism, ChannelMetadataRegistry metadataRegistry) throws IOException { TestJaasConfig jaasConfig = new TestJaasConfig(); jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap()); Map jaasContexts = Collections.singletonMap(mechanism, @@ -101,7 +158,8 @@ private SaslServerAuthenticator setupAuthenticator(Map configs, Trans Map callbackHandlers 
= Collections.singletonMap( mechanism, new SaslServerCallbackHandler()); return new SaslServerAuthenticator(configs, callbackHandlers, "node", subjects, null, - new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer, Collections.emptyMap(), Time.SYSTEM); + new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer, Collections.emptyMap(), + metadataRegistry, Time.SYSTEM); } } diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala index 92cdb9ee02396..158ea01869c2a 100644 --- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala +++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala @@ -162,8 +162,8 @@ object BrokerApiVersionsCommand { private def getApiVersions(node: Node): ApiVersionsResponseKeyCollection = { val response = send(node, ApiKeys.API_VERSIONS, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse] - Errors.forCode(response.data.errorCode()).maybeThrow() - response.data.apiKeys() + Errors.forCode(response.data.errorCode).maybeThrow() + response.data.apiKeys } /** diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 4a8c290d945e8..08db320b563f4 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -197,6 +197,7 @@ object RequestChannel extends Logging { .append(",securityProtocol:").append(context.securityProtocol) .append(",principal:").append(session.principal) .append(",listener:").append(context.listenerName.value) + .append(",clientInformation:").append(context.clientInformation) if (temporaryMemoryBytes > 0) builder.append(",temporaryMemoryBytes:").append(temporaryMemoryBytes) if (messageConversionsTimeMs > 0) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 
a147aaa3722f5..95e4450b87259 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -41,9 +41,11 @@ import org.apache.kafka.common.{Endpoint, KafkaException, Reconfigurable} import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool} import org.apache.kafka.common.metrics._ import org.apache.kafka.common.metrics.stats.{CumulativeSum, Meter} +import org.apache.kafka.common.network.ClientInformation import org.apache.kafka.common.network.KafkaChannel.ChannelMuteEvent import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, ListenerReconfigurable, Selectable, Send, Selector => KSelector} import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.requests.ApiVersionsRequest import org.apache.kafka.common.requests.{RequestContext, RequestHeader} import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time} @@ -902,7 +904,7 @@ private[kafka] class Processor(val id: Int, openOrClosingChannel(receive.source) match { case Some(channel) => val header = RequestHeader.parse(receive.payload) - if (header.apiKey() == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, nowNanosSupplier)) + if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, nowNanosSupplier)) trace(s"Begin re-authentication: $channel") else { val nowNanos = time.nanoseconds() @@ -914,9 +916,20 @@ private[kafka] class Processor(val id: Int, } else { val connectionId = receive.source val context = new RequestContext(header, connectionId, channel.socketAddress, - channel.principal, listenerName, securityProtocol) + channel.principal, listenerName, securityProtocol, + channel.channelMetadataRegistry.clientInformation) val req = new RequestChannel.Request(processor = id, context = context, startTimeNanos = nowNanos, memoryPool, 
receive.payload, requestChannel.metrics) + // KIP-511: ApiVersionsRequest is intercepted here to catch the client software name + // and version. It is done here to avoid wiring things up to the api layer. + if (header.apiKey == ApiKeys.API_VERSIONS) { + val apiVersionsRequest = req.body[ApiVersionsRequest] + if (apiVersionsRequest.isValid) { + channel.channelMetadataRegistry.registerClientInformation(new ClientInformation( + apiVersionsRequest.data.clientSoftwareName, + apiVersionsRequest.data.clientSoftwareVersion)) + } + } requestChannel.sendRequest(req) selector.mute(connectionId) handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED) diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 2325ee857d54e..76d835922af10 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -36,13 +36,15 @@ import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.message.SaslAuthenticateRequestData import org.apache.kafka.common.message.SaslHandshakeRequestData import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network.ClientInformation import org.apache.kafka.common.network.KafkaChannel.ChannelMuteState import org.apache.kafka.common.network.{ChannelBuilder, ChannelState, KafkaChannel, ListenerName, NetworkReceive, NetworkSend, Selector, Send} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.MemoryRecords -import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, RequestHeader, SaslAuthenticateRequest, SaslHandshakeRequest} +import org.apache.kafka.common.requests.{AbstractRequest, ApiVersionsRequest, ProduceRequest, RequestHeader, SaslAuthenticateRequest, SaslHandshakeRequest} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import 
org.apache.kafka.common.security.scram.internals.ScramMechanism +import org.apache.kafka.common.utils.AppInfoParser import org.apache.kafka.common.utils.{LogContext, MockTime, Time} import org.apache.log4j.Level import org.junit.Assert._ @@ -61,7 +63,7 @@ class SocketServerTest { props.put("socket.send.buffer.bytes", "300000") props.put("socket.receive.buffer.bytes", "300000") props.put("queued.max.requests", "50") - props.put("socket.request.max.bytes", "50") + props.put("socket.request.max.bytes", "100") props.put("max.connections.per.ip", "5") props.put("connections.max.idle.ms", "60000") val config = KafkaConfig.fromProps(props) @@ -188,6 +190,16 @@ class SocketServerTest { serializedBytes } + private def apiVersionRequestBytes(clientId: String, version: Short): Array[Byte] = { + val request = new ApiVersionsRequest.Builder().build(version) + val header = new RequestHeader(ApiKeys.API_VERSIONS, request.version(), clientId, -1) + val buffer = request.serialize(header) + buffer.rewind() + val bytes = new Array[Byte](buffer.remaining()) + buffer.get(bytes) + bytes + } + @Test def simpleRequest(): Unit = { val plainSocket = connect() @@ -200,6 +212,56 @@ class SocketServerTest { verifyAcceptorBlockedPercent("PLAINTEXT", expectBlocked = false) } + + private def testClientInformation(version: Short, expectedClientSoftwareName: String, + expectedClientSoftwareVersion: String): Unit = { + val plainSocket = connect() + val address = plainSocket.getLocalAddress + val clientId = "clientId" + + // Send ApiVersionsRequest - unknown expected + sendRequest(plainSocket, apiVersionRequestBytes(clientId, version)) + var receivedReq = receiveRequest(server.dataPlaneRequestChannel) + + assertEquals(ClientInformation.UNKNOWN_NAME_OR_VERSION, receivedReq.context.clientInformation.softwareName) + assertEquals(ClientInformation.UNKNOWN_NAME_OR_VERSION, receivedReq.context.clientInformation.softwareVersion) + + server.dataPlaneRequestChannel.sendResponse(new 
RequestChannel.NoOpResponse(receivedReq)) + + // Send ProduceRequest - client info expected + sendRequest(plainSocket, producerRequestBytes()) + receivedReq = receiveRequest(server.dataPlaneRequestChannel) + + assertEquals(expectedClientSoftwareName, receivedReq.context.clientInformation.softwareName) + assertEquals(expectedClientSoftwareVersion, receivedReq.context.clientInformation.softwareVersion) + + server.dataPlaneRequestChannel.sendResponse(new RequestChannel.NoOpResponse(receivedReq)) + + // Close the socket + plainSocket.setSoLinger(true, 0) + plainSocket.close() + + TestUtils.waitUntilTrue(() => server.connectionCount(address) == 0, msg = "Connection not closed") + } + + @Test + def testClientInformationWithLatestApiVersionsRequest(): Unit = { + testClientInformation( + ApiKeys.API_VERSIONS.latestVersion, + "apache-kafka-java", + AppInfoParser.getVersion + ) + } + + @Test + def testClientInformationWithOldestApiVersionsRequest(): Unit = { + testClientInformation( + ApiKeys.API_VERSIONS.oldestVersion, + ClientInformation.UNKNOWN_NAME_OR_VERSION, + ClientInformation.UNKNOWN_NAME_OR_VERSION + ) + } + @Test def testStagedListenerStartup(): Unit = { val testProps = new Properties diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala index 6250958596620..2fb4aca3792b3 100644 --- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala @@ -32,6 +32,7 @@ import org.apache.kafka.common.acl._ import org.apache.kafka.common.acl.AclOperation._ import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} import org.apache.kafka.common.errors.{ApiException, UnsupportedVersionException} +import org.apache.kafka.common.network.ClientInformation import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.ApiKeys import 
org.apache.kafka.common.requests.{RequestContext, RequestHeader} @@ -832,7 +833,8 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { private def newRequestContext(principal: KafkaPrincipal, clientAddress: InetAddress, apiKey: ApiKeys = ApiKeys.PRODUCE): RequestContext = { val securityProtocol = SecurityProtocol.SASL_PLAINTEXT val header = new RequestHeader(apiKey, 2, "", 1) //ApiKeys apiKey, short version, String clientId, int correlation - new RequestContext(header, "", clientAddress, principal, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol) + new RequestContext(header, "", clientAddress, principal, ListenerName.forSecurityProtocol(securityProtocol), + securityProtocol, ClientInformation.EMPTY) } private def authorize(authorizer: AclAuthorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index 828fb4841a271..f1eafd3cceeb4 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -26,6 +26,7 @@ import kafka.server.QuotaType._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota} +import org.apache.kafka.common.network.ClientInformation import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, RequestContext, RequestHeader} @@ -65,7 +66,7 @@ class ClientQuotaManagerTest { // read the header from the buffer first so that the body can be read next from the Request constructor val header = RequestHeader.parse(buffer) val context = new RequestContext(header, "1", InetAddress.getLocalHost, 
KafkaPrincipal.ANONYMOUS, - listenerName, SecurityProtocol.PLAINTEXT) + listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY) (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer, requestChannelMetrics)) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 4c438beb1dc59..febcef663c72d 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -44,6 +44,7 @@ import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequ import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} import org.apache.kafka.common.message._ import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network.ClientInformation import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.FileRecords.TimestampAndOffset @@ -1011,7 +1012,7 @@ class KafkaApisTest { // read the header from the buffer first so that the body can be read next from the Request constructor val header = RequestHeader.parse(buffer) val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, - listenerName, SecurityProtocol.PLAINTEXT) + listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY) (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer, requestChannelMetrics)) } diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala index 88d98c8a67fc2..6d1f8cafeae18 100644 --- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala +++ 
b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala @@ -68,7 +68,7 @@ class SaslApiVersionsRequestTest extends AbstractApiVersionsRequestTest with Sas sendSaslHandshakeRequestValidateResponse(socket) val response = sendAndReceive[ApiVersionsResponse]( new ApiVersionsRequest.Builder().build(0), socket) - assertEquals(Errors.ILLEGAL_SASL_STATE.code(), response.data.errorCode()) + assertEquals(Errors.ILLEGAL_SASL_STATE.code, response.data.errorCode) } finally { socket.close() } @@ -80,7 +80,7 @@ class SaslApiVersionsRequestTest extends AbstractApiVersionsRequestTest with Sas try { val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0) val apiVersionsResponse = sendUnsupportedApiVersionRequest(apiVersionsRequest) - assertEquals(Errors.UNSUPPORTED_VERSION.code(), apiVersionsResponse.data.errorCode()) + assertEquals(Errors.UNSUPPORTED_VERSION.code, apiVersionsResponse.data.errorCode) val apiVersionsResponse2 = sendAndReceive[ApiVersionsResponse]( new ApiVersionsRequest.Builder().build(0), socket) validateApiVersionsResponse(apiVersionsResponse2) diff --git a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala index 15bbcc6e2b906..bffbeec449ab1 100644 --- a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala @@ -28,6 +28,7 @@ import kafka.network.RequestChannel.{EndThrottlingResponse, Response, StartThrot import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.metrics.MetricConfig +import org.apache.kafka.common.network.ClientInformation import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, RequestContext, RequestHeader} @@ -55,7 +56,7 
@@ class ThrottledChannelExpirationTest { // read the header from the buffer first so that the body can be read next from the Request constructor val header = RequestHeader.parse(buffer) val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, - listenerName, SecurityProtocol.PLAINTEXT) + listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY) (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer, requestChannelMetrics)) }