From 1ad5d1b631651d51328bd939c3856843536002c6 Mon Sep 17 00:00:00 2001 From: benstopford Date: Tue, 13 Oct 2015 10:13:54 +0100 Subject: [PATCH] KAFKA-2637: Cipher suite setting should be configurable for SSL --- .../kafka/common/config/SSLConfigs.java | 4 ++-- .../kafka/common/security/ssl/SSLFactory.java | 20 +++++++++---------- .../main/scala/kafka/server/KafkaConfig.scala | 20 +++++++++---------- .../unit/kafka/server/KafkaConfigTest.scala | 3 ++- 4 files changed, 23 insertions(+), 24 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java index dd7b71a46c568..77bb5836d89a6 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java @@ -13,8 +13,8 @@ package org.apache.kafka.common.config; -import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.TrustManagerFactory; public class SSLConfigs { /* @@ -34,7 +34,7 @@ public class SSLConfigs { public static final String SSL_PROVIDER_DOC = "The name of the security provider used for SSL connections. Default value is the default security provider of the JVM."; public static final String SSL_CIPHER_SUITES_CONFIG = "ssl.cipher.suites"; - public static final String SSL_CIPHER_SUITES_DOC = "A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol." + public static final String SSL_CIPHER_SUITES_DOC = "A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol." + "By default all the available cipher suites are supported."; public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols"; diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java index f79b65cdfd014..b3e0895c1f82c 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java @@ -16,18 +16,17 @@ */ package org.apache.kafka.common.security.ssl; -import java.util.Map; -import java.util.List; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.SSLConfigs; + +import javax.net.ssl.*; import java.io.FileInputStream; import java.io.IOException; import java.security.GeneralSecurityException; import java.security.KeyStore; - -import javax.net.ssl.*; - -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.Configurable; -import org.apache.kafka.common.config.SSLConfigs; +import java.util.List; +import java.util.Map; public class SSLFactory implements Configurable { @@ -60,12 +59,13 @@ public void configure(Map configs) throws KafkaException { if (configs.get(SSLConfigs.SSL_CIPHER_SUITES_CONFIG) != null) { List cipherSuitesList = (List) configs.get(SSLConfigs.SSL_CIPHER_SUITES_CONFIG); - this.cipherSuites = (String[]) cipherSuitesList.toArray(new String[cipherSuitesList.size()]); + if (!cipherSuitesList.isEmpty()) + this.cipherSuites = cipherSuitesList.toArray(new String[cipherSuitesList.size()]); } if (configs.get(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG) != null) { List enabledProtocolsList = (List) configs.get(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG); - this.enabledProtocols = (String[]) enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]); + this.enabledProtocols = enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]); } if (configs.containsKey(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)) { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 1e8b2331486ff..627849cae5711 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -26,16 +26,12 @@ import kafka.consumer.ConsumerConfig import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet} import kafka.utils.CoreUtils import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.common.config.SSLConfigs -import org.apache.kafka.common.config.ConfigDef.Importance._ -import org.apache.kafka.common.config.ConfigDef.Range._ -import org.apache.kafka.common.config.ConfigDef.Type._ - -import org.apache.kafka.common.config.{ConfigException, AbstractConfig, ConfigDef} +import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SSLConfigs} import org.apache.kafka.common.metrics.MetricsReporter import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.security.auth.PrincipalBuilder -import scala.collection.{mutable, immutable, JavaConversions, Map} + +import scala.collection.{Map, immutable} object Defaults { @@ -175,7 +171,7 @@ object Defaults { val SSLClientAuthRequested = "requested" val SSLClientAuthNone = "none" val SSLClientAuth = SSLClientAuthNone - + val SSLCipherSuites = "" } object KafkaConfig { @@ -494,10 +490,10 @@ object KafkaConfig { val SSLClientAuthDoc = SSLConfigs.SSL_CLIENT_AUTH_DOC private val configDef = { + import ConfigDef.Importance._ import ConfigDef.Range._ - import ConfigDef.ValidString._ import ConfigDef.Type._ - import ConfigDef.Importance._ + import ConfigDef.ValidString._ new ConfigDef() @@ -646,7 +642,7 @@ object KafkaConfig { .define(SSLKeyManagerAlgorithmProp, STRING, Defaults.SSLKeyManagerAlgorithm, MEDIUM, SSLKeyManagerAlgorithmDoc) .define(SSLTrustManagerAlgorithmProp, STRING, Defaults.SSLTrustManagerAlgorithm, MEDIUM, SSLTrustManagerAlgorithmDoc) .define(SSLClientAuthProp, STRING, Defaults.SSLClientAuth, in(Defaults.SSLClientAuthRequired, Defaults.SSLClientAuthRequested, Defaults.SSLClientAuthNone), MEDIUM, SSLClientAuthDoc) - + .define(SSLCipherSuitesProp, LIST, Defaults.SSLCipherSuites, MEDIUM, SSLCipherSuitesDoc) } def configNames() = { @@ -804,6 +800,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka val sslKeyManagerAlgorithm = getString(KafkaConfig.SSLKeyManagerAlgorithmProp) val sslTrustManagerAlgorithm = getString(KafkaConfig.SSLTrustManagerAlgorithmProp) val sslClientAuth = getString(KafkaConfig.SSLClientAuthProp) + val sslCipher = getList(KafkaConfig.SSLCipherSuitesProp) /** ********* Quota Configuration **************/ val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp) @@ -943,6 +940,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka channelConfigs.put(SSLKeyManagerAlgorithmProp, sslKeyManagerAlgorithm) channelConfigs.put(SSLTrustManagerAlgorithmProp, sslTrustManagerAlgorithm) channelConfigs.put(SSLClientAuthProp, sslClientAuth) + channelConfigs.put(SSLCipherSuitesProp, sslCipher) channelConfigs } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 5b4f2db4607ae..d2cf9b583db23 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -22,7 +22,7 @@ import java.util.Properties import junit.framework.Assert._ import kafka.api.{ApiVersion, KAFKA_082} import kafka.message._ -import kafka.utils.{TestUtils, CoreUtils} +import kafka.utils.{CoreUtils, TestUtils} import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.protocol.SecurityProtocol import org.junit.{Assert, Test} @@ -504,6 +504,7 @@ class KafkaConfigTest { case KafkaConfig.SSLKeyManagerAlgorithmProp => case KafkaConfig.SSLTrustManagerAlgorithmProp => case KafkaConfig.SSLClientAuthProp => // ignore string + case KafkaConfig.SSLCipherSuitesProp => // ignore string case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1") }