Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public interface ChannelBuilder extends AutoCloseable, Configurable {
* @param memoryPool memory pool from which to allocate buffers, or null for none
* @return KafkaChannel
*/
KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException;
KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize,
MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException;

/**
* Closes ChannelBuilder
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.network;

import java.io.Closeable;

/**
* Metadata about a channel is provided in various places in the network stack. This
* registry is used as a common place to collect them.
*/
public interface ChannelMetadataRegistry extends Closeable {

/**
* Register information about the SSL cipher we are using.
* Re-registering the information will overwrite the previous one.
*/
void registerCipherInformation(CipherInformation cipherInformation);

/**
* Get the currently registered cipher information.
*/
CipherInformation cipherInformation();

/**
* Register information about the client client we are using.
* Depending on the clients, the ApiVersionsRequest could be received
* multiple times or not at all. Re-registering the information will
* overwrite the previous one.
*/
void registerClientInformation(ClientInformation clientInformation);

/**
* Get the currently registered client information.
*/
ClientInformation clientInformation();

/**
* Unregister everything that has been registered and close the registry.
*/
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ public class CipherInformation {
private final String protocol;

public CipherInformation(String cipher, String protocol) {
this.cipher = cipher;
this.protocol = protocol;
this.cipher = cipher == null || cipher.isEmpty() ? "unknown" : cipher;
this.protocol = protocol == null || protocol.isEmpty() ? "unknown" : protocol;
}

public String cipher() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.network;

import java.util.Objects;

public class ClientInformation {
public static final String UNKNOWN_NAME_OR_VERSION = "unknown";
public static final ClientInformation EMPTY = new ClientInformation(UNKNOWN_NAME_OR_VERSION, UNKNOWN_NAME_OR_VERSION);

private final String softwareName;
private final String softwareVersion;

public ClientInformation(String softwareName, String softwareVersion) {
this.softwareName = softwareName.isEmpty() ? UNKNOWN_NAME_OR_VERSION : softwareName;
this.softwareVersion = softwareVersion.isEmpty() ? UNKNOWN_NAME_OR_VERSION : softwareVersion;
}

public String softwareName() {
return this.softwareName;
}

public String softwareVersion() {
return this.softwareVersion;
}

@Override
public String toString() {
return "ClientInformation(softwareName=" + softwareName +
", softwareVersion=" + softwareVersion + ")";
}

@Override
public int hashCode() {
return Objects.hash(softwareName, softwareVersion);
}

@Override
public boolean equals(Object o) {
if (o == null) {
return false;
}
if (!(o instanceof ClientInformation)) {
return false;
}
ClientInformation other = (ClientInformation) o;
return other.softwareName.equals(softwareName) &&
other.softwareVersion.equals(softwareVersion);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.network;

public class DefaultChannelMetadataRegistry implements ChannelMetadataRegistry {
private CipherInformation cipherInformation;
private ClientInformation clientInformation;

@Override
public void registerCipherInformation(final CipherInformation cipherInformation) {
if (this.cipherInformation != null) {
this.cipherInformation = cipherInformation;
}
}

@Override
public CipherInformation cipherInformation() {
return this.cipherInformation;
}

@Override
public void registerClientInformation(final ClientInformation clientInformation) {
this.clientInformation = clientInformation;
}

@Override
public ClientInformation clientInformation() {
return this.clientInformation;
}

@Override
public void close() {
this.cipherInformation = null;
this.clientInformation = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -122,6 +121,7 @@ public enum ChannelMuteEvent {
private long networkThreadTimeNanos;
private final int maxReceiveSize;
private final MemoryPool memoryPool;
private final ChannelMetadataRegistry metadataRegistry;
private NetworkReceive receive;
private Send send;
// Track connection and mute state of channels to enable outstanding requests on channels to be
Expand All @@ -134,22 +134,24 @@ public enum ChannelMuteEvent {
private boolean midWrite;
private long lastReauthenticationStartNanos;

public KafkaChannel(String id, TransportLayer transportLayer, Supplier<Authenticator> authenticatorCreator, int maxReceiveSize, MemoryPool memoryPool) {
public KafkaChannel(String id, TransportLayer transportLayer, Supplier<Authenticator> authenticatorCreator,
int maxReceiveSize, MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) {
this.id = id;
this.transportLayer = transportLayer;
this.authenticatorCreator = authenticatorCreator;
this.authenticator = authenticatorCreator.get();
this.networkThreadTimeNanos = 0L;
this.maxReceiveSize = maxReceiveSize;
this.memoryPool = memoryPool;
this.metadataRegistry = metadataRegistry;
this.disconnected = false;
this.muteState = ChannelMuteState.NOT_MUTED;
this.state = ChannelState.NOT_CONNECTED;
}

public void close() throws IOException {
this.disconnected = true;
Utils.closeAll(transportLayer, authenticator, receive);
Utils.closeAll(transportLayer, authenticator, receive, metadataRegistry);
}

/**
Expand Down Expand Up @@ -653,7 +655,7 @@ private void swapAuthenticatorsAndBeginReauthentication(ReauthenticationContext
authenticator.reauthenticate(reauthenticationContext);
}

public Optional<CipherInformation> cipherInformation() {
return transportLayer.cipherInformation();
public ChannelMetadataRegistry channelMetadataRegistry() {
return metadataRegistry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ public void configure(Map<String, ?> configs) throws KafkaException {
}

@Override
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize,
MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException {
try {
PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
Supplier<Authenticator> authenticatorCreator = () -> new PlaintextAuthenticator(configs, transportLayer, listenerName);
return new KafkaChannel(id, transportLayer, authenticatorCreator, maxReceiveSize,
memoryPool != null ? memoryPool : MemoryPool.NONE);
memoryPool != null ? memoryPool : MemoryPool.NONE, metadataRegistry);
} catch (Exception e) {
log.warn("Failed to create channel due to ", e);
throw new KafkaException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.nio.channels.SelectionKey;

import java.security.Principal;
import java.util.Optional;

import org.apache.kafka.common.security.auth.KafkaPrincipal;

Expand Down Expand Up @@ -215,9 +214,4 @@ public boolean hasBytesBuffered() {
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
return fileChannel.transferTo(position, count, socketChannel);
}

@Override
public Optional<CipherInformation> cipherInformation() {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,19 +182,21 @@ public ListenerName listenerName() {
}

@Override
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize,
MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException {
try {
SocketChannel socketChannel = (SocketChannel) key.channel();
Socket socket = socketChannel.socket();
TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel, metadataRegistry);
Supplier<Authenticator> authenticatorCreator;
if (mode == Mode.SERVER) {
authenticatorCreator = () -> buildServerAuthenticator(configs,
Collections.unmodifiableMap(saslCallbackHandlers),
id,
transportLayer,
Collections.unmodifiableMap(subjects),
Collections.unmodifiableMap(connectionsMaxReauthMsByMechanism));
Collections.unmodifiableMap(connectionsMaxReauthMsByMechanism),
metadataRegistry);
} else {
LoginManager loginManager = loginManagers.get(clientSaslMechanism);
authenticatorCreator = () -> buildClientAuthenticator(configs,
Expand All @@ -205,7 +207,8 @@ public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize
transportLayer,
subjects.get(clientSaslMechanism));
}
return new KafkaChannel(id, transportLayer, authenticatorCreator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
return new KafkaChannel(id, transportLayer, authenticatorCreator, maxReceiveSize,
memoryPool != null ? memoryPool : MemoryPool.NONE, metadataRegistry);
} catch (Exception e) {
log.info("Failed to create channel due to ", e);
throw new KafkaException(e);
Expand All @@ -222,10 +225,13 @@ public void close() {
}

// Visible to override for testing
protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException {
protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel,
ChannelMetadataRegistry metadataRegistry) throws IOException {
if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
return SslTransportLayer.create(id, key,
sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(), socketChannel.socket().getPort()));
sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
socketChannel.socket().getPort()),
metadataRegistry);
} else {
return new PlaintextTransportLayer(key);
}
Expand All @@ -237,9 +243,11 @@ protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> config
String id,
TransportLayer transportLayer,
Map<String, Subject> subjects,
Map<String, Long> connectionsMaxReauthMsByMechanism) {
Map<String, Long> connectionsMaxReauthMsByMechanism,
ChannelMetadataRegistry metadataRegistry) {
return new SaslServerAuthenticator(configs, callbackHandlers, id, subjects,
kerberosShortNamer, listenerName, securityProtocol, transportLayer, connectionsMaxReauthMsByMechanism, time);
kerberosShortNamer, listenerName, securityProtocol, transportLayer,
connectionsMaxReauthMsByMechanism, metadataRegistry, time);
}

// Visible to override for testing
Expand Down
Loading