From 0339513a6a6b15a72ae691d000425b2ae48d8e09 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Tue, 26 Nov 2019 14:19:45 +0100 Subject: [PATCH 1/4] KAFKA-8855; Collect and Expose Client's Name and Version in the Brokers (KIP-511 Part 2) --- .../kafka/common/network/Authenticator.java | 10 +++ .../common/network/ClientInformation.java | 65 +++++++++++++++++ .../kafka/common/network/KafkaChannel.java | 37 ++++++++++ .../apache/kafka/common/network/Selector.java | 20 ++++++ .../kafka/common/requests/RequestContext.java | 6 +- .../SaslServerAuthenticator.java | 16 ++++- .../kafka/common/network/SelectorTest.java | 49 +++++++++++++ .../common/requests/RequestContextTest.java | 3 +- .../SaslServerAuthenticatorTest.java | 72 +++++++++++++++++++ .../admin/BrokerApiVersionsCommand.scala | 4 +- .../scala/kafka/network/RequestChannel.scala | 2 + .../scala/kafka/network/SocketServer.scala | 15 +++- .../unit/kafka/network/SocketServerTest.scala | 66 ++++++++++++++++- .../authorizer/AclAuthorizerTest.scala | 4 +- .../kafka/server/ClientQuotaManagerTest.scala | 3 +- .../unit/kafka/server/KafkaApisTest.scala | 3 +- .../server/SaslApiVersionsRequestTest.scala | 4 +- .../ThrottledChannelExpirationTest.scala | 3 +- 18 files changed, 366 insertions(+), 16 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/network/ClientInformation.java diff --git a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java index bcb011e830bfd..2f82adce303af 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.network; +import java.util.Optional; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.security.auth.KafkaPrincipal; @@ -52,6 +53,15 @@ default void 
handleAuthenticationFailure() throws IOException { */ KafkaPrincipal principal(); + /** + * Returns the ClientInformation extracted from the ApiVersionsRequest which may + * have been received by the Authenticator or an empty Option if the remote end + * did not provide it. + */ + default Optional clientInformation() { + return Optional.empty(); + } + /** * returns true if authentication is complete otherwise returns false; */ diff --git a/clients/src/main/java/org/apache/kafka/common/network/ClientInformation.java b/clients/src/main/java/org/apache/kafka/common/network/ClientInformation.java new file mode 100644 index 0000000000000..cb99a8669e40f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/ClientInformation.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.network; + +import java.util.Objects; + +public class ClientInformation { + public static final String UNKNOWN_NAME_OR_VERSION = "unknown"; + public static final ClientInformation EMPTY = new ClientInformation(UNKNOWN_NAME_OR_VERSION, UNKNOWN_NAME_OR_VERSION); + + private final String softwareName; + private final String softwareVersion; + + public ClientInformation(String softwareName, String softwareVersion) { + this.softwareName = softwareName.isEmpty() ? UNKNOWN_NAME_OR_VERSION : softwareName; + this.softwareVersion = softwareVersion.isEmpty() ? UNKNOWN_NAME_OR_VERSION : softwareVersion; + } + + public String softwareName() { + return this.softwareName; + } + + public String softwareVersion() { + return this.softwareVersion; + } + + @Override + public String toString() { + return "ClientInformation(softwareName=" + softwareName + + ", softwareVersion=" + softwareVersion + ")"; + } + + @Override + public int hashCode() { + return Objects.hash(softwareName, softwareVersion); + } + + @Override + public boolean equals(Object o) { + if (o == null) { + return false; + } + if (!(o instanceof ClientInformation)) { + return false; + } + ClientInformation other = (ClientInformation) o; + return other.softwareName.equals(softwareName) && + other.softwareVersion.equals(softwareVersion); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java index 8651835eac3d7..3c38a30fb0b09 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java +++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java @@ -116,7 +116,9 @@ public enum ChannelMuteEvent { private final String id; private final TransportLayer transportLayer; private final Supplier authenticatorCreator; + private Selector selector; private Authenticator authenticator; + private ClientInformation clientInformation; // Tracks 
accumulated network thread time. This is updated on the network thread. // The values are read and reset after each response is sent. private long networkThreadTimeNanos; @@ -145,6 +147,7 @@ public KafkaChannel(String id, TransportLayer transportLayer, Supplier connectionsByCipher; + public final IntGaugeSuite connectionsByClient; /* Names of metrics that are not registered through sensors */ private final List topLevelMetricNames = new ArrayList<>(); @@ -1199,6 +1209,15 @@ public SelectorMetrics(Metrics metrics, String metricGrpPrefix, Map(log, "clients", metrics, + clientInformation -> { + Map tags = new LinkedHashMap<>(); + tags.put("clientSoftwareName", clientInformation.softwareName()); + tags.put("clientSoftwareVersion", clientInformation.softwareVersion()); + tags.putAll(metricTags); + return metrics.metricName("connections", metricGrpName, "The number of connections with this client and version.", tags); + }, 100); + metricName = metrics.metricName("connection-count", metricGrpName, "The current number of active connections.", metricTags); topLevelMetricNames.add(metricName); this.metrics.addMetric(metricName, (config, now) -> channels.size()); @@ -1322,6 +1341,7 @@ public void close() { for (Sensor sensor : sensors) metrics.removeSensor(sensor.name()); connectionsByCipher.close(); + connectionsByClient.close(); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java index 8e21f8e40be5e..71fbc33615046 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.message.ApiVersionsRequestData; +import org.apache.kafka.common.network.ClientInformation; import org.apache.kafka.common.network.ListenerName; import 
org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; @@ -38,19 +39,22 @@ public class RequestContext implements AuthorizableRequestContext { public final KafkaPrincipal principal; public final ListenerName listenerName; public final SecurityProtocol securityProtocol; + public final ClientInformation clientInformation; public RequestContext(RequestHeader header, String connectionId, InetAddress clientAddress, KafkaPrincipal principal, ListenerName listenerName, - SecurityProtocol securityProtocol) { + SecurityProtocol securityProtocol, + ClientInformation clientInformation) { this.header = header; this.connectionId = connectionId; this.clientAddress = clientAddress; this.principal = principal; this.listenerName = listenerName; this.securityProtocol = securityProtocol; + this.clientInformation = clientInformation; } public RequestAndSize parseRequest(ByteBuffer buffer) { diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index afaf6b728a112..1a3675ea0d120 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.security.authenticator; +import java.util.Optional; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; @@ -29,6 +30,7 @@ import org.apache.kafka.common.message.SaslHandshakeResponseData; import org.apache.kafka.common.network.Authenticator; import org.apache.kafka.common.network.ChannelBuilders; +import org.apache.kafka.common.network.ClientInformation; import org.apache.kafka.common.network.ListenerName; import 
org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.NetworkSend; @@ -143,6 +145,9 @@ private enum SaslState { // flag indicating if sasl tokens are sent as Kafka SaslAuthenticate request/responses private boolean enableKafkaSaslAuthenticateHeaders; + // Client information extracted from the ApiVersionsRequest if provided + private Optional clientInformation = Optional.empty(); + public SaslServerAuthenticator(Map configs, Map callbackHandlers, String connectionId, @@ -318,6 +323,11 @@ public KafkaPrincipal principal() { return principal; } + @Override + public Optional clientInformation() { + return clientInformation; + } + @Override public boolean complete() { return saslState == SaslState.COMPLETE; @@ -426,7 +436,7 @@ private void handleSaslToken(byte[] clientToken) throws IOException { ApiKeys apiKey = header.apiKey(); short version = header.apiVersion(); RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(), - KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol); + KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY); RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer); if (apiKey != ApiKeys.SASL_AUTHENTICATE) { IllegalSaslStateException e = new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL authentication."); @@ -512,7 +522,7 @@ private boolean handleKafkaRequest(byte[] requestBytes) throws IOException, Auth RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(), - KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol); + KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY); RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer); if (apiKey == ApiKeys.API_VERSIONS) handleApiVersionsRequest(requestContext, (ApiVersionsRequest) requestAndSize.request); @@ -585,6 +595,8 @@ private void 
handleApiVersionsRequest(RequestContext context, ApiVersionsRequest else if (!apiVersionsRequest.isValid()) sendKafkaResponse(context, apiVersionsRequest.getErrorResponse(0, Errors.INVALID_REQUEST.exception())); else { + clientInformation = Optional.of(new ClientInformation(apiVersionsRequest.data.clientSoftwareName(), + apiVersionsRequest.data.clientSoftwareVersion())); sendKafkaResponse(context, apiVersionsResponse()); setSaslState(SaslState.HANDSHAKE_REQUEST); } diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 116a7973cd1e8..ab4e8de55da0d 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -682,6 +682,7 @@ public void testConnectDisconnectDuringInSinglePoll() throws Exception { when(kafkaChannel.finishConnect()).thenReturn(true); when(kafkaChannel.isConnected()).thenReturn(true); when(kafkaChannel.ready()).thenReturn(false); + when(kafkaChannel.clientInformation()).thenReturn(ClientInformation.EMPTY); doThrow(new IOException()).when(kafkaChannel).prepare(); SelectionKey selectionKey = mock(SelectionKey.class); @@ -746,6 +747,54 @@ public void testInboundConnectionsCountInConnectionCreationMetric() throws Excep assertEquals((double) conns, getMetric("connection-count").metricValue()); } + @Test + public void testConnectionsByClientMetric() throws Exception { + String node = "0"; + Map unknownNameAndVersion = softwareNameAndVersionTags("unknown", "unknown"); + Map knownNameAndVersion = softwareNameAndVersionTags("A", "B"); + + try (ServerSocketChannel ss = ServerSocketChannel.open()) { + ss.bind(new InetSocketAddress(0)); + InetSocketAddress serverAddress = (InetSocketAddress) ss.getLocalAddress(); + + Thread sender = createSender(serverAddress, randomPayload(1)); + sender.start(); + SocketChannel channel = ss.accept(); + 
channel.configureBlocking(false); + + // Metric with unknown / unknown should be there + selector.register(node, channel); + assertEquals(1, getMetric("connections", unknownNameAndVersion).metricValue()); + + // Metric with unknown / unknown should not be there, metric with A / B should be there + selector.channel(node).updateClientInformation(new ClientInformation("A", "B")); + assertEquals(0, getMetric("connections", unknownNameAndVersion).metricValue()); + assertEquals(1, getMetric("connections", knownNameAndVersion).metricValue()); + + // Metric with A / B should not be there, + selector.close(node); + assertEquals(0, getMetric("connections", knownNameAndVersion).metricValue()); + } + } + + private Map softwareNameAndVersionTags(String clientSoftwareName, String clientSoftwareVersion) { + Map tags = new HashMap<>(2); + tags.put("clientSoftwareName", clientSoftwareName); + tags.put("clientSoftwareVersion", clientSoftwareVersion); + return tags; + } + + private KafkaMetric getMetric(String name, Map tags) throws Exception { + Optional> metric = metrics.metrics().entrySet().stream() + .filter(entry -> + entry.getKey().name().equals(name) && entry.getKey().tags().equals(tags)) + .findFirst(); + if (!metric.isPresent()) + throw new Exception(String.format("Could not find metric called %s with tags %s", name, tags.toString())); + + return metric.get().getValue(); + } + @SuppressWarnings("unchecked") @Test public void testLowestPriorityChannel() throws Exception { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java index 7d066250d2f2b..fd8f1e552a09d 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.message.ApiVersionsResponseData; import 
org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection; +import org.apache.kafka.common.network.ClientInformation; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; @@ -41,7 +42,7 @@ public void testSerdeUnsupportedApiVersionRequest() throws Exception { RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, Short.MAX_VALUE, "", correlationId); RequestContext context = new RequestContext(header, "0", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS, - new ListenerName("ssl"), SecurityProtocol.SASL_SSL); + new ListenerName("ssl"), SecurityProtocol.SASL_SSL, ClientInformation.EMPTY); assertEquals(0, context.apiVersion()); // Write some garbage to the request buffer. This should be ignored since we will treat diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java index 51204f9764d16..66482eab206a2 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java @@ -16,18 +16,22 @@ */ package org.apache.kafka.common.security.authenticator; +import java.net.InetAddress; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.errors.IllegalSaslStateException; +import org.apache.kafka.common.network.ClientInformation; import org.apache.kafka.common.network.InvalidReceiveException; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.TransportLayer; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.ApiVersionsRequest; import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; 
import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.plain.PlainLoginModule; +import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.Time; import org.junit.Test; @@ -37,8 +41,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.mockito.Answers; import static org.apache.kafka.common.security.scram.internals.ScramMechanism.SCRAM_SHA_256; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -92,6 +98,72 @@ public void testUnexpectedRequestType() throws IOException { verify(transportLayer, times(2)).read(any(ByteBuffer.class)); } + @Test + public void testOldestApiVersionsRequest() throws IOException { + final short version = ApiKeys.API_VERSIONS.oldestVersion(); + TransportLayer transportLayer = mock(TransportLayer.class, Answers.RETURNS_DEEP_STUBS); + Map configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, + Collections.singletonList(SCRAM_SHA_256.mechanismName())); + SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName()); + + final RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, version, "clientId", 0); + final Struct headerStruct = header.toStruct(); + + final ApiVersionsRequest request = new ApiVersionsRequest.Builder().build(version); + final Struct requestStruct = request.data.toStruct(version); + + when(transportLayer.socketChannel().socket().getInetAddress()).thenReturn(InetAddress.getLoopbackAddress()); + + when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> { + invocation.getArgument(0).putInt(headerStruct.sizeOf() + 
requestStruct.sizeOf()); + return 4; + }).then(invocation -> { + headerStruct.writeTo(invocation.getArgument(0)); + requestStruct.writeTo(invocation.getArgument(0)); + return headerStruct.sizeOf() + requestStruct.sizeOf(); + }); + + authenticator.authenticate(); + + assertEquals(ClientInformation.UNKNOWN_NAME_OR_VERSION, authenticator.clientInformation().get().softwareName()); + assertEquals(ClientInformation.UNKNOWN_NAME_OR_VERSION, authenticator.clientInformation().get().softwareVersion()); + + verify(transportLayer, times(2)).read(any(ByteBuffer.class)); + } + + @Test + public void testLatestApiVersionsRequest() throws IOException { + final short version = ApiKeys.API_VERSIONS.latestVersion(); + TransportLayer transportLayer = mock(TransportLayer.class, Answers.RETURNS_DEEP_STUBS); + Map configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, + Collections.singletonList(SCRAM_SHA_256.mechanismName())); + SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName()); + + final RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, version, "clientId", 0); + final Struct headerStruct = header.toStruct(); + + final ApiVersionsRequest request = new ApiVersionsRequest.Builder().build(version); + final Struct requestStruct = request.data.toStruct(version); + + when(transportLayer.socketChannel().socket().getInetAddress()).thenReturn(InetAddress.getLoopbackAddress()); + + when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> { + invocation.getArgument(0).putInt(headerStruct.sizeOf() + requestStruct.sizeOf()); + return 4; + }).then(invocation -> { + headerStruct.writeTo(invocation.getArgument(0)); + requestStruct.writeTo(invocation.getArgument(0)); + return headerStruct.sizeOf() + requestStruct.sizeOf(); + }); + + authenticator.authenticate(); + + assertEquals("apache-kafka-java", authenticator.clientInformation().get().softwareName()); + 
assertEquals(AppInfoParser.getVersion(), authenticator.clientInformation().get().softwareVersion()); + + verify(transportLayer, times(2)).read(any(ByteBuffer.class)); + } + private SaslServerAuthenticator setupAuthenticator(Map configs, TransportLayer transportLayer, String mechanism) throws IOException { TestJaasConfig jaasConfig = new TestJaasConfig(); jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap()); diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala index 92cdb9ee02396..158ea01869c2a 100644 --- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala +++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala @@ -162,8 +162,8 @@ object BrokerApiVersionsCommand { private def getApiVersions(node: Node): ApiVersionsResponseKeyCollection = { val response = send(node, ApiKeys.API_VERSIONS, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse] - Errors.forCode(response.data.errorCode()).maybeThrow() - response.data.apiKeys() + Errors.forCode(response.data.errorCode).maybeThrow() + response.data.apiKeys } /** diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 4a8c290d945e8..5a5314cfebe21 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -197,6 +197,8 @@ object RequestChannel extends Logging { .append(",securityProtocol:").append(context.securityProtocol) .append(",principal:").append(session.principal) .append(",listener:").append(context.listenerName.value) + .append(",clientSoftwareName:").append(context.clientInformation.softwareName) + .append(",clientSoftwareVersion:").append(context.clientInformation.softwareVersion) if (temporaryMemoryBytes > 0) builder.append(",temporaryMemoryBytes:").append(temporaryMemoryBytes) if (messageConversionsTimeMs > 0) 
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index a147aaa3722f5..b16fc691a96e0 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -41,9 +41,11 @@ import org.apache.kafka.common.{Endpoint, KafkaException, Reconfigurable} import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool} import org.apache.kafka.common.metrics._ import org.apache.kafka.common.metrics.stats.{CumulativeSum, Meter} +import org.apache.kafka.common.network.ClientInformation import org.apache.kafka.common.network.KafkaChannel.ChannelMuteEvent import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, ListenerReconfigurable, Selectable, Send, Selector => KSelector} import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.requests.ApiVersionsRequest import org.apache.kafka.common.requests.{RequestContext, RequestHeader} import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time} @@ -902,7 +904,7 @@ private[kafka] class Processor(val id: Int, openOrClosingChannel(receive.source) match { case Some(channel) => val header = RequestHeader.parse(receive.payload) - if (header.apiKey() == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, nowNanosSupplier)) + if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, nowNanosSupplier)) trace(s"Begin re-authentication: $channel") else { val nowNanos = time.nanoseconds() @@ -914,9 +916,18 @@ private[kafka] class Processor(val id: Int, } else { val connectionId = receive.source val context = new RequestContext(header, connectionId, channel.socketAddress, - channel.principal, listenerName, securityProtocol) + channel.principal, listenerName, securityProtocol, + channel.clientInformation) val req = new 
RequestChannel.Request(processor = id, context = context, startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics) + if (header.apiKey == ApiKeys.API_VERSIONS) { + val apiVersionsRequest = req.body[ApiVersionsRequest] + if (apiVersionsRequest.isValid) { + channel.updateClientInformation(new ClientInformation( + apiVersionsRequest.data.clientSoftwareName, + apiVersionsRequest.data.clientSoftwareVersion)) + } + } requestChannel.sendRequest(req) selector.mute(connectionId) handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED) diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 2325ee857d54e..76d835922af10 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -36,13 +36,15 @@ import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.message.SaslAuthenticateRequestData import org.apache.kafka.common.message.SaslHandshakeRequestData import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network.ClientInformation import org.apache.kafka.common.network.KafkaChannel.ChannelMuteState import org.apache.kafka.common.network.{ChannelBuilder, ChannelState, KafkaChannel, ListenerName, NetworkReceive, NetworkSend, Selector, Send} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.MemoryRecords -import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, RequestHeader, SaslAuthenticateRequest, SaslHandshakeRequest} +import org.apache.kafka.common.requests.{AbstractRequest, ApiVersionsRequest, ProduceRequest, RequestHeader, SaslAuthenticateRequest, SaslHandshakeRequest} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.scram.internals.ScramMechanism +import 
org.apache.kafka.common.utils.AppInfoParser import org.apache.kafka.common.utils.{LogContext, MockTime, Time} import org.apache.log4j.Level import org.junit.Assert._ @@ -61,7 +63,7 @@ class SocketServerTest { props.put("socket.send.buffer.bytes", "300000") props.put("socket.receive.buffer.bytes", "300000") props.put("queued.max.requests", "50") - props.put("socket.request.max.bytes", "50") + props.put("socket.request.max.bytes", "100") props.put("max.connections.per.ip", "5") props.put("connections.max.idle.ms", "60000") val config = KafkaConfig.fromProps(props) @@ -188,6 +190,16 @@ class SocketServerTest { serializedBytes } + private def apiVersionRequestBytes(clientId: String, version: Short): Array[Byte] = { + val request = new ApiVersionsRequest.Builder().build(version) + val header = new RequestHeader(ApiKeys.API_VERSIONS, request.version(), clientId, -1) + val buffer = request.serialize(header) + buffer.rewind() + val bytes = new Array[Byte](buffer.remaining()) + buffer.get(bytes) + bytes + } + @Test def simpleRequest(): Unit = { val plainSocket = connect() @@ -200,6 +212,56 @@ class SocketServerTest { verifyAcceptorBlockedPercent("PLAINTEXT", expectBlocked = false) } + + private def testClientInformation(version: Short, expectedClientSoftwareName: String, + expectedClientSoftwareVersion: String): Unit = { + val plainSocket = connect() + val address = plainSocket.getLocalAddress + val clientId = "clientId" + + // Send ApiVersionsRequest - unknown expected + sendRequest(plainSocket, apiVersionRequestBytes(clientId, version)) + var receivedReq = receiveRequest(server.dataPlaneRequestChannel) + + assertEquals(ClientInformation.UNKNOWN_NAME_OR_VERSION, receivedReq.context.clientInformation.softwareName) + assertEquals(ClientInformation.UNKNOWN_NAME_OR_VERSION, receivedReq.context.clientInformation.softwareVersion) + + server.dataPlaneRequestChannel.sendResponse(new RequestChannel.NoOpResponse(receivedReq)) + + // Send ProduceRequest - client info expected + 
sendRequest(plainSocket, producerRequestBytes()) + receivedReq = receiveRequest(server.dataPlaneRequestChannel) + + assertEquals(expectedClientSoftwareName, receivedReq.context.clientInformation.softwareName) + assertEquals(expectedClientSoftwareVersion, receivedReq.context.clientInformation.softwareVersion) + + server.dataPlaneRequestChannel.sendResponse(new RequestChannel.NoOpResponse(receivedReq)) + + // Close the socket + plainSocket.setSoLinger(true, 0) + plainSocket.close() + + TestUtils.waitUntilTrue(() => server.connectionCount(address) == 0, msg = "Connection not closed") + } + + @Test + def testClientInformationWithLatestApiVersionsRequest(): Unit = { + testClientInformation( + ApiKeys.API_VERSIONS.latestVersion, + "apache-kafka-java", + AppInfoParser.getVersion + ) + } + + @Test + def testClientInformationWithOldestApiVersionsRequest(): Unit = { + testClientInformation( + ApiKeys.API_VERSIONS.oldestVersion, + ClientInformation.UNKNOWN_NAME_OR_VERSION, + ClientInformation.UNKNOWN_NAME_OR_VERSION + ) + } + @Test def testStagedListenerStartup(): Unit = { val testProps = new Properties diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala index 6250958596620..2fb4aca3792b3 100644 --- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala @@ -32,6 +32,7 @@ import org.apache.kafka.common.acl._ import org.apache.kafka.common.acl.AclOperation._ import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} import org.apache.kafka.common.errors.{ApiException, UnsupportedVersionException} +import org.apache.kafka.common.network.ClientInformation import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests.{RequestContext, RequestHeader} @@ -832,7 +833,8 @@ class 
AclAuthorizerTest extends ZooKeeperTestHarness { private def newRequestContext(principal: KafkaPrincipal, clientAddress: InetAddress, apiKey: ApiKeys = ApiKeys.PRODUCE): RequestContext = { val securityProtocol = SecurityProtocol.SASL_PLAINTEXT val header = new RequestHeader(apiKey, 2, "", 1) //ApiKeys apiKey, short version, String clientId, int correlation - new RequestContext(header, "", clientAddress, principal, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol) + new RequestContext(header, "", clientAddress, principal, ListenerName.forSecurityProtocol(securityProtocol), + securityProtocol, ClientInformation.EMPTY) } private def authorize(authorizer: AclAuthorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = { diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index 828fb4841a271..f1eafd3cceeb4 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -26,6 +26,7 @@ import kafka.server.QuotaType._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota} +import org.apache.kafka.common.network.ClientInformation import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, RequestContext, RequestHeader} @@ -65,7 +66,7 @@ class ClientQuotaManagerTest { // read the header from the buffer first so that the body can be read next from the Request constructor val header = RequestHeader.parse(buffer) val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, - listenerName, SecurityProtocol.PLAINTEXT) + listenerName, 
SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY) (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer, requestChannelMetrics)) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 4c438beb1dc59..febcef663c72d 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -44,6 +44,7 @@ import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequ import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} import org.apache.kafka.common.message._ import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network.ClientInformation import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.FileRecords.TimestampAndOffset @@ -1011,7 +1012,7 @@ class KafkaApisTest { // read the header from the buffer first so that the body can be read next from the Request constructor val header = RequestHeader.parse(buffer) val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, - listenerName, SecurityProtocol.PLAINTEXT) + listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY) (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer, requestChannelMetrics)) } diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala index 88d98c8a67fc2..6d1f8cafeae18 100644 --- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala @@ -68,7 +68,7 @@ class 
SaslApiVersionsRequestTest extends AbstractApiVersionsRequestTest with Sas sendSaslHandshakeRequestValidateResponse(socket) val response = sendAndReceive[ApiVersionsResponse]( new ApiVersionsRequest.Builder().build(0), socket) - assertEquals(Errors.ILLEGAL_SASL_STATE.code(), response.data.errorCode()) + assertEquals(Errors.ILLEGAL_SASL_STATE.code, response.data.errorCode) } finally { socket.close() } @@ -80,7 +80,7 @@ class SaslApiVersionsRequestTest extends AbstractApiVersionsRequestTest with Sas try { val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0) val apiVersionsResponse = sendUnsupportedApiVersionRequest(apiVersionsRequest) - assertEquals(Errors.UNSUPPORTED_VERSION.code(), apiVersionsResponse.data.errorCode()) + assertEquals(Errors.UNSUPPORTED_VERSION.code, apiVersionsResponse.data.errorCode) val apiVersionsResponse2 = sendAndReceive[ApiVersionsResponse]( new ApiVersionsRequest.Builder().build(0), socket) validateApiVersionsResponse(apiVersionsResponse2) diff --git a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala index 15bbcc6e2b906..bffbeec449ab1 100644 --- a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala @@ -28,6 +28,7 @@ import kafka.network.RequestChannel.{EndThrottlingResponse, Response, StartThrot import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.metrics.MetricConfig +import org.apache.kafka.common.network.ClientInformation import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, RequestContext, RequestHeader} @@ -55,7 +56,7 @@ class ThrottledChannelExpirationTest { // read the header from the buffer first so that the 
body can be read next from the Request constructor val header = RequestHeader.parse(buffer) val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, - listenerName, SecurityProtocol.PLAINTEXT) + listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY) (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer, requestChannelMetrics)) } From 03bda9fb1eaa8a484d1297ce98322c455f073e59 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 12 Dec 2019 11:37:49 +0100 Subject: [PATCH 2/4] Rework based on Colin's PR. --- .../kafka/common/network/Authenticator.java | 10 --- .../kafka/common/network/ChannelBuilder.java | 3 +- .../network/ChannelMetadataRegistry.java | 55 +++++++++++++++ .../common/network/CipherInformation.java | 4 +- .../DefaultChannelMetadataRegistry.java | 50 ++++++++++++++ .../kafka/common/network/KafkaChannel.java | 49 ++----------- .../network/PlaintextChannelBuilder.java | 5 +- .../network/PlaintextTransportLayer.java | 6 -- .../common/network/SaslChannelBuilder.java | 24 ++++--- .../apache/kafka/common/network/Selector.java | 68 ++++++++++++++----- .../common/network/SslChannelBuilder.java | 17 +++-- .../common/network/SslTransportLayer.java | 30 +++----- .../kafka/common/network/TransportLayer.java | 7 -- .../SaslServerAuthenticator.java | 15 ++-- .../common/network/KafkaChannelTest.java | 8 ++- .../kafka/common/network/SelectorTest.java | 14 ++-- .../kafka/common/network/SslSelectorTest.java | 11 +-- .../common/network/SslTransportLayerTest.java | 5 +- .../authenticator/SaslAuthenticatorTest.java | 7 +- .../SaslServerAuthenticatorTest.java | 34 +++++++--- .../scala/kafka/network/SocketServer.scala | 4 +- 21 files changed, 268 insertions(+), 158 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/network/ChannelMetadataRegistry.java create mode 100644 
clients/src/main/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java diff --git a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java index 2f82adce303af..bcb011e830bfd 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.common.network; -import java.util.Optional; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.security.auth.KafkaPrincipal; @@ -53,15 +52,6 @@ default void handleAuthenticationFailure() throws IOException { */ KafkaPrincipal principal(); - /** - * Returns the ClientInformation extracted from the ApiVersionsRequest which may - * have been received by the Authenticator or an empty Option if the remote end - * did not provide it. - */ - default Optional clientInformation() { - return Optional.empty(); - } - /** * returns true if authentication is complete otherwise returns false; */ diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java index e3b2eeb58cce6..0cf1d74f85ce6 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java @@ -36,7 +36,8 @@ public interface ChannelBuilder extends AutoCloseable, Configurable { * @param memoryPool memory pool from which to allocate buffers, or null for none * @return KafkaChannel */ - KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException; + KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, + MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException; /** * Closes 
ChannelBuilder diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelMetadataRegistry.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelMetadataRegistry.java new file mode 100644 index 0000000000000..b49d6b64d0562 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelMetadataRegistry.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.network; + +import java.io.Closeable; + +/** + * Metadata about a channel is provided in various places in the network stack. This + * registry is used as a common place to collect them. + */ +public interface ChannelMetadataRegistry extends Closeable { + + /** + * Register information about the SSL cipher we are using. + * If we already registered this information, this call will be ignored. + */ + void registerCipherInformation(CipherInformation cipherInformation); + + /** + * Get the currently registered cipher information. + */ + CipherInformation cipherInformation(); + + /** + * Register information about the client we are using. + * Depending on the clients, the ApiVersionsRequest could be received + * multiple times or not at all.
Re-registering the information will + * overwrite the previous one. + */ + void registerClientInformation(ClientInformation clientInformation); + + /** + * Get the currently registered client information. + */ + ClientInformation clientInformation(); + + /** + * Unregister everything that has been registered and close the registry. + */ + void close(); +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/CipherInformation.java b/clients/src/main/java/org/apache/kafka/common/network/CipherInformation.java index a40469870480e..d65aeb981c009 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/CipherInformation.java +++ b/clients/src/main/java/org/apache/kafka/common/network/CipherInformation.java @@ -23,8 +23,8 @@ public class CipherInformation { private final String protocol; public CipherInformation(String cipher, String protocol) { - this.cipher = cipher; - this.protocol = protocol; + this.cipher = cipher == null || cipher.isEmpty() ? "unknown" : cipher; + this.protocol = protocol == null || protocol.isEmpty() ? "unknown" : protocol; } public String cipher() { diff --git a/clients/src/main/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java b/clients/src/main/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java new file mode 100644 index 0000000000000..ae9e9a83a0c2c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.network; + +public class DefaultChannelMetadataRegistry implements ChannelMetadataRegistry { + private CipherInformation cipherInformation; + private ClientInformation clientInformation; + + @Override + public void registerCipherInformation(final CipherInformation cipherInformation) { + if (this.cipherInformation == null) { + this.cipherInformation = cipherInformation; + } + } + + @Override + public CipherInformation cipherInformation() { + return this.cipherInformation; + } + + @Override + public void registerClientInformation(final ClientInformation clientInformation) { + this.clientInformation = clientInformation; + } + + @Override + public ClientInformation clientInformation() { + return this.clientInformation; + } + + @Override + public void close() { + this.cipherInformation = null; + this.clientInformation = null; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java index 3c38a30fb0b09..59e08197fac99 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java +++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java @@ -29,7 +29,6 @@ import java.nio.channels.SocketChannel; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.function.Supplier; /** @@ -116,14 +115,13 @@ public enum ChannelMuteEvent { private final String id; private final TransportLayer transportLayer; private final Supplier authenticatorCreator;
- private Selector selector; private Authenticator authenticator; - private ClientInformation clientInformation; // Tracks accumulated network thread time. This is updated on the network thread. // The values are read and reset after each response is sent. private long networkThreadTimeNanos; private final int maxReceiveSize; private final MemoryPool memoryPool; + private final ChannelMetadataRegistry metadataRegistry; private NetworkReceive receive; private Send send; // Track connection and mute state of channels to enable outstanding requests on channels to be @@ -136,7 +134,8 @@ public enum ChannelMuteEvent { private boolean midWrite; private long lastReauthenticationStartNanos; - public KafkaChannel(String id, TransportLayer transportLayer, Supplier authenticatorCreator, int maxReceiveSize, MemoryPool memoryPool) { + public KafkaChannel(String id, TransportLayer transportLayer, Supplier authenticatorCreator, + int maxReceiveSize, MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) { this.id = id; this.transportLayer = transportLayer; this.authenticatorCreator = authenticatorCreator; @@ -144,30 +143,15 @@ public KafkaChannel(String id, TransportLayer transportLayer, Supplier cipherInformation() { - return transportLayer.cipherInformation(); + public ChannelMetadataRegistry channelMetadataRegistry() { + return metadataRegistry; } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java index 3a9fd644972da..705e10f0cffa5 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java @@ -49,12 +49,13 @@ public void configure(Map configs) throws KafkaException { } @Override - public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException { + 
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, + MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException { try { PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key); Supplier authenticatorCreator = () -> new PlaintextAuthenticator(configs, transportLayer, listenerName); return new KafkaChannel(id, transportLayer, authenticatorCreator, maxReceiveSize, - memoryPool != null ? memoryPool : MemoryPool.NONE); + memoryPool != null ? memoryPool : MemoryPool.NONE, metadataRegistry); } catch (Exception e) { log.warn("Failed to create channel due to ", e); throw new KafkaException(e); diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java index 78393ee68900c..845b1474f4e31 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java @@ -27,7 +27,6 @@ import java.nio.channels.SelectionKey; import java.security.Principal; -import java.util.Optional; import org.apache.kafka.common.security.auth.KafkaPrincipal; @@ -215,9 +214,4 @@ public boolean hasBytesBuffered() { public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException { return fileChannel.transferTo(position, count, socketChannel); } - - @Override - public Optional cipherInformation() { - return Optional.empty(); - } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java index a2c843108be10..51760022e322a 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java @@ -182,11 +182,12 @@ public ListenerName listenerName() { } 
@Override - public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException { + public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, + MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException { try { SocketChannel socketChannel = (SocketChannel) key.channel(); Socket socket = socketChannel.socket(); - TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel); + TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel, metadataRegistry); Supplier authenticatorCreator; if (mode == Mode.SERVER) { authenticatorCreator = () -> buildServerAuthenticator(configs, @@ -194,7 +195,8 @@ public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize id, transportLayer, Collections.unmodifiableMap(subjects), - Collections.unmodifiableMap(connectionsMaxReauthMsByMechanism)); + Collections.unmodifiableMap(connectionsMaxReauthMsByMechanism), + metadataRegistry); } else { LoginManager loginManager = loginManagers.get(clientSaslMechanism); authenticatorCreator = () -> buildClientAuthenticator(configs, @@ -205,7 +207,8 @@ public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize transportLayer, subjects.get(clientSaslMechanism)); } - return new KafkaChannel(id, transportLayer, authenticatorCreator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE); + return new KafkaChannel(id, transportLayer, authenticatorCreator, maxReceiveSize, + memoryPool != null ? 
memoryPool : MemoryPool.NONE, metadataRegistry); } catch (Exception e) { log.info("Failed to create channel due to ", e); throw new KafkaException(e); @@ -222,10 +225,13 @@ public void close() { } // Visible to override for testing - protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException { + protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel, + ChannelMetadataRegistry metadataRegistry) throws IOException { if (this.securityProtocol == SecurityProtocol.SASL_SSL) { return SslTransportLayer.create(id, key, - sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(), socketChannel.socket().getPort())); + sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(), + socketChannel.socket().getPort()), + metadataRegistry); } else { return new PlaintextTransportLayer(key); } @@ -237,9 +243,11 @@ protected SaslServerAuthenticator buildServerAuthenticator(Map config String id, TransportLayer transportLayer, Map subjects, - Map connectionsMaxReauthMsByMechanism) { + Map connectionsMaxReauthMsByMechanism, + ChannelMetadataRegistry metadataRegistry) { return new SaslServerAuthenticator(configs, callbackHandlers, id, subjects, - kerberosShortNamer, listenerName, securityProtocol, transportLayer, connectionsMaxReauthMsByMechanism, time); + kerberosShortNamer, listenerName, securityProtocol, transportLayer, + connectionsMaxReauthMsByMechanism, metadataRegistry, time); } // Visible to override for testing diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 778999b7792a9..cc9ddd09889e0 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -52,7 +52,6 @@ import java.util.LinkedHashMap; import java.util.List; import 
java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -314,7 +313,6 @@ public void register(String id, SocketChannel socketChannel) throws IOException ensureNotRegistered(id); registerChannel(id, socketChannel, SelectionKey.OP_READ); this.sensors.connectionCreated.record(); - this.sensors.connectionsByClient.increment(channel(id).clientInformation()); } private void ensureNotRegistered(String id) { @@ -327,7 +325,6 @@ private void ensureNotRegistered(String id) { protected SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException { SelectionKey key = socketChannel.register(nioSelector, interestedOps); KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key); - channel.register(this); this.channels.put(id, channel); if (idleExpiryManager != null) idleExpiryManager.update(channel.id(), time.nanoseconds()); @@ -336,7 +333,8 @@ protected SelectionKey registerChannel(String id, SocketChannel socketChannel, i private KafkaChannel buildAndAttachKafkaChannel(SocketChannel socketChannel, String id, SelectionKey key) throws IOException { try { - KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool); + KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool, + new SelectorChannelMetadataRegistry()); key.attach(channel); return channel; } catch (Exception e) { @@ -565,10 +563,6 @@ void pollSelectionKeys(Set selectionKeys, sensors.successfulAuthentication.record(1.0, readyTimeMs); if (!channel.connectedClientSupportsReauthentication()) sensors.successfulAuthenticationNoReauth.record(1.0, readyTimeMs); - Optional cipherInformation = channel.cipherInformation(); - if (cipherInformation.isPresent()) { - sensors.connectionsByCipher.increment(cipherInformation.get()); - } } log.debug("Successfully {}authenticated with {}", isReauthentication ? 
"re-" : "", channel.socketDescription()); @@ -928,11 +922,6 @@ private void doClose(KafkaChannel channel, boolean notifyDisconnect) { key.attach(null); } - Optional cipherInformation = channel.cipherInformation(); - if (cipherInformation.isPresent()) { - sensors.connectionsByCipher.decrement(cipherInformation.get()); - } - this.sensors.connectionsByClient.decrement(channel.clientInformation()); this.sensors.connectionClosed.record(); this.stagedReceives.remove(channel); this.explicitlyMutedChannels.remove(channel); @@ -1075,10 +1064,55 @@ public int numStagedReceives(KafkaChannel channel) { return deque == null ? 0 : deque.size(); } - // package private, called by KafkaChannel - void clientInformationUpdated(ClientInformation oldClientInformation, ClientInformation newClientInformation) { - this.sensors.connectionsByClient.decrement(oldClientInformation); - this.sensors.connectionsByClient.increment(newClientInformation); + class SelectorChannelMetadataRegistry implements ChannelMetadataRegistry { + private CipherInformation cipherInformation; + private ClientInformation clientInformation; + + SelectorChannelMetadataRegistry() { + // Default to empty client information as the ApiVersionsRequest is not + // mandatory or could not have the information yet if an older version + // is used. In this case, we still want to account for the connection. 
+ registerClientInformation(ClientInformation.EMPTY); + } + + @Override + public void registerCipherInformation(final CipherInformation cipherInformation) { + if (this.cipherInformation == null) { + this.cipherInformation = cipherInformation; + sensors.connectionsByCipher.increment(cipherInformation); + } + } + + @Override + public CipherInformation cipherInformation() { + return cipherInformation; + } + + @Override + public void registerClientInformation(final ClientInformation clientInformation) { + if (this.clientInformation != null) { + sensors.connectionsByClient.decrement(this.clientInformation); + } + + this.clientInformation = clientInformation; + sensors.connectionsByClient.increment(clientInformation); + } + + @Override + public ClientInformation clientInformation() { + return clientInformation; + } + + @Override + public void close() { + if (this.cipherInformation != null) { + sensors.connectionsByCipher.decrement(this.cipherInformation); + this.cipherInformation = null; + } + + sensors.connectionsByClient.decrement(this.clientInformation); + this.clientInformation = null; + } } class SelectorMetrics implements AutoCloseable { diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java index 9c6816124132e..ced8afe01225a 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java @@ -93,12 +93,15 @@ public ListenerName listenerName() { } @Override - public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException { + public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, + MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException { try { - SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, 
peerHost(key)); - Supplier authenticatorCreator = () -> new SslAuthenticator(configs, transportLayer, listenerName, sslPrincipalMapper); + SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, + peerHost(key), metadataRegistry); + Supplier authenticatorCreator = () -> + new SslAuthenticator(configs, transportLayer, listenerName, sslPrincipalMapper); return new KafkaChannel(id, transportLayer, authenticatorCreator, maxReceiveSize, - memoryPool != null ? memoryPool : MemoryPool.NONE); + memoryPool != null ? memoryPool : MemoryPool.NONE, metadataRegistry); } catch (Exception e) { log.info("Failed to create channel due to ", e); throw new KafkaException(e); @@ -108,9 +111,11 @@ public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize @Override public void close() {} - protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, String host) throws IOException { + protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, + String host, ChannelMetadataRegistry metadataRegistry) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); - return SslTransportLayer.create(id, key, sslFactory.createSslEngine(host, socketChannel.socket().getPort())); + return SslTransportLayer.create(id, key, sslFactory.createSslEngine(host, socketChannel.socket().getPort()), + metadataRegistry); } /** diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java index 315b258114614..d8b0474c94339 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java @@ -25,7 +25,6 @@ import java.nio.channels.CancelledKeyException; import java.security.Principal; -import java.util.Optional; import javax.net.ssl.SSLEngine; import 
javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLEngineResult.HandshakeStatus; @@ -61,6 +60,7 @@ private enum State { private final SSLEngine sslEngine; private final SelectionKey key; private final SocketChannel socketChannel; + private final ChannelMetadataRegistry metadataRegistry; private final Logger log; private HandshakeStatus handshakeStatus; @@ -72,19 +72,21 @@ private enum State { private ByteBuffer appReadBuffer; private ByteBuffer fileChannelBuffer; private boolean hasBytesBuffered; - private CipherInformation cipherInformation; - public static SslTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException { - return new SslTransportLayer(channelId, key, sslEngine); + public static SslTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine, + ChannelMetadataRegistry metadataRegistry) throws IOException { + return new SslTransportLayer(channelId, key, sslEngine, metadataRegistry); } // Prefer `create`, only use this in tests - SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) { + SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, + ChannelMetadataRegistry metadataRegistry) { this.channelId = channelId; this.key = key; this.socketChannel = (SocketChannel) key.channel(); this.sslEngine = sslEngine; this.state = State.NOT_INITALIZED; + this.metadataRegistry = metadataRegistry; final LogContext logContext = new LogContext(String.format("[SslTransportLayer channelId=%s key=%s] ", channelId, key)); this.log = logContext.logger(getClass()); @@ -428,17 +430,8 @@ private void handshakeFinished() throws IOException { SSLSession session = sslEngine.getSession(); log.debug("SSL handshake completed successfully with peerHost '{}' peerPort {} peerPrincipal '{}' cipherSuite '{}'", session.getPeerHost(), session.getPeerPort(), peerPrincipal(), session.getCipherSuite()); - if (cipherInformation == null) { - String cipherSuiteName = session.getCipherSuite(); 
- if (cipherSuiteName == null || cipherSuiteName.isEmpty()) { - cipherSuiteName = "unknown"; - } - String protocolName = session.getProtocol(); - if (protocolName == null || protocolName.isEmpty()) { - protocolName = "unknown"; - } - cipherInformation = new CipherInformation(cipherSuiteName, protocolName); - } + metadataRegistry.registerCipherInformation( + new CipherInformation(session.getCipherSuite(), session.getProtocol())); } log.trace("SSLHandshake FINISHED channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {} ", @@ -973,9 +966,4 @@ public long transferFrom(FileChannel fileChannel, long position, long count) thr throw e; } } - - @Override - public Optional cipherInformation() { - return Optional.ofNullable(cipherInformation); - } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java index 10e957e6d5700..b196c5be96c65 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java @@ -31,7 +31,6 @@ import java.nio.channels.GatheringByteChannel; import java.security.Principal; -import java.util.Optional; import org.apache.kafka.common.errors.AuthenticationException; @@ -114,10 +113,4 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan * @see FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel) */ long transferFrom(FileChannel fileChannel, long position, long count) throws IOException; - - /** - * Get information about the ciphers that this transport layer is using. If this transport layer is not - * using SSL, or the SSL handshake has not been completed, returns empty. 
- */ - Optional cipherInformation(); } diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index 1a3675ea0d120..240f777453b66 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.common.security.authenticator; -import java.util.Optional; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; @@ -30,6 +29,7 @@ import org.apache.kafka.common.message.SaslHandshakeResponseData; import org.apache.kafka.common.network.Authenticator; import org.apache.kafka.common.network.ChannelBuilders; +import org.apache.kafka.common.network.ChannelMetadataRegistry; import org.apache.kafka.common.network.ClientInformation; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.NetworkReceive; @@ -128,6 +128,7 @@ private enum SaslState { private final Map connectionsMaxReauthMsByMechanism; private final Time time; private final ReauthInfo reauthInfo; + private final ChannelMetadataRegistry metadataRegistry; // Current SASL state private SaslState saslState = SaslState.INITIAL_REQUEST; @@ -145,9 +146,6 @@ private enum SaslState { // flag indicating if sasl tokens are sent as Kafka SaslAuthenticate request/responses private boolean enableKafkaSaslAuthenticateHeaders; - // Client information extracted from the ApiVersionsRequest if provided - private Optional clientInformation = Optional.empty(); - public SaslServerAuthenticator(Map configs, Map callbackHandlers, String connectionId, @@ -157,6 +155,7 @@ public SaslServerAuthenticator(Map configs, SecurityProtocol securityProtocol, 
TransportLayer transportLayer, Map connectionsMaxReauthMsByMechanism, + ChannelMetadataRegistry metadataRegistry, Time time) { this.callbackHandlers = callbackHandlers; this.connectionId = connectionId; @@ -168,6 +167,7 @@ public SaslServerAuthenticator(Map configs, this.connectionsMaxReauthMsByMechanism = connectionsMaxReauthMsByMechanism; this.time = time; this.reauthInfo = new ReauthInfo(); + this.metadataRegistry = metadataRegistry; this.configs = configs; @SuppressWarnings("unchecked") @@ -323,11 +323,6 @@ public KafkaPrincipal principal() { return principal; } - @Override - public Optional clientInformation() { - return clientInformation; - } - @Override public boolean complete() { return saslState == SaslState.COMPLETE; @@ -595,7 +590,7 @@ private void handleApiVersionsRequest(RequestContext context, ApiVersionsRequest else if (!apiVersionsRequest.isValid()) sendKafkaResponse(context, apiVersionsRequest.getErrorResponse(0, Errors.INVALID_REQUEST.exception())); else { - clientInformation = Optional.of(new ClientInformation(apiVersionsRequest.data.clientSoftwareName(), + metadataRegistry.registerClientInformation(new ClientInformation(apiVersionsRequest.data.clientSoftwareName(), apiVersionsRequest.data.clientSoftwareVersion())); sendKafkaResponse(context, apiVersionsResponse()); setSaslState(SaslState.HANDSHAKE_REQUEST); diff --git a/clients/src/test/java/org/apache/kafka/common/network/KafkaChannelTest.java b/clients/src/test/java/org/apache/kafka/common/network/KafkaChannelTest.java index d8950a0527eed..c2115b3f3526b 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/KafkaChannelTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/KafkaChannelTest.java @@ -38,8 +38,10 @@ public void testSending() throws IOException { Authenticator authenticator = Mockito.mock(Authenticator.class); TransportLayer transport = Mockito.mock(TransportLayer.class); MemoryPool pool = Mockito.mock(MemoryPool.class); + ChannelMetadataRegistry 
metadataRegistry = Mockito.mock(ChannelMetadataRegistry.class); - KafkaChannel channel = new KafkaChannel("0", transport, () -> authenticator, 1024, pool); + KafkaChannel channel = new KafkaChannel("0", transport, () -> authenticator, + 1024, pool, metadataRegistry); NetworkSend send = new NetworkSend("0", ByteBuffer.wrap(TestUtils.randomBytes(128))); channel.setSend(send); @@ -67,13 +69,15 @@ public void testReceiving() throws IOException { Authenticator authenticator = Mockito.mock(Authenticator.class); TransportLayer transport = Mockito.mock(TransportLayer.class); MemoryPool pool = Mockito.mock(MemoryPool.class); + ChannelMetadataRegistry metadataRegistry = Mockito.mock(ChannelMetadataRegistry.class); ArgumentCaptor sizeCaptor = ArgumentCaptor.forClass(Integer.class); Mockito.when(pool.tryAllocate(sizeCaptor.capture())).thenAnswer(invocation -> { return ByteBuffer.allocate(sizeCaptor.getValue()); }); - KafkaChannel channel = new KafkaChannel("0", transport, () -> authenticator, 1024, pool); + KafkaChannel channel = new KafkaChannel("0", transport, () -> authenticator, + 1024, pool, metadataRegistry); ArgumentCaptor bufferCaptor = ArgumentCaptor.forClass(ByteBuffer.class); Mockito.when(transport.read(bufferCaptor.capture())).thenAnswer(invocation -> { diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index ab4e8de55da0d..f537a4ecd3135 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -371,7 +371,7 @@ public void registerFailure() throws Exception { ChannelBuilder channelBuilder = new PlaintextChannelBuilder(null) { @Override public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, - MemoryPool memoryPool) throws KafkaException { + MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException { 
throw new RuntimeException("Test exception"); } @Override @@ -682,7 +682,6 @@ public void testConnectDisconnectDuringInSinglePoll() throws Exception { when(kafkaChannel.finishConnect()).thenReturn(true); when(kafkaChannel.isConnected()).thenReturn(true); when(kafkaChannel.ready()).thenReturn(false); - when(kafkaChannel.clientInformation()).thenReturn(ClientInformation.EMPTY); doThrow(new IOException()).when(kafkaChannel).prepare(); SelectionKey selectionKey = mock(SelectionKey.class); @@ -764,10 +763,17 @@ public void testConnectionsByClientMetric() throws Exception { // Metric with unknown / unknown should be there selector.register(node, channel); - assertEquals(1, getMetric("connections", unknownNameAndVersion).metricValue()); + assertEquals(1, + getMetric("connections", unknownNameAndVersion).metricValue()); + assertEquals(ClientInformation.EMPTY, + selector.channel(node).channelMetadataRegistry().clientInformation()); // Metric with unknown / unknown should not be there, metric with A / B should be there - selector.channel(node).updateClientInformation(new ClientInformation("A", "B")); + ClientInformation clientInformation = new ClientInformation("A", "B"); + selector.channel(node).channelMetadataRegistry() + .registerClientInformation(clientInformation); + assertEquals(clientInformation, + selector.channel(node).channelMetadataRegistry().clientInformation()); assertEquals(0, getMetric("connections", unknownNameAndVersion).metricValue()); assertEquals(1, getMetric("connections", knownNameAndVersion).metricValue()); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java index 0172661fe7b5c..78174d9cac38c 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java @@ -129,6 +129,7 @@ public void testConnectionWithCustomKeyManager() throws 
Exception { TestUtils.waitForCondition(() -> cipherMetrics(metrics).size() == 1, "Waiting for cipher metrics to be created."); assertEquals(Integer.valueOf(1), cipherMetrics(metrics).get(0).metricValue()); + assertNotNull(selector.channel(node).channelMetadataRegistry().cipherInformation()); selector.close(node); super.verifySelectorEmpty(selector); @@ -368,10 +369,11 @@ public TestSslChannelBuilder(Mode mode) { } @Override - protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, String host) throws IOException { + protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, + String host, ChannelMetadataRegistry metadataRegistry) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); SSLEngine sslEngine = sslFactory.createSslEngine(host, socketChannel.socket().getPort()); - TestSslTransportLayer transportLayer = new TestSslTransportLayer(id, key, sslEngine); + TestSslTransportLayer transportLayer = new TestSslTransportLayer(id, key, sslEngine, metadataRegistry); return transportLayer; } @@ -383,8 +385,9 @@ static class TestSslTransportLayer extends SslTransportLayer { static Map transportLayers = new HashMap<>(); boolean muteSocket = false; - public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException { - super(channelId, key, sslEngine); + public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, + ChannelMetadataRegistry metadataRegistry) throws IOException { + super(channelId, key, sslEngine, metadataRegistry); transportLayers.put(channelId, this); } diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index d25fa6124681c..653b85264438c 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -1114,7 +1114,8 @@ public void configureBufferSizes(Integer netReadBufSize, Integer netWriteBufSize } @Override - protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, String host) throws IOException { + protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, + String host, ChannelMetadataRegistry metadataRegistry) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); SSLEngine sslEngine = sslFactory.createSslEngine(host, socketChannel.socket().getPort()); TestSslTransportLayer transportLayer = newTransportLayer(id, key, sslEngine); @@ -1145,7 +1146,7 @@ class TestSslTransportLayer extends SslTransportLayer { private final AtomicInteger numDelayedFlushesRemaining; public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException { - super(channelId, key, sslEngine); + super(channelId, key, sslEngine, new DefaultChannelMetadataRegistry()); this.netReadBufSize = new ResizeableBufferSize(netReadBufSizeOverride); this.netWriteBufSize = new ResizeableBufferSize(netWriteBufSizeOverride); this.appBufSize = new ResizeableBufferSize(appBufSizeOverride); diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index 6d95d2e91d857..0c4122c76925e 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -62,6 +62,7 @@ import org.apache.kafka.common.network.CertStores; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.ChannelBuilders; +import 
org.apache.kafka.common.network.ChannelMetadataRegistry; import org.apache.kafka.common.network.ChannelState; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.Mode; @@ -1747,8 +1748,10 @@ protected SaslServerAuthenticator buildServerAuthenticator(Map config String id, TransportLayer transportLayer, Map subjects, - Map connectionsMaxReauthMsByMechanism) { - return new SaslServerAuthenticator(configs, callbackHandlers, id, subjects, null, listenerName, securityProtocol, transportLayer, connectionsMaxReauthMsByMechanism, time) { + Map connectionsMaxReauthMsByMechanism, + ChannelMetadataRegistry metadataRegistry) { + return new SaslServerAuthenticator(configs, callbackHandlers, id, subjects, null, listenerName, + securityProtocol, transportLayer, connectionsMaxReauthMsByMechanism, metadataRegistry, time) { @Override protected ApiVersionsResponse apiVersionsResponse() { diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java index 66482eab206a2..fb4cdd996e475 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java @@ -19,7 +19,9 @@ import java.net.InetAddress; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.errors.IllegalSaslStateException; +import org.apache.kafka.common.network.ChannelMetadataRegistry; import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.DefaultChannelMetadataRegistry; import org.apache.kafka.common.network.InvalidReceiveException; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.TransportLayer; @@ -59,7 +61,8 @@ public void testOversizeRequest() 
throws IOException { TransportLayer transportLayer = mock(TransportLayer.class); Map configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Collections.singletonList(SCRAM_SHA_256.mechanismName())); - SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName()); + SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, + SCRAM_SHA_256.mechanismName(), new DefaultChannelMetadataRegistry()); when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> { invocation.getArgument(0).putInt(SaslServerAuthenticator.MAX_RECEIVE_SIZE + 1); @@ -74,7 +77,8 @@ public void testUnexpectedRequestType() throws IOException { TransportLayer transportLayer = mock(TransportLayer.class); Map configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Collections.singletonList(SCRAM_SHA_256.mechanismName())); - SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName()); + SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, + SCRAM_SHA_256.mechanismName(), new DefaultChannelMetadataRegistry()); final RequestHeader header = new RequestHeader(ApiKeys.METADATA, (short) 0, "clientId", 13243); final Struct headerStruct = header.toStruct(); @@ -104,7 +108,9 @@ public void testOldestApiVersionsRequest() throws IOException { TransportLayer transportLayer = mock(TransportLayer.class, Answers.RETURNS_DEEP_STUBS); Map configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Collections.singletonList(SCRAM_SHA_256.mechanismName())); - SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName()); + ChannelMetadataRegistry metadataRegistry = new DefaultChannelMetadataRegistry(); + SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, + 
SCRAM_SHA_256.mechanismName(), metadataRegistry); final RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, version, "clientId", 0); final Struct headerStruct = header.toStruct(); @@ -125,8 +131,10 @@ public void testOldestApiVersionsRequest() throws IOException { authenticator.authenticate(); - assertEquals(ClientInformation.UNKNOWN_NAME_OR_VERSION, authenticator.clientInformation().get().softwareName()); - assertEquals(ClientInformation.UNKNOWN_NAME_OR_VERSION, authenticator.clientInformation().get().softwareVersion()); + assertEquals(ClientInformation.UNKNOWN_NAME_OR_VERSION, + metadataRegistry.clientInformation().softwareName()); + assertEquals(ClientInformation.UNKNOWN_NAME_OR_VERSION, + metadataRegistry.clientInformation().softwareVersion()); verify(transportLayer, times(2)).read(any(ByteBuffer.class)); } @@ -137,7 +145,9 @@ public void testLatestApiVersionsRequest() throws IOException { TransportLayer transportLayer = mock(TransportLayer.class, Answers.RETURNS_DEEP_STUBS); Map configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Collections.singletonList(SCRAM_SHA_256.mechanismName())); - SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName()); + ChannelMetadataRegistry metadataRegistry = new DefaultChannelMetadataRegistry(); + SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, + SCRAM_SHA_256.mechanismName(), metadataRegistry); final RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, version, "clientId", 0); final Struct headerStruct = header.toStruct(); @@ -158,13 +168,16 @@ public void testLatestApiVersionsRequest() throws IOException { authenticator.authenticate(); - assertEquals("apache-kafka-java", authenticator.clientInformation().get().softwareName()); - assertEquals(AppInfoParser.getVersion(), authenticator.clientInformation().get().softwareVersion()); + assertEquals("apache-kafka-java", + 
metadataRegistry.clientInformation().softwareName()); + assertEquals(AppInfoParser.getVersion(), + metadataRegistry.clientInformation().softwareVersion()); verify(transportLayer, times(2)).read(any(ByteBuffer.class)); } - private SaslServerAuthenticator setupAuthenticator(Map configs, TransportLayer transportLayer, String mechanism) throws IOException { + private SaslServerAuthenticator setupAuthenticator(Map configs, TransportLayer transportLayer, + String mechanism, ChannelMetadataRegistry metadataRegistry) throws IOException { TestJaasConfig jaasConfig = new TestJaasConfig(); jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap()); Map jaasContexts = Collections.singletonMap(mechanism, @@ -173,7 +186,8 @@ private SaslServerAuthenticator setupAuthenticator(Map configs, Trans Map callbackHandlers = Collections.singletonMap( mechanism, new SaslServerCallbackHandler()); return new SaslServerAuthenticator(configs, callbackHandlers, "node", subjects, null, - new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer, Collections.emptyMap(), Time.SYSTEM); + new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer, Collections.emptyMap(), + metadataRegistry, Time.SYSTEM); } } diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index b16fc691a96e0..ff754e329574e 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -917,13 +917,13 @@ private[kafka] class Processor(val id: Int, val connectionId = receive.source val context = new RequestContext(header, connectionId, channel.socketAddress, channel.principal, listenerName, securityProtocol, - channel.clientInformation) + channel.channelMetadataRegistry.clientInformation) val req = new RequestChannel.Request(processor = id, context = context, startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics) if (header.apiKey 
== ApiKeys.API_VERSIONS) { val apiVersionsRequest = req.body[ApiVersionsRequest] if (apiVersionsRequest.isValid) { - channel.updateClientInformation(new ClientInformation( + channel.channelMetadataRegistry.registerClientInformation(new ClientInformation( apiVersionsRequest.data.clientSoftwareName, apiVersionsRequest.data.clientSoftwareVersion)) } From 2570a692458b10f66c87bc354b46e5d9b50c6456 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 12 Dec 2019 14:03:26 +0100 Subject: [PATCH 3/4] fixup --- .../apache/kafka/common/network/Selector.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index cc9ddd09889e0..85668904b02dd 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -313,6 +313,9 @@ public void register(String id, SocketChannel socketChannel) throws IOException ensureNotRegistered(id); registerChannel(id, socketChannel, SelectionKey.OP_READ); this.sensors.connectionCreated.record(); + // Default to empty client information as the ApiVersionsRequest is not + // mandatory. In this case, we still want to account for the connection. + this.channel(id).channelMetadataRegistry().registerClientInformation(ClientInformation.EMPTY); } private void ensureNotRegistered(String id) { @@ -1068,13 +1071,6 @@ class SelectorChannelMetadataRegistry implements ChannelMetadataRegistry { private CipherInformation cipherInformation; private ClientInformation clientInformation; - SelectorChannelMetadataRegistry() { - // Default to empty client information as the ApiVersionsRequest is not - // mandatory or could not have the information yet if an older version - // is used. In this case, we still want to account for the connection. 
- registerClientInformation(ClientInformation.EMPTY); - } - @Override public void registerCipherInformation(final CipherInformation cipherInformation) { if (this.cipherInformation == null) { @@ -1110,8 +1106,10 @@ public void close() { this.cipherInformation = null; } - sensors.connectionsByClient.decrement(this.clientInformation); - this.clientInformation = null; + if (this.clientInformation != null) { + sensors.connectionsByClient.decrement(this.clientInformation); + this.clientInformation = null; + } } } From 1f4dd576d146b8eb3ca2d543f3f537c506c591b7 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Fri, 13 Dec 2019 12:07:52 +0100 Subject: [PATCH 4/4] Address Colin's comments --- .../network/ChannelMetadataRegistry.java | 2 +- .../apache/kafka/common/network/Selector.java | 16 +++++-- .../kafka/common/network/SelectorTest.java | 3 +- .../SaslServerAuthenticatorTest.java | 48 ++++--------------- .../scala/kafka/network/RequestChannel.scala | 3 +- .../scala/kafka/network/SocketServer.scala | 2 + 6 files changed, 28 insertions(+), 46 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelMetadataRegistry.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelMetadataRegistry.java index b49d6b64d0562..a3453d881c986 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelMetadataRegistry.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelMetadataRegistry.java @@ -26,7 +26,7 @@ public interface ChannelMetadataRegistry extends Closeable { /** * Register information about the SSL cipher we are using. - * If we already registered this information, this call will be ignored. + * Re-registering the information will overwrite the previous one. 
*/ void registerCipherInformation(CipherInformation cipherInformation); diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 85668904b02dd..bd4f31ada5079 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -315,7 +315,9 @@ public void register(String id, SocketChannel socketChannel) throws IOException this.sensors.connectionCreated.record(); // Default to empty client information as the ApiVersionsRequest is not // mandatory. In this case, we still want to account for the connection. - this.channel(id).channelMetadataRegistry().registerClientInformation(ClientInformation.EMPTY); + ChannelMetadataRegistry metadataRegistry = this.channel(id).channelMetadataRegistry(); + if (metadataRegistry.clientInformation() == null) + metadataRegistry.registerClientInformation(ClientInformation.EMPTY); } private void ensureNotRegistered(String id) { @@ -1073,10 +1075,14 @@ class SelectorChannelMetadataRegistry implements ChannelMetadataRegistry { @Override public void registerCipherInformation(final CipherInformation cipherInformation) { - if (this.cipherInformation == null) { - this.cipherInformation = cipherInformation; - sensors.connectionsByCipher.increment(cipherInformation); + if (this.cipherInformation != null) { + if (this.cipherInformation.equals(cipherInformation)) + return; + sensors.connectionsByCipher.decrement(this.cipherInformation); } + + this.cipherInformation = cipherInformation; + sensors.connectionsByCipher.increment(cipherInformation); } @Override @@ -1087,6 +1093,8 @@ public CipherInformation cipherInformation() { @Override public void registerClientInformation(final ClientInformation clientInformation) { if (this.clientInformation != null) { + if (this.clientInformation.equals(clientInformation)) + return; 
sensors.connectionsByClient.decrement(this.clientInformation); } diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index f537a4ecd3135..70eb588299d06 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -749,7 +749,8 @@ public void testInboundConnectionsCountInConnectionCreationMetric() throws Excep @Test public void testConnectionsByClientMetric() throws Exception { String node = "0"; - Map unknownNameAndVersion = softwareNameAndVersionTags("unknown", "unknown"); + Map unknownNameAndVersion = softwareNameAndVersionTags( + ClientInformation.UNKNOWN_NAME_OR_VERSION, ClientInformation.UNKNOWN_NAME_OR_VERSION); Map knownNameAndVersion = softwareNameAndVersionTags("A", "B"); try (ServerSocketChannel ss = ServerSocketChannel.open()) { diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java index fb4cdd996e475..2778d73e3e431 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java @@ -104,44 +104,18 @@ public void testUnexpectedRequestType() throws IOException { @Test public void testOldestApiVersionsRequest() throws IOException { - final short version = ApiKeys.API_VERSIONS.oldestVersion(); - TransportLayer transportLayer = mock(TransportLayer.class, Answers.RETURNS_DEEP_STUBS); - Map configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, - Collections.singletonList(SCRAM_SHA_256.mechanismName())); - ChannelMetadataRegistry metadataRegistry = new DefaultChannelMetadataRegistry(); - 
SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, - SCRAM_SHA_256.mechanismName(), metadataRegistry); - - final RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, version, "clientId", 0); - final Struct headerStruct = header.toStruct(); - - final ApiVersionsRequest request = new ApiVersionsRequest.Builder().build(version); - final Struct requestStruct = request.data.toStruct(version); - - when(transportLayer.socketChannel().socket().getInetAddress()).thenReturn(InetAddress.getLoopbackAddress()); - - when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> { - invocation.getArgument(0).putInt(headerStruct.sizeOf() + requestStruct.sizeOf()); - return 4; - }).then(invocation -> { - headerStruct.writeTo(invocation.getArgument(0)); - requestStruct.writeTo(invocation.getArgument(0)); - return headerStruct.sizeOf() + requestStruct.sizeOf(); - }); - - authenticator.authenticate(); - - assertEquals(ClientInformation.UNKNOWN_NAME_OR_VERSION, - metadataRegistry.clientInformation().softwareName()); - assertEquals(ClientInformation.UNKNOWN_NAME_OR_VERSION, - metadataRegistry.clientInformation().softwareVersion()); - - verify(transportLayer, times(2)).read(any(ByteBuffer.class)); + testApiVersionsRequest(ApiKeys.API_VERSIONS.oldestVersion(), + ClientInformation.UNKNOWN_NAME_OR_VERSION, ClientInformation.UNKNOWN_NAME_OR_VERSION); } @Test public void testLatestApiVersionsRequest() throws IOException { - final short version = ApiKeys.API_VERSIONS.latestVersion(); + testApiVersionsRequest(ApiKeys.API_VERSIONS.latestVersion(), + "apache-kafka-java", AppInfoParser.getVersion()); + } + + public void testApiVersionsRequest(short version, String expectedSoftwareName, + String expectedSoftwareVersion) throws IOException { TransportLayer transportLayer = mock(TransportLayer.class, Answers.RETURNS_DEEP_STUBS); Map configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, 
Collections.singletonList(SCRAM_SHA_256.mechanismName())); @@ -168,10 +142,8 @@ public void testLatestApiVersionsRequest() throws IOException { authenticator.authenticate(); - assertEquals("apache-kafka-java", - metadataRegistry.clientInformation().softwareName()); - assertEquals(AppInfoParser.getVersion(), - metadataRegistry.clientInformation().softwareVersion()); + assertEquals(expectedSoftwareName, metadataRegistry.clientInformation().softwareName()); + assertEquals(expectedSoftwareVersion, metadataRegistry.clientInformation().softwareVersion()); verify(transportLayer, times(2)).read(any(ByteBuffer.class)); } diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 5a5314cfebe21..08db320b563f4 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -197,8 +197,7 @@ object RequestChannel extends Logging { .append(",securityProtocol:").append(context.securityProtocol) .append(",principal:").append(session.principal) .append(",listener:").append(context.listenerName.value) - .append(",clientSoftwareName:").append(context.clientInformation.softwareName) - .append(",clientSoftwareVersion:").append(context.clientInformation.softwareVersion) + .append(",clientInformation:").append(context.clientInformation) if (temporaryMemoryBytes > 0) builder.append(",temporaryMemoryBytes:").append(temporaryMemoryBytes) if (messageConversionsTimeMs > 0) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index ff754e329574e..95e4450b87259 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -920,6 +920,8 @@ private[kafka] class Processor(val id: Int, channel.channelMetadataRegistry.clientInformation) val req = new RequestChannel.Request(processor = id, context = context, startTimeNanos = 
nowNanos, memoryPool, receive.payload, requestChannel.metrics) + // KIP-511: ApiVersionsRequest is intercepted here to catch the client software name + // and version. It is done here to avoid wiring things up to the api layer. if (header.apiKey == ApiKeys.API_VERSIONS) { val apiVersionsRequest = req.body[ApiVersionsRequest] if (apiVersionsRequest.isValid) {