Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

package org.apache.kafka.common.config;

import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;

public class SSLConfigs {
/*
Expand All @@ -34,7 +34,7 @@ public class SSLConfigs {
public static final String SSL_PROVIDER_DOC = "The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.";

public static final String SSL_CIPHER_SUITES_CONFIG = "ssl.cipher.suites";
public static final String SSL_CIPHER_SUITES_DOC = "A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol."
public static final String SSL_CIPHER_SUITES_DOC = "A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol."
+ "By default all the available cipher suites are supported.";

public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@
*/
package org.apache.kafka.common.security.ssl;

import java.util.Map;
import java.util.List;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SSLConfigs;

import javax.net.ssl.*;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

import javax.net.ssl.*;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.SSLConfigs;
import java.util.List;
import java.util.Map;


public class SSLFactory implements Configurable {
Expand Down Expand Up @@ -60,12 +59,13 @@ public void configure(Map<String, ?> configs) throws KafkaException {

if (configs.get(SSLConfigs.SSL_CIPHER_SUITES_CONFIG) != null) {
List<String> cipherSuitesList = (List<String>) configs.get(SSLConfigs.SSL_CIPHER_SUITES_CONFIG);
this.cipherSuites = (String[]) cipherSuitesList.toArray(new String[cipherSuitesList.size()]);
if (!cipherSuitesList.isEmpty())
this.cipherSuites = cipherSuitesList.toArray(new String[cipherSuitesList.size()]);
}

if (configs.get(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG) != null) {
List<String> enabledProtocolsList = (List<String>) configs.get(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);
this.enabledProtocols = (String[]) enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]);
this.enabledProtocols = enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]);
}

if (configs.containsKey(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)) {
Expand Down
20 changes: 9 additions & 11 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,12 @@ import kafka.consumer.ConsumerConfig
import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
import kafka.utils.CoreUtils
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.SSLConfigs
import org.apache.kafka.common.config.ConfigDef.Importance._
import org.apache.kafka.common.config.ConfigDef.Range._
import org.apache.kafka.common.config.ConfigDef.Type._

import org.apache.kafka.common.config.{ConfigException, AbstractConfig, ConfigDef}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SSLConfigs}
import org.apache.kafka.common.metrics.MetricsReporter
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.security.auth.PrincipalBuilder
import scala.collection.{mutable, immutable, JavaConversions, Map}

import scala.collection.{Map, immutable}


object Defaults {
Expand Down Expand Up @@ -175,7 +171,7 @@ object Defaults {
val SSLClientAuthRequested = "requested"
val SSLClientAuthNone = "none"
val SSLClientAuth = SSLClientAuthNone

val SSLCipherSuites = ""
}

object KafkaConfig {
Expand Down Expand Up @@ -494,10 +490,10 @@ object KafkaConfig {
val SSLClientAuthDoc = SSLConfigs.SSL_CLIENT_AUTH_DOC

private val configDef = {
import ConfigDef.Importance._
import ConfigDef.Range._
import ConfigDef.ValidString._
import ConfigDef.Type._
import ConfigDef.Importance._
import ConfigDef.ValidString._

new ConfigDef()

Expand Down Expand Up @@ -646,7 +642,7 @@ object KafkaConfig {
.define(SSLKeyManagerAlgorithmProp, STRING, Defaults.SSLKeyManagerAlgorithm, MEDIUM, SSLKeyManagerAlgorithmDoc)
.define(SSLTrustManagerAlgorithmProp, STRING, Defaults.SSLTrustManagerAlgorithm, MEDIUM, SSLTrustManagerAlgorithmDoc)
.define(SSLClientAuthProp, STRING, Defaults.SSLClientAuth, in(Defaults.SSLClientAuthRequired, Defaults.SSLClientAuthRequested, Defaults.SSLClientAuthNone), MEDIUM, SSLClientAuthDoc)

.define(SSLCipherSuitesProp, LIST, Defaults.SSLCipherSuites, MEDIUM, SSLCipherSuitesDoc)
}

def configNames() = {
Expand Down Expand Up @@ -804,6 +800,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
val sslKeyManagerAlgorithm = getString(KafkaConfig.SSLKeyManagerAlgorithmProp)
val sslTrustManagerAlgorithm = getString(KafkaConfig.SSLTrustManagerAlgorithmProp)
val sslClientAuth = getString(KafkaConfig.SSLClientAuthProp)
val sslCipher = getList(KafkaConfig.SSLCipherSuitesProp)

/** ********* Quota Configuration **************/
val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp)
Expand Down Expand Up @@ -943,6 +940,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
channelConfigs.put(SSLKeyManagerAlgorithmProp, sslKeyManagerAlgorithm)
channelConfigs.put(SSLTrustManagerAlgorithmProp, sslTrustManagerAlgorithm)
channelConfigs.put(SSLClientAuthProp, sslClientAuth)
channelConfigs.put(SSLCipherSuitesProp, sslCipher)
channelConfigs
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.Properties
import junit.framework.Assert._
import kafka.api.{ApiVersion, KAFKA_082}
import kafka.message._
import kafka.utils.{TestUtils, CoreUtils}
import kafka.utils.{CoreUtils, TestUtils}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.protocol.SecurityProtocol
import org.junit.{Assert, Test}
Expand Down Expand Up @@ -504,6 +504,7 @@ class KafkaConfigTest {
case KafkaConfig.SSLKeyManagerAlgorithmProp =>
case KafkaConfig.SSLTrustManagerAlgorithmProp =>
case KafkaConfig.SSLClientAuthProp => // ignore string
case KafkaConfig.SSLCipherSuitesProp => // ignore string

case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
}
Expand Down