Closed

63 commits
82737e5
KAFKA-1686. Implement SASL/Kerberos.
Aug 31, 2015
a3417d7
KAFKA-1686. Implement SASL/Kerberos.
Sep 3, 2015
8f718ce
KAFKA-1686. Implement SASL/Kerberos.
Sep 3, 2015
aa92895
Added license.
Sep 3, 2015
f178107
KAFKA-1686. Implement SASL/Kerberos.
Sep 3, 2015
71b6fdb
Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1…
Oct 3, 2015
9d260c6
KAFKA-1686. Fixes after the merge.
Oct 4, 2015
5723dd2
KAFKA-1686. Addressing comments.
Oct 9, 2015
8cf30d0
Merge remote-tracking branch 'apache/trunk' into KAFKA-1686-V1
ijuma Oct 9, 2015
2596c4a
Remove unused code, fix formatting and minor javadoc tweaks
ijuma Oct 9, 2015
2919bc3
Fix bad merge in `TestUtils`
ijuma Oct 9, 2015
9ed1a26
Remove -XX:-MaxFDLimit from `gradle.properties`
ijuma Oct 9, 2015
2d2fcec
Support `SSLSASL` in `ChannelBuilders`, reduce duplication in `TestUt…
ijuma Oct 9, 2015
6a13667
Merge pull request #1 from ijuma/KAFKA-1686-V1
harshach Oct 9, 2015
32ab6f4
KAFKA-1686. Added SaslConsumerTest, fixed a bug in SecurityProtocol.
Oct 9, 2015
58064b4
KAFKA-1686. removing unnecessary logs.
Oct 9, 2015
dc05e07
Merge remote-tracking branch 'apache/trunk' into KAFKA-1686-V1
ijuma Oct 11, 2015
9e6ba51
A number of code clean-ups
ijuma Oct 12, 2015
fc40c98
Return non-anonymous `KafkaPrincipal` in `SaslClientAuthenticator.pri…
ijuma Oct 12, 2015
e80cad9
Merge pull request #2 from ijuma/KAFKA-1686-V1
harshach Oct 13, 2015
1d53bce
KAFKA-1686. Added default sasl configs , addressed reviews.
Oct 13, 2015
e637120
Simplify `Shell` by removing unused functionality and other clean-ups.
ijuma Oct 13, 2015
a3bd8d2
Config clean-ups
ijuma Oct 13, 2015
6dea484
Tweak logging and make fields final in `Login`
ijuma Oct 13, 2015
d5768c8
Minor simplification of `SaslClientAuthenticator.complete` as per Jun…
ijuma Oct 13, 2015
37980d7
Tweak assignment of `Login.lastLogin`
ijuma Oct 13, 2015
ae430be
Remove `currentWallTime()` and `currentElapsedTime()` from `Time` for…
ijuma Oct 13, 2015
190fe86
Rename SSLSASL to SASL_SSL and PLAINTEXTSASL to SASL_PLAIN
ijuma Oct 13, 2015
06353e4
Merge remote-tracking branch 'apache/trunk' into KAFKA-1686-V1
ijuma Oct 13, 2015
ba29a43
Call `removeInterestOps` when we complete.
ijuma Oct 13, 2015
592c52c
Remove methods from copycat.util.MockTime
ijuma Oct 13, 2015
2fc9708
Rename SASL_PLAIN to SASL_PLAINTEXT
ijuma Oct 13, 2015
da60654
Various improvements and fixes in SaslServerAuthenticator
ijuma Oct 14, 2015
70d34b3
Merge pull request #3 from ijuma/KAFKA-1686-V1
harshach Oct 14, 2015
03f6c08
Avoid string concatenation in `Login` logging statements
ijuma Oct 15, 2015
cd26542
Fix `LoginManager` to support multiple `Mode`s simultaneously and fix…
ijuma Oct 15, 2015
1885fdb
Clean-up logging in `SaslServerCallbackHandler`
ijuma Oct 15, 2015
3f26fd3
Remove `Configuration.setConfiguration` call
ijuma Oct 15, 2015
f12eefa
Remove interestOps after data is sent in `Sasl*Authenticator`
ijuma Oct 15, 2015
b889f5d
Clean-up logging in `SaslClientAuthenticator` and minor code clean-up
ijuma Oct 15, 2015
6015ab6
Merge pull request #4 from ijuma/KAFKA-1686-V1
harshach Oct 16, 2015
b121710
KAFKA-1686. Renamed SaslConsumerTest to SaslProducerTest.
Oct 16, 2015
538b6eb
KAFKA-1686. added verify to SaslIntegrationTest.
Oct 16, 2015
80949b4
Introduce `LoginType` to use the right JAAS section in broker
ijuma Oct 16, 2015
7ce068c
Remove unused `KerberosName.getDefaultRealm()` and add TODO to `JaasU…
ijuma Oct 16, 2015
0a1678e
Refactor `KerberosName` in order to make authToLocal rules configurable
ijuma Oct 16, 2015
a7cb82c
Tweak `KafkaChannel.prepare` to attempt authentication after handshak…
ijuma Oct 16, 2015
e94fe3e
Fix issue where `SaslChannelBuilder` was not calling `SSLTransportLay…
ijuma Oct 16, 2015
1152125
Introduce inter-broker SASL tests
ijuma Oct 16, 2015
86559f2
Merge pull request #5 from ijuma/KAFKA-1686-V1
harshach Oct 16, 2015
73614aa
Merge remote-tracking branch 'apache/trunk' into KAFKA-1686-V1
ijuma Oct 18, 2015
cffc5e8
Refactor `ProducerSendTest` to support multiple security protocols an…
ijuma Oct 16, 2015
699da1a
Refactor `ConsumerTest` to make it easy to test various security prot…
ijuma Oct 19, 2015
47fee85
Fix issue where interestOps was not being turned off when it should
ijuma Oct 19, 2015
a2a7b88
Document `authenticate` in `SaslClientAuthenticator` and `SaslServerA…
ijuma Oct 19, 2015
cdb8b86
Make it possible to configure serviceName via KafkaConfig
ijuma Oct 19, 2015
9f911dd
Merge remote-tracking branch 'apache/trunk' into KAFKA-1686-V1
ijuma Oct 19, 2015
15cf778
Rename `SSLConsumerTest` to `SslConsumerTest`
ijuma Oct 19, 2015
b6f408a
Document why we exclude `api-ldap-schema-data`
ijuma Oct 19, 2015
356f2f7
Merge remote-tracking branch 'apache/trunk' into KAFKA-1686-V1
ijuma Oct 20, 2015
b74a5d2
Force reload of JAAS configuration file if we can't find our entry
ijuma Oct 20, 2015
9464c7a
Remove `Sasl*ProducerTest` to keep build times down
ijuma Oct 20, 2015
5f3009f
Merge pull request #6 from ijuma/KAFKA-1686-V1
harshach Oct 21, 2015
5 changes: 4 additions & 1 deletion build.gradle
@@ -268,8 +268,8 @@ project(':core') {
testCompile "org.scalatest:scalatest_$baseScalaVersion:2.2.5"
testCompile project(':clients')
testCompile project(':clients').sourceSets.test.output
testCompile 'org.apache.hadoop:hadoop-minikdc:2.7.1'
testRuntime "$slf4jlog4j"

zinc 'com.typesafe.zinc:zinc:0.3.7'
}

@@ -282,6 +282,9 @@ project(':core') {
compile.exclude module: 'jmxtools'
compile.exclude module: 'mail'
compile.exclude module: 'netty'
// To prevent a UniqueResourceException due to the same resource existing in both
// org.apache.directory.api/api-all and org.apache.directory.api/api-ldap-schema-data
testCompile.exclude module: 'api-ldap-schema-data'
Contributor:
Could you add a comment on why we need to exclude this?

Member:
I added a comment explaining this in my latest PR.

}

tasks.create(name: "copyDependantLibs", type: Copy) {
6 changes: 4 additions & 2 deletions checkstyle/import-control.xml
@@ -29,15 +29,17 @@
<allow pkg="org.junit" />
<allow pkg="org.easymock" />
<allow pkg="org.powermock" />

<allow pkg="java.security" />
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security.auth" />
<allow pkg="javax.security" />
<allow pkg="org.ietf.jgss" />

<!-- no one depends on the server -->
<disallow pkg="kafka" />

<!-- anyone can use public classes -->
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.security" />
<allow pkg="org.apache.kafka.common.utils" />

<subpackage name="common">
@@ -20,9 +20,10 @@
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.LoginType;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.security.ssl.SSLFactory;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,9 +75,9 @@ public static void closeQuietly(Closeable c, String name, AtomicReference<Throwa
*/
public static ChannelBuilder createChannelBuilder(Map<String, ?> configs) {
SecurityProtocol securityProtocol = SecurityProtocol.valueOf((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
if (securityProtocol != SecurityProtocol.SSL && securityProtocol != SecurityProtocol.PLAINTEXT)
throw new ConfigException("Invalid SecurityProtocol " + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
return ChannelBuilders.create(securityProtocol, SSLFactory.Mode.CLIENT, configs);
if (securityProtocol == SecurityProtocol.TRACE)
throw new ConfigException("Invalid SecurityProtocol " + securityProtocol);
return ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, configs);
}

}
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.config.SSLConfigs;
import org.apache.kafka.common.config.SaslConfigs;

import java.util.HashMap;
import java.util.Map;
@@ -302,6 +303,13 @@ public class ConsumerConfig extends AbstractConfig {
.define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
.define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
.define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
.define(SaslConfigs.SASL_KAFKA_SERVER_REALM, Type.STRING, Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
.define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
.define(SaslConfigs.AUTH_TO_LOCAL, Type.LIST, SaslConfigs.DEFAULT_AUTH_TO_LOCAL, Importance.MEDIUM, SaslConfigs.AUTH_TO_LOCAL_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,
Type.INT,
40 * 1000,
@@ -23,6 +23,7 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.SSLConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
@@ -279,6 +280,13 @@ public class ProducerConfig extends AbstractConfig {
.define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
.define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
.define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
.define(SaslConfigs.SASL_KAFKA_SERVER_REALM, Type.STRING, Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
.define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
.define(SaslConfigs.AUTH_TO_LOCAL, Type.LIST, SaslConfigs.DEFAULT_AUTH_TO_LOCAL, Importance.MEDIUM, SaslConfigs.AUTH_TO_LOCAL_DOC)
/* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
.define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
.define(PARTITIONER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.internals.DefaultPartitioner", Importance.MEDIUM, PARTITIONER_CLASS_DOC);
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package org.apache.kafka.common.config;

import java.util.Collections;
import java.util.List;

public class SaslConfigs {
/*
* NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
*/

public static final String SASL_KAFKA_SERVER_REALM = "sasl.kafka.server.realm";
Member:
@harshach, this is not actually used at the moment. Can you please point me to where it should be used and I can quickly address it?

public static final String SASL_KAFKA_SERVER_DOC = "The sasl kafka server realm. "
+ "Default will be from kafka jaas config";

public static final String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
public static final String SASL_KERBEROS_SERVICE_NAME_DOC = "The Kerberos principal name that Kafka runs as. "
+ "This can be defined either in the JAAS config or in the Kafka config.";

public static final String SASL_KERBEROS_KINIT_CMD = "sasl.kerberos.kinit.cmd";
public static final String SASL_KERBEROS_KINIT_CMD_DOC = "Kerberos kinit command path. "
+ "Default will be /usr/bin/kinit";
public static final String DEFAULT_KERBEROS_KINIT_CMD = "/usr/bin/kinit";
Contributor:
Should we specify those through the Kafka config file or just the JAAS config file? The latter seems more natural, since it consolidates all SASL-related settings in one file.

Author:
The JAAS config is a special file in that it uses a different syntax, with sections that we define. So it should only contain login details such as keytab files, not Kafka-specific configs.
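As a concrete illustration of that split, a broker-side JAAS section could look like the sketch below, holding only login details while Kafka-specific settings stay in the Kafka config. The section name, login module, and paths here are illustrative assumptions, not taken from this PR:

```
// Hypothetical broker-side JAAS file -- keytab and principal only.
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka.keytab"
    principal="kafka/broker1.example.com@EXAMPLE.COM";
};
```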


public static final String SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR = "sasl.kerberos.ticket.renew.window.factor";
public static final String SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC = "LoginThread will sleep until specified window factor of time from last refresh"
+ " to ticket's expiry has been reached, at which time it will wake and try to renew the ticket.";
public static final double DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR = 0.80;

public static final String SASL_KERBEROS_TICKET_RENEW_JITTER = "sasl.kerberos.ticket.renew.jitter";
public static final String SASL_KERBEROS_TICKET_RENEW_JITTER_DOC = "Percentage of random jitter added to the renewal time";
public static final double DEFAULT_KERBEROS_TICKET_RENEW_JITTER = 0.05;

public static final String SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN = "sasl.kerberos.min.time.before.relogin";
public static final String SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC = "LoginThread sleep time between refresh attempts";
public static final long DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN = 1 * 60 * 1000L;

public static final String AUTH_TO_LOCAL = "kafka.security.auth.to.local";
public static final String AUTH_TO_LOCAL_DOC = "Rules for the mapping between principal names and operating system user names";
public static final List<String> DEFAULT_AUTH_TO_LOCAL = Collections.singletonList("DEFAULT");

}
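The renew window factor and jitter defaults above follow the usual Kerberos relogin pattern. As a hedged illustration of how such settings combine (the helper name and exact formula are assumptions, not this PR's `Login` code):

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of how a renew window factor and jitter of this kind
// are typically combined to pick the next Kerberos ticket refresh time; the
// actual `Login` implementation in this PR may differ in detail.
public class TicketRefreshSketch {
    // start/expiry are milliseconds since the epoch
    public static long nextRefreshTime(long start, long expiry,
                                       double windowFactor, double jitter) {
        long lifetime = expiry - start;
        // Sleep until `windowFactor` of the ticket lifetime has elapsed, plus
        // a random jitter so a fleet of clients does not renew simultaneously.
        double randomJitter = lifetime * jitter * ThreadLocalRandom.current().nextDouble();
        return start + (long) (lifetime * windowFactor + randomJitter);
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        // With the defaults above (0.80 window factor, 0.05 jitter), a
        // 10 minute ticket is refreshed roughly 8 to 8.5 minutes in.
        System.out.println(nextRefreshTime(0L, tenMinutes, 0.80, 0.05));
    }
}
```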
@@ -22,6 +22,7 @@
*/

import java.io.IOException;
import java.util.Map;
import java.security.Principal;

import org.apache.kafka.common.security.auth.PrincipalBuilder;
@@ -30,11 +31,13 @@
public interface Authenticator {

/**
* configures Authenticator using principalbuilder and transportLayer.
* @param TransportLayer transportLayer
* @param PrincipalBuilder principalBuilder
* Configures Authenticator using the provided parameters.
*
* @param transportLayer The transport layer used to read or write tokens
* @param principalBuilder The builder used to construct `Principal`
* @param configs Additional configuration parameters as key/value pairs
*/
void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder);
void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs);

/**
* Implements any authentication mechanism. Use transportLayer to read or write tokens.
@@ -32,6 +32,8 @@ public interface ChannelBuilder {
* returns a Channel with TransportLayer and Authenticator configured.
* @param id channel id
* @param key SelectionKey
* @param maxReceiveSize
* @return KafkaChannel
*/
KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException;

@@ -14,7 +14,6 @@
package org.apache.kafka.common.network;

import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.security.ssl.SSLFactory;

import java.util.Map;

@@ -24,20 +23,28 @@ private ChannelBuilders() { }

/**
* @param securityProtocol the securityProtocol
* @param mode the SSL mode, it must be non-null if `securityProcol` is `SSL` and it is ignored otherwise
* @param mode the mode, it must be non-null if `securityProtocol` is not `PLAINTEXT`;
* it is ignored otherwise
* @param loginType the loginType, it must be non-null if `securityProtocol` is SASL_*; it is ignored otherwise
* @param configs client/server configs
* @return the configured `ChannelBuilder`
* @throws IllegalArgumentException if `mode` invariants described above is not maintained
*/
public static ChannelBuilder create(SecurityProtocol securityProtocol, SSLFactory.Mode mode, Map<String, ?> configs) {
ChannelBuilder channelBuilder = null;
public static ChannelBuilder create(SecurityProtocol securityProtocol, Mode mode, LoginType loginType, Map<String, ?> configs) {
ChannelBuilder channelBuilder;

switch (securityProtocol) {
case SSL:
if (mode == null)
throw new IllegalArgumentException("`mode` must be non-null if `securityProtocol` is `SSL`");
requireNonNullMode(mode, securityProtocol);
channelBuilder = new SSLChannelBuilder(mode);
break;
case SASL_SSL:
case SASL_PLAINTEXT:
requireNonNullMode(mode, securityProtocol);
if (loginType == null)
throw new IllegalArgumentException("`loginType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol);
break;
case PLAINTEXT:
case TRACE:
channelBuilder = new PlaintextChannelBuilder();
@@ -49,4 +56,10 @@ public static ChannelBuilder create(SecurityProtocol securityProtocol, SSLFactor
channelBuilder.configure(configs);
return channelBuilder;
}

private static void requireNonNullMode(Mode mode, SecurityProtocol securityProtocol) {
if (mode == null)
throw new IllegalArgumentException("`mode` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
}

}
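To make the per-protocol dispatch and its null-argument invariants concrete, here is a simplified, self-contained mimic of the factory above; the enums and the string "builders" are stand-ins for the real Kafka classes, for illustration only:

```java
// Self-contained mimic of the `ChannelBuilders.create` dispatch added in this
// PR: SSL requires a `mode`, SASL_* requires both `mode` and `loginType`, and
// PLAINTEXT/TRACE require neither.
public class ChannelBuilderSketch {
    public enum SecurityProtocol { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL, TRACE }
    public enum Mode { CLIENT, SERVER }
    public enum LoginType { CLIENT, SERVER }

    public static String create(SecurityProtocol securityProtocol, Mode mode, LoginType loginType) {
        switch (securityProtocol) {
            case SSL:
                requireNonNullMode(mode, securityProtocol);
                return "SSLChannelBuilder(" + mode + ")";
            case SASL_SSL:
            case SASL_PLAINTEXT:
                // SASL protocols need both a connection mode and a JAAS login type
                requireNonNullMode(mode, securityProtocol);
                if (loginType == null)
                    throw new IllegalArgumentException("`loginType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
                return "SaslChannelBuilder(" + mode + ", " + loginType + ", " + securityProtocol + ")";
            case PLAINTEXT:
            case TRACE:
                return "PlaintextChannelBuilder";
            default:
                throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
        }
    }

    private static void requireNonNullMode(Mode mode, SecurityProtocol securityProtocol) {
        if (mode == null)
            throw new IllegalArgumentException("`mode` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
    }

    public static void main(String[] args) {
        System.out.println(create(SecurityProtocol.SASL_PLAINTEXT, Mode.SERVER, LoginType.SERVER));
    }
}
```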
@@ -19,6 +19,7 @@

import java.security.Principal;
import java.io.IOException;
import java.util.Map;

import org.apache.kafka.common.security.auth.PrincipalBuilder;
import org.apache.kafka.common.KafkaException;
@@ -29,7 +30,7 @@ public class DefaultAuthenticator implements Authenticator {
private PrincipalBuilder principalBuilder;
private Principal principal;

public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder) {
public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) {
this.transportLayer = transportLayer;
this.principalBuilder = principalBuilder;
}
@@ -54,7 +55,7 @@ public void close() throws IOException {}

/**
* DefaultAuthenticator doesn't implement any additional authentication mechanism.
* @returns true
* @return true
*/
public boolean complete() {
return true;
@@ -61,11 +61,9 @@ public Principal principal() throws IOException {
}

/**
* Does handshake of transportLayer and Authentication using configured authenticator
* Does handshake of transportLayer and authentication using configured authenticator
*/
public void prepare() throws IOException {
if (transportLayer.ready() && authenticator.complete())
return;
if (!transportLayer.ready())
transportLayer.handshake();
if (transportLayer.ready() && !authenticator.complete())
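The ordering in the patched `prepare` (finish the transport-layer handshake first, then attempt authentication in the same pass) can be sketched with stand-in types; this is an illustrative reading of the method, not the real Kafka code:

```java
// Self-contained sketch of the two-phase `KafkaChannel.prepare` ordering this
// PR tweaks. The interfaces are stand-ins, not the real Kafka types.
public class PrepareSketch {
    public interface TransportLayer {
        boolean ready();       // has the (e.g. TLS) handshake finished?
        void handshake();      // advance the handshake; may take several calls
    }

    public interface Authenticator {
        boolean complete();    // has authentication finished?
        void authenticate();   // advance authentication; only valid once ready
    }

    // Mirrors the shape of the patched method: no early return, handshake
    // before authentication, and authentication attempted in the same pass
    // once the handshake completes.
    public static void prepare(TransportLayer transportLayer, Authenticator authenticator) {
        if (!transportLayer.ready())
            transportLayer.handshake();
        if (transportLayer.ready() && !authenticator.complete())
            authenticator.authenticate();
    }
}
```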
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.network;

import org.apache.kafka.common.security.JaasUtils;

/**
* The type of the login context, it should be SERVER for the broker and CLIENT for the clients (i.e. consumer and
* producer). It provides the login context name, which defines the section of the JAAS configuration file to be used
* for login.
*/
public enum LoginType {
CLIENT(JaasUtils.LOGIN_CONTEXT_CLIENT),
SERVER(JaasUtils.LOGIN_CONTEXT_SERVER);

private final String contextName;

LoginType(String contextName) {
this.contextName = contextName;
}

public String contextName() {
return contextName;
}
}
19 changes: 19 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/network/Mode.java
@@ -0,0 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.network;

public enum Mode { CLIENT, SERVER };