sslConfigs = new HashMap<>();
+ sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsProtocol);
+ sslConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Collections.singletonList(tlsProtocol));
+
+ if (mode != Mode.CLIENT || useClientCert) {
+ KeyPair keyPair = generateKeyPair(algorithm);
+ X509Certificate cert = certBuilder.generate("CN=" + cn + ", O=A " + mode.name().toLowerCase(Locale.ROOT), keyPair);
+
+ Password privateKeyPem = new Password(pem(keyPair.getPrivate(), keyPassword));
+ Password certPem = new Password(pem(cert));
+ sslConfigs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, PEM_TYPE);
+ sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, PEM_TYPE);
+ sslConfigs.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, privateKeyPem);
+ sslConfigs.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, certPem);
+ sslConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
+ sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, certPem);
+ }
+ return sslConfigs;
+ }
}
public static final class TestSslEngineFactory implements SslEngineFactory {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 76b93dabc228f..0e31d6e55d2a7 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -522,9 +522,12 @@ object KafkaConfig {
val SslKeystoreLocationProp = SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG
val SslKeystorePasswordProp = SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG
val SslKeyPasswordProp = SslConfigs.SSL_KEY_PASSWORD_CONFIG
+ val SslKeystoreKeyProp = SslConfigs.SSL_KEYSTORE_KEY_CONFIG
+ val SslKeystoreCertificateChainProp = SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG
val SslTruststoreTypeProp = SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG
val SslTruststoreLocationProp = SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG
val SslTruststorePasswordProp = SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG
+ val SslTruststoreCertificatesProp = SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG
val SslKeyManagerAlgorithmProp = SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG
val SslTrustManagerAlgorithmProp = SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG
val SslEndpointIdentificationAlgorithmProp = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
@@ -927,9 +930,12 @@ object KafkaConfig {
val SslKeystoreLocationDoc = SslConfigs.SSL_KEYSTORE_LOCATION_DOC
val SslKeystorePasswordDoc = SslConfigs.SSL_KEYSTORE_PASSWORD_DOC
val SslKeyPasswordDoc = SslConfigs.SSL_KEY_PASSWORD_DOC
+ val SslKeystoreKeyDoc = SslConfigs.SSL_KEYSTORE_KEY_DOC
+ val SslKeystoreCertificateChainDoc = SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC
val SslTruststoreTypeDoc = SslConfigs.SSL_TRUSTSTORE_TYPE_DOC
val SslTruststorePasswordDoc = SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC
val SslTruststoreLocationDoc = SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC
+ val SslTruststoreCertificatesDoc = SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_DOC
val SslKeyManagerAlgorithmDoc = SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC
val SslTrustManagerAlgorithmDoc = SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC
val SslEndpointIdentificationAlgorithmDoc = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC
@@ -1195,9 +1201,12 @@ object KafkaConfig {
.define(SslKeystoreLocationProp, STRING, null, MEDIUM, SslKeystoreLocationDoc)
.define(SslKeystorePasswordProp, PASSWORD, null, MEDIUM, SslKeystorePasswordDoc)
.define(SslKeyPasswordProp, PASSWORD, null, MEDIUM, SslKeyPasswordDoc)
+ .define(SslKeystoreKeyProp, PASSWORD, null, MEDIUM, SslKeystoreKeyDoc)
+ .define(SslKeystoreCertificateChainProp, PASSWORD, null, MEDIUM, SslKeystoreCertificateChainDoc)
.define(SslTruststoreTypeProp, STRING, Defaults.SslTruststoreType, MEDIUM, SslTruststoreTypeDoc)
.define(SslTruststoreLocationProp, STRING, null, MEDIUM, SslTruststoreLocationDoc)
.define(SslTruststorePasswordProp, PASSWORD, null, MEDIUM, SslTruststorePasswordDoc)
+ .define(SslTruststoreCertificatesProp, PASSWORD, null, MEDIUM, SslTruststoreCertificatesDoc)
.define(SslKeyManagerAlgorithmProp, STRING, Defaults.SslKeyManagerAlgorithm, MEDIUM, SslKeyManagerAlgorithmDoc)
.define(SslTrustManagerAlgorithmProp, STRING, Defaults.SslTrustManagerAlgorithm, MEDIUM, SslTrustManagerAlgorithmDoc)
.define(SslEndpointIdentificationAlgorithmProp, STRING, Defaults.SslEndpointIdentificationAlgorithm, LOW, SslEndpointIdentificationAlgorithmDoc)
diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
index 35d4c8d6d4dd5..eec16239e147d 100644
--- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
@@ -16,10 +16,13 @@
*/
package kafka.api
+import java.util.Properties
+
import kafka.utils.JaasTestUtils
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.scram.internals.ScramMechanism
+import org.apache.kafka.test.TestSslUtils
import scala.jdk.CollectionConverters._
import org.junit.Before
@@ -36,6 +39,14 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
// Create broker credentials before starting brokers
createScramCredentials(zkConnect, kafkaPrincipal.getName, kafkaPassword)
+ TestSslUtils.convertToPemWithoutFiles(producerConfig)
+ TestSslUtils.convertToPemWithoutFiles(consumerConfig)
+ TestSslUtils.convertToPemWithoutFiles(adminClientConfig)
+ }
+
+ override def configureListeners(props: collection.Seq[Properties]): Unit = {
+ props.foreach(TestSslUtils.convertToPemWithoutFiles)
+ super.configureListeners(props)
}
override def createPrivilegedAdminClient() = createScramAdminClient(kafkaClientSaslMechanism, kafkaPrincipal.getName, kafkaPassword)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 2c390c4b700aa..604c9de877453 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -206,7 +206,11 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
def verifySslConfig(prefix: String, expectedProps: Properties, configDesc: Config): Unit = {
- KEYSTORE_PROPS.forEach { configName =>
+ // Validate file-based SSL keystore configs
+ val keyStoreProps = new util.HashSet[String](KEYSTORE_PROPS)
+ keyStoreProps.remove(SSL_KEYSTORE_KEY_CONFIG)
+ keyStoreProps.remove(SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG)
+ keyStoreProps.forEach { configName =>
val desc = configEntry(configDesc, s"$prefix$configName")
val isSensitive = configName.contains("password")
verifyConfig(configName, desc, isSensitive, isReadOnly = prefix.nonEmpty, if (prefix.isEmpty) invalidSslProperties else sslProperties1)
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index 4a388e8055b94..e0d058115e774 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -84,14 +84,23 @@ class KafkaTest {
val propertiesFile = prepareDefaultConfig()
val config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "ssl.keystore.password=keystore_password",
"--override", "ssl.key.password=key_password",
- "--override", "ssl.truststore.password=truststore_password")))
+ "--override", "ssl.truststore.password=truststore_password",
+ "--override", "ssl.keystore.certificate.chain=certificate_chain",
+ "--override", "ssl.keystore.key=private_key",
+ "--override", "ssl.truststore.certificates=truststore_certificates")))
assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeyPasswordProp).toString)
assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeystorePasswordProp).toString)
assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslTruststorePasswordProp).toString)
+ assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeystoreKeyProp).toString)
+ assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeystoreCertificateChainProp).toString)
+ assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslTruststoreCertificatesProp).toString)
assertEquals("key_password", config.getPassword(KafkaConfig.SslKeyPasswordProp).value)
assertEquals("keystore_password", config.getPassword(KafkaConfig.SslKeystorePasswordProp).value)
assertEquals("truststore_password", config.getPassword(KafkaConfig.SslTruststorePasswordProp).value)
+ assertEquals("private_key", config.getPassword(KafkaConfig.SslKeystoreKeyProp).value)
+ assertEquals("certificate_chain", config.getPassword(KafkaConfig.SslKeystoreCertificateChainProp).value)
+ assertEquals("truststore_certificates", config.getPassword(KafkaConfig.SslTruststoreCertificatesProp).value)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 72380a7fff17f..fdacd813552d9 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -740,9 +740,12 @@ class KafkaConfigTest {
case KafkaConfig.SslKeystoreLocationProp => // ignore string
case KafkaConfig.SslKeystorePasswordProp => // ignore string
case KafkaConfig.SslKeyPasswordProp => // ignore string
+ case KafkaConfig.SslKeystoreCertificateChainProp => // ignore string
+ case KafkaConfig.SslKeystoreKeyProp => // ignore string
case KafkaConfig.SslTruststoreTypeProp => // ignore string
case KafkaConfig.SslTruststorePasswordProp => // ignore string
case KafkaConfig.SslTruststoreLocationProp => // ignore string
+ case KafkaConfig.SslTruststoreCertificatesProp => // ignore string
case KafkaConfig.SslKeyManagerAlgorithmProp =>
case KafkaConfig.SslTrustManagerAlgorithmProp =>
case KafkaConfig.SslClientAuthProp => // ignore string
diff --git a/docs/security.html b/docs/security.html
index f0a1e5f7a2373..f70e13f3e5bca 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -282,6 +282,22 @@ Host Name Verification
For some tooling assistance on this topic, please check out the easyRSA project which has
extensive scripting in place to help with these steps.
+ SSL key and certificates in PEM format
+ From 2.7.0 onwards, SSL key and trust stores can be configured for Kafka brokers and clients directly in the configuration in PEM format.
+ This avoids the need to store separate files on the file system and benefits from password protection features of Kafka configuration.
+ PEM may also be used as the store type for file-based key and trust stores in addition to JKS and PKCS12. To configure PEM key store directly in the
+ broker or client configuration, private key in PEM format should be provided in ssl.keystore.key and the certificate chain in PEM format
+ should be provided in ssl.keystore.certificate.chain. To configure trust store, trust certificates, e.g. public certificate of CA,
+ should be provided in ssl.truststore.certificates. Since PEM is typically stored as multi-line base-64 strings, the configuration value
+ can be included in Kafka configuration as multi-line strings with lines terminating in backslash ('\') for line continuation.
+
+ Store password configs ssl.keystore.password and ssl.truststore.password are not used for PEM.
+ If private key is encrypted using a password, the key password must be provided in ssl.key.password. Private keys may be provided
+ in unencrypted form without a password when PEM is specified directly in the config value. In production deployments, configs should be encrypted or
+ externalized using password protection feature in Kafka in this case. Note that the default SSL engine factory has limited capabilities for decryption
+ of encrypted private keys when external tools like OpenSSL are used for encryption. Third party libraries like BouncyCastle may be integrated witn a
+ custom SslEngineFactory to support a wider range of encrypted private keys.
+
The above paragraphs show the process to create your own CA and use it to sign certificates for your cluster.