Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,13 @@ public void configure(Map<String, ?> configs) throws KafkaException {
hasKerberos = clientSaslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM);
}

String defaultRealm;
try {
defaultRealm = JaasUtils.defaultRealm();
} catch (Exception ke) {
defaultRealm = "";
}

if (hasKerberos) {
String defaultRealm;
try {
defaultRealm = JaasUtils.defaultKerberosRealm();
} catch (Exception ke) {
defaultRealm = "";
}
@SuppressWarnings("unchecked")
List<String> principalToLocalRules = (List<String>) configs.get(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES);
if (principalToLocalRules != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static String jaasConfig(String loginContextName, String key) throws IOEx
return null;
}

public static String defaultRealm()
public static String defaultKerberosRealm()
throws ClassNotFoundException, NoSuchMethodException,
IllegalArgumentException, IllegalAccessException,
InvocationTargetException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.network;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.test.TestSslUtils;

public class CertStores {

private final Map<String, Object> sslConfig;

public CertStores(boolean server, String host) throws Exception {
String name = server ? "server" : "client";
Mode mode = server ? Mode.SERVER : Mode.CLIENT;
File truststoreFile = File.createTempFile(name + "TS", ".jks");
sslConfig = TestSslUtils.createSslConfig(!server, true, mode, truststoreFile, name, host);
if (server)
sslConfig.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
}

public Map<String, Object> getTrustingConfig(CertStores truststoreConfig) {
Map<String, Object> config = new HashMap<>(sslConfig);
config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
config.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
return config;
}

public Map<String, Object> getUntrustingConfig() {
return sslConfig;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.network;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;

/**
* Common utility functions used by transport layer and authenticator tests.
*/
public class NetworkTestUtils {

public static NioEchoServer createEchoServer(SecurityProtocol securityProtocol, Map<String, Object> serverConfigs) throws Exception {
NioEchoServer server = new NioEchoServer(securityProtocol, serverConfigs, "localhost");
server.start();
return server;
}

public static Selector createSelector(ChannelBuilder channelBuilder) {
return new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
}

public static void checkClientConnection(Selector selector, String node, int minMessageSize, int messageCount) throws Exception {

String prefix = TestUtils.randomString(minMessageSize);
int requests = 0;
int responses = 0;
// wait for handshake to finish
while (!selector.isChannelReady(node)) {
selector.poll(1000L);
}
selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-0").getBytes())));
requests++;
while (responses < messageCount) {
selector.poll(0L);
assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());

for (NetworkReceive receive : selector.completedReceives()) {
assertEquals(prefix + "-" + responses, new String(Utils.toArray(receive.payload())));
responses++;
}

for (int i = 0; i < selector.completedSends().size() && requests < messageCount && selector.isChannelReady(node); i++, requests++) {
selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-" + requests).getBytes())));
}
}
}

public static void waitForChannelClose(Selector selector, String node) throws IOException {
boolean closed = false;
for (int i = 0; i < 30; i++) {
selector.poll(1000L);
if (selector.channel(node) == null) {
closed = true;
break;
}
}
assertTrue(closed);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.network;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;

/**
* Non-blocking EchoServer implementation that uses ChannelBuilder to create channels
* with the configured security protocol.
*
*/
public class NioEchoServer extends Thread {
private final int port;
private final ServerSocketChannel serverSocketChannel;
private final List<SocketChannel> newChannels;
private final List<SocketChannel> socketChannels;
private final AcceptorThread acceptorThread;
private final Selector selector;
private final ConcurrentLinkedQueue<NetworkSend> inflightSends = new ConcurrentLinkedQueue<NetworkSend>();

public NioEchoServer(SecurityProtocol securityProtocol, Map<String, ?> configs, String serverHost) throws Exception {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(serverHost, 0));
this.port = serverSocketChannel.socket().getLocalPort();
this.socketChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
ChannelBuilder channelBuilder = ChannelBuilders.create(securityProtocol, Mode.SERVER, LoginType.SERVER, configs, null, true);
this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
setName("echoserver");
setDaemon(true);
acceptorThread = new AcceptorThread();
}

public int port() {
return port;
}

@Override
public void run() {
try {
acceptorThread.start();
while (serverSocketChannel.isOpen()) {
selector.poll(1000);
for (SocketChannel socketChannel : newChannels) {
String id = id(socketChannel);
selector.register(id, socketChannel);
socketChannels.add(socketChannel);
}
newChannels.clear();
while (true) {
NetworkSend send = inflightSends.peek();
if (send != null && !selector.channel(send.destination()).hasSend()) {
send = inflightSends.poll();
selector.send(send);
} else
break;
}
List<NetworkReceive> completedReceives = selector.completedReceives();
for (NetworkReceive rcv : completedReceives) {
NetworkSend send = new NetworkSend(rcv.source(), rcv.payload());
if (!selector.channel(send.destination()).hasSend())
selector.send(send);
else
inflightSends.add(send);
}
}
} catch (IOException e) {
// ignore
}
}

private String id(SocketChannel channel) {
return channel.socket().getLocalAddress().getHostAddress() + ":" + channel.socket().getLocalPort() + "-" +
channel.socket().getInetAddress().getHostAddress() + ":" + channel.socket().getPort();
}

public void closeConnections() throws IOException {
for (SocketChannel channel : socketChannels)
channel.close();
socketChannels.clear();
}

public void close() throws IOException, InterruptedException {
this.serverSocketChannel.close();
closeConnections();
acceptorThread.interrupt();
acceptorThread.join();
interrupt();
join();
}

private class AcceptorThread extends Thread {
public AcceptorThread() throws IOException {
setName("acceptor");
}
public void run() {
try {
java.nio.channels.Selector acceptSelector = java.nio.channels.Selector.open();
serverSocketChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
while (serverSocketChannel.isOpen()) {
if (acceptSelector.select(1000) > 0) {
Iterator<SelectionKey> it = acceptSelector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
if (key.isAcceptable()) {
SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
socketChannel.configureBlocking(false);
newChannels.add(socketChannel);
selector.wakeup();
}
it.remove();
}
}
}
} catch (IOException e) {
// ignore
}
}
}
}
Loading