From ba3773c3fead43d4d30cc246f9e9fd9eb1736fa2 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Thu, 24 Sep 2020 00:00:22 +0100 Subject: [PATCH 1/3] KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651) --- checkstyle/import-control.xml | 3 + checkstyle/suppressions.xml | 4 +- .../kafka/common/config/SslConfigs.java | 33 +- .../common/network/SslChannelBuilder.java | 2 + .../security/ssl/DefaultSslEngineFactory.java | 315 +++++++++++++++-- .../kafka/common/network/CertStores.java | 79 ++++- .../common/network/SslTransportLayerTest.java | 302 ++++++++-------- .../ssl/DefaultSslEngineFactoryTest.java | 324 ++++++++++++++++++ .../common/security/ssl/SslFactoryTest.java | 99 +++++- .../org/apache/kafka/test/TestSslUtils.java | 304 +++++++++++++--- ...aslScramSslEndToEndAuthorizationTest.scala | 5 + .../DynamicBrokerReconfigurationTest.scala | 6 +- docs/security.html | 16 + 13 files changed, 1252 insertions(+), 240 deletions(-) create mode 100644 clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index a7dc27fcd69f4..2d67a23ea2d84 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -109,6 +109,9 @@ + + + diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index d6b4f97232ecf..2b19a8204df7b 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -63,7 +63,7 @@ files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/> + files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory).java"/> @@ -96,7 +96,7 @@ files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/> + files="MemoryRecordsTest|MetricsTest|TestSslUtils"/> diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java index 6dc0f5807c71e..6e9e17638ffc3 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.config; +import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.utils.Java; import org.apache.kafka.common.utils.Utils; @@ -90,17 +91,31 @@ public class SslConfigs { + "This is optional for client."; public static final String DEFAULT_SSL_KEYSTORE_TYPE = "JKS"; + public static final String SSL_KEYSTORE_KEY_CONFIG = "ssl.keystore.key"; + public static final String SSL_KEYSTORE_KEY_DOC = "Private key in the format specified by 'ssl.keystore.type'. " + + "Default SSL engine factory supports only PEM format with PKCS#8 keys. If the key is encrypted, " + + "key password must be specified using 'ssl.key.password'"; + + public static final String SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG = "ssl.keystore.certificate.chain"; + public static final String SSL_KEYSTORE_CERTIFICATE_DOC = "Certificate chain in the format specified by 'ssl.keystore.type'. " + + "Default SSL engine factory supports only PEM format with a list of X.509 certificates"; + + public static final String SSL_TRUSTSTORE_CERTIFICATES_CONFIG = "ssl.truststore.certificates"; + public static final String SSL_TRUSTSTORE_CERTIFICATES_DOC = "Trusted certificates in the format specified by 'ssl.truststore.type'. " + + "Default SSL engine factory supports only PEM format with X.509 certificates."; + public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location"; public static final String SSL_KEYSTORE_LOCATION_DOC = "The location of the key store file. " + "This is optional for client and can be used for two-way authentication for client."; public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "ssl.keystore.password"; public static final String SSL_KEYSTORE_PASSWORD_DOC = "The store password for the key store file. " - + "This is optional for client and only needed if ssl.keystore.location is configured. "; + + "This is optional for client and only needed if 'ssl.keystore.location' is configured. " + + " Key store password is not supported for PEM format."; public static final String SSL_KEY_PASSWORD_CONFIG = "ssl.key.password"; - public static final String SSL_KEY_PASSWORD_DOC = "The password of the private key in the key store file. " - + "This is optional for client."; + public static final String SSL_KEY_PASSWORD_DOC = "The password of the private key in the key store file or" + + "the PEM key specified in `ssl.keystore.key'. This is required for clients only if two-way authentication is configured."; public static final String SSL_TRUSTSTORE_TYPE_CONFIG = "ssl.truststore.type"; public static final String SSL_TRUSTSTORE_TYPE_DOC = "The file format of the trust store file."; @@ -110,7 +125,9 @@ public class SslConfigs { public static final String SSL_TRUSTSTORE_LOCATION_DOC = "The location of the trust store file. "; public static final String SSL_TRUSTSTORE_PASSWORD_CONFIG = "ssl.truststore.password"; - public static final String SSL_TRUSTSTORE_PASSWORD_DOC = "The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled."; + public static final String SSL_TRUSTSTORE_PASSWORD_DOC = "The password for the trust store file. " + + "If a password is not set, trust store file configured will still be used, but integrity checking is disabled. " + + "Trust store password is not supported for PEM format."; public static final String SSL_KEYMANAGER_ALGORITHM_CONFIG = "ssl.keymanager.algorithm"; public static final String SSL_KEYMANAGER_ALGORITHM_DOC = "The algorithm used by key manager factory for SSL connections. " @@ -152,6 +169,9 @@ public static void addClientSslSupport(ConfigDef config) { .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC) .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC) .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC) + .define(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_KEY_DOC) + .define(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_CERTIFICATE_DOC) + .define(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_DOC) .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC) .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC) .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC) @@ -169,7 +189,10 @@ public static void addClientSslSupport(ConfigDef config) { SslConfigs.SSL_KEY_PASSWORD_CONFIG, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, - SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG); + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, + SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, + SslConfigs.SSL_KEYSTORE_KEY_CONFIG, + SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG); public static final Set NON_RECONFIGURABLE_CONFIGS = Utils.mkSet( BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java index 5a1a46b5846c9..327fff8f92128 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java @@ -70,6 +70,8 @@ public void configure(Map configs) throws KafkaException { sslPrincipalMapper = SslPrincipalMapper.fromRules(sslPrincipalMappingRules); this.sslFactory = new SslFactory(mode, null, isInterBrokerListener); this.sslFactory.configure(this.configs); + } catch (KafkaException e) { + throw e; } catch (Exception e) { throw new KafkaException(e); } diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java index 02294e5b403ef..59ee20fe068d2 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java @@ -21,36 +21,58 @@ import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.network.Mode; import org.apache.kafka.common.security.auth.SslEngineFactory; import org.apache.kafka.common.utils.SecurityUtils; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.KeyManager; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLParameters; -import javax.net.ssl.TrustManagerFactory; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Paths; import java.security.GeneralSecurityException; +import java.security.Key; +import java.security.KeyFactory; import java.security.KeyStore; +import java.security.PrivateKey; import java.security.SecureRandom; +import java.security.cert.Certificate; +import java.security.cert.CertificateFactory; +import java.security.spec.InvalidKeySpecException; +import java.security.spec.PKCS8EncodedKeySpec; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; +import javax.crypto.Cipher; +import javax.crypto.EncryptedPrivateKeyInfo; +import javax.crypto.SecretKey; +import javax.crypto.SecretKeyFactory; +import javax.crypto.spec.PBEKeySpec; +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLParameters; +import javax.net.ssl.TrustManagerFactory; + public final class DefaultSslEngineFactory implements SslEngineFactory { private static final Logger log = LoggerFactory.getLogger(DefaultSslEngineFactory.class); + public static final String PEM_TYPE = "PEM"; private Map configs; private String protocol; @@ -139,13 +161,16 @@ public void configure(Map configs) { this.keystore = createKeystore((String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), (String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), (Password) configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), - (Password) configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)); + (Password) configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG), + (Password) configs.get(SslConfigs.SSL_KEYSTORE_KEY_CONFIG), + (Password) configs.get(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG)); this.truststore = createTruststore((String) configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), (String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), - (Password) configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); + (Password) configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), + (Password) configs.get(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG)); - this.sslContext = createSSLContext(); + this.sslContext = createSSLContext(keystore, truststore); } @Override @@ -209,7 +234,7 @@ private static SecureRandom createSecureRandom(String key) { } } - private SSLContext createSSLContext() { + private SSLContext createSSLContext(SecurityStore keystore, SecurityStore truststore) { try { SSLContext sslContext; if (provider != null) @@ -223,9 +248,7 @@ private SSLContext createSSLContext() { this.kmfAlgorithm : KeyManagerFactory.getDefaultAlgorithm(); KeyManagerFactory kmf = KeyManagerFactory.getInstance(kmfAlgorithm); if (keystore != null) { - KeyStore ks = keystore.get(); - Password keyPassword = keystore.keyPassword != null ? keystore.keyPassword : keystore.password; - kmf.init(ks, keyPassword.value().toCharArray()); + kmf.init(keystore.get(), keystore.keyPassword()); } else { kmf.init(null, null); } @@ -246,56 +269,104 @@ private SSLContext createSSLContext() { } } - private static SecurityStore createKeystore(String type, String path, Password password, Password keyPassword) { - if (path == null && password != null) { - throw new KafkaException("SSL key store is not specified, but key store password is specified."); + // Visibility to override for testing + protected SecurityStore createKeystore(String type, String path, Password password, Password keyPassword, Password privateKey, Password certificateChain) { + if (privateKey != null) { + if (!PEM_TYPE.equals(type)) + throw new InvalidConfigurationException("SSL private key can be specified only for PEM, but key store type is " + type + "."); + else if (certificateChain == null) + throw new InvalidConfigurationException("SSL private key is specified, but certificate chain is not specified."); + else if (path != null) + throw new InvalidConfigurationException("Both SSL key store location and separate private key are specified."); + else if (password != null) + throw new InvalidConfigurationException("SSL key store password cannot be specified with PEM format, only key password may be specified."); + else + return new PemStore(certificateChain, privateKey, keyPassword); + } else if (certificateChain != null) { + throw new InvalidConfigurationException("SSL certificate chain is specified, but private key is not specified"); + } else if (PEM_TYPE.equals(type) && path != null) { + if (password != null) + throw new InvalidConfigurationException("SSL key store password cannot be specified with PEM format, only key password may be specified"); + else if (keyPassword == null) + throw new InvalidConfigurationException("SSL PEM key store is specified, but key password is not specified."); + else + return new FileBasedPemStore(path, keyPassword, true); + } else if (path == null && password != null) { + throw new InvalidConfigurationException("SSL key store is not specified, but key store password is specified."); } else if (path != null && password == null) { - throw new KafkaException("SSL key store is specified, but key store password is not specified."); + throw new InvalidConfigurationException("SSL key store is specified, but key store password is not specified."); } else if (path != null && password != null) { - return new SecurityStore(type, path, password, keyPassword); + return new FileBasedStore(type, path, password, keyPassword, true); } else return null; // path == null, clients may use this path with brokers that don't require client auth } - private static SecurityStore createTruststore(String type, String path, Password password) { - if (path == null && password != null) { - throw new KafkaException("SSL trust store is not specified, but trust store password is specified."); + private static SecurityStore createTruststore(String type, String path, Password password, Password trustStoreCerts) { + if (trustStoreCerts != null) { + if (!PEM_TYPE.equals(type)) + throw new InvalidConfigurationException("SSL trust store certs can be specified only for PEM, but trust store type is " + type + "."); + else if (path != null) + throw new InvalidConfigurationException("Both SSL trust store location and separate trust certificates are specified."); + else if (password != null) + throw new InvalidConfigurationException("SSL trust store password cannot be specified for PEM format."); + else + return new PemStore(trustStoreCerts); + } else if (PEM_TYPE.equals(type) && path != null) { + if (password != null) + throw new InvalidConfigurationException("SSL trust store password cannot be specified for PEM format."); + else + return new FileBasedPemStore(path, null, false); + } else if (path == null && password != null) { + throw new InvalidConfigurationException("SSL trust store is not specified, but trust store password is specified."); } else if (path != null) { - return new SecurityStore(type, path, password, null); + return new FileBasedStore(type, path, password, null, false); } else return null; } + static interface SecurityStore { + KeyStore get(); + char[] keyPassword(); + boolean modified(); + } + // package access for testing - static class SecurityStore { + static class FileBasedStore implements SecurityStore { private final String type; - private final String path; + protected final String path; private final Password password; - private final Password keyPassword; + protected final Password keyPassword; private final Long fileLastModifiedMs; private final KeyStore keyStore; - SecurityStore(String type, String path, Password password, Password keyPassword) { + FileBasedStore(String type, String path, Password password, Password keyPassword, boolean isKeyStore) { Objects.requireNonNull(type, "type must not be null"); this.type = type; this.path = path; this.password = password; this.keyPassword = keyPassword; fileLastModifiedMs = lastModifiedMs(path); - this.keyStore = load(); + this.keyStore = load(isKeyStore); } - KeyStore get() { + @Override + public KeyStore get() { return keyStore; } + @Override + public char[] keyPassword() { + Password passwd = keyPassword != null ? keyPassword : password; + return passwd == null ? null : passwd.value().toCharArray(); + } + /** * Loads this keystore * @return the keystore * @throws KafkaException if the file could not be read or if the keystore could not be loaded * using the specified configs (e.g. if the password or keystore type is invalid) */ - private KeyStore load() { + protected KeyStore load(boolean isKeyStore) { try (InputStream in = Files.newInputStream(Paths.get(path))) { KeyStore ks = KeyStore.getInstance(type); // If a password is not set access to the truststore is still available, but integrity checking is disabled. @@ -316,7 +387,7 @@ private Long lastModifiedMs(String path) { } } - boolean modified() { + public boolean modified() { Long modifiedMs = lastModifiedMs(path); return modifiedMs != null && !Objects.equals(modifiedMs, this.fileLastModifiedMs); } @@ -328,4 +399,186 @@ public String toString() { ", modificationTime=" + (fileLastModifiedMs == null ? null : new Date(fileLastModifiedMs)) + ")"; } } -} \ No newline at end of file + + static class FileBasedPemStore extends FileBasedStore { + FileBasedPemStore(String path, Password keyPassword, boolean isKeyStore) { + super(PEM_TYPE, path, null, keyPassword, isKeyStore); + } + + @Override + protected KeyStore load(boolean isKeyStore) { + try { + Password storeContents = new Password(Utils.readFileAsString(path)); + PemStore pemStore = isKeyStore ? new PemStore(storeContents, storeContents, keyPassword) : + new PemStore(storeContents); + return pemStore.keyStore; + } catch (Exception e) { + throw new InvalidConfigurationException("Failed to load PEM SSL keystore " + path, e); + } + } + } + + static class PemStore implements SecurityStore { + private static final PemParser CERTIFICATE_PARSER = new PemParser("CERTIFICATE"); + private static final PemParser PRIVATE_KEY_PARSER = new PemParser("PRIVATE KEY"); + private static final List KEY_FACTORIES = Arrays.asList( + keyFactory("RSA"), + keyFactory("DSA"), + keyFactory("EC") + ); + + private final char[] keyPassword; + private final KeyStore keyStore; + + PemStore(Password certificateChain, Password privateKey, Password keyPassword) { + this.keyPassword = keyPassword == null ? null : keyPassword.value().toCharArray(); + keyStore = createKeyStoreFromPem(privateKey.value(), certificateChain.value(), this.keyPassword); + } + + PemStore(Password trustStoreCerts) { + this.keyPassword = null; + keyStore = createTrustStoreFromPem(trustStoreCerts.value()); + } + + @Override + public KeyStore get() { + return keyStore; + } + + @Override + public char[] keyPassword() { + return keyPassword; + } + + @Override + public boolean modified() { + return false; + } + + private KeyStore createKeyStoreFromPem(String privateKeyPem, String certChainPem, char[] keyPassword) { + try { + KeyStore ks = KeyStore.getInstance("PKCS12"); + ks.load(null, null); + Key key = privateKey(privateKeyPem, keyPassword); + Certificate[] certChain = certs(certChainPem); + ks.setKeyEntry("kafka", key, keyPassword, certChain); + return ks; + } catch (Exception e) { + throw new InvalidConfigurationException("Invalid PEM keystore configs", e); + } + } + + private KeyStore createTrustStoreFromPem(String trustedCertsPem) { + try { + KeyStore ts = KeyStore.getInstance("PKCS12"); + ts.load(null, null); + Certificate[] certs = certs(trustedCertsPem); + for (int i = 0; i < certs.length; i++) { + ts.setCertificateEntry("kafka" + i, certs[i]); + } + return ts; + } catch (InvalidConfigurationException e) { + throw e; + } catch (Exception e) { + throw new InvalidConfigurationException("Invalid PEM keystore configs", e); + } + } + + private Certificate[] certs(String pem) throws GeneralSecurityException { + List certEntries = CERTIFICATE_PARSER.pemEntries(pem); + if (certEntries.isEmpty()) + throw new InvalidConfigurationException("At least one certificate expected, but none found"); + + Certificate[] certs = new Certificate[certEntries.size()]; + for (int i = 0; i < certs.length; i++) { + certs[i] = CertificateFactory.getInstance("X.509") + .generateCertificate(new ByteArrayInputStream(certEntries.get(i))); + } + return certs; + } + + private PrivateKey privateKey(String pem, char[] keyPassword) throws Exception { + List keyEntries = PRIVATE_KEY_PARSER.pemEntries(pem); + if (keyEntries.isEmpty()) + throw new InvalidConfigurationException("Private key not provided"); + if (keyEntries.size() != 1) + throw new InvalidConfigurationException("Expected one private key, but found " + keyEntries.size()); + + byte[] keyBytes = keyEntries.get(0); + PKCS8EncodedKeySpec keySpec; + if (keyPassword == null) { + keySpec = new PKCS8EncodedKeySpec(keyBytes); + } else { + EncryptedPrivateKeyInfo keyInfo = new EncryptedPrivateKeyInfo(keyBytes); + String algorithm = keyInfo.getAlgName(); + SecretKeyFactory keyFactory = SecretKeyFactory.getInstance(algorithm); + SecretKey pbeKey = keyFactory.generateSecret(new PBEKeySpec(keyPassword)); + Cipher cipher = Cipher.getInstance(algorithm); + cipher.init(Cipher.DECRYPT_MODE, pbeKey, keyInfo.getAlgParameters()); + keySpec = keyInfo.getKeySpec(cipher); + } + + InvalidKeySpecException firstException = null; + for (KeyFactory factory : KEY_FACTORIES) { + try { + return factory.generatePrivate(keySpec); + } catch (InvalidKeySpecException e) { + if (firstException == null) + firstException = e; + } + } + throw new InvalidConfigurationException("Private key could not be loaded", firstException); + } + + private static KeyFactory keyFactory(String algorithm) { + try { + return KeyFactory.getInstance(algorithm); + } catch (Exception e) { + throw new InvalidConfigurationException("Could not create key factory for algorithm " + algorithm, e); + } + } + } + + /** + * Parser to process certificate/private key entries from PEM files + * Examples: + * -----BEGIN CERTIFICATE----- + * Base64 cert + * -----END CERTIFICATE----- + * + * -----BEGIN ENCRYPTED PRIVATE KEY----- + * Base64 private key + * -----BEGIN ENCRYPTED PRIVATE KEY----- + * Additional data may be included before headers, so we match all entres within the PEM. + */ + static class PemParser { + private final String name; + private final Pattern pattern; + + PemParser(String name) { + this.name = name; + String beginOrEndFormat = "-+%s\\s*.*%s[^-]*-+\\s+"; + String nameIgnoreSpace = name.replace(" ", "\\s+"); + + String encodingParams = "\\s*[^\\r\\n]*:[^\\r\\n]*[\\r\\n]+"; + String base64Pattern = "([a-zA-Z0-9/+=\\s]*)"; + String patternStr = String.format(beginOrEndFormat, "BEGIN", nameIgnoreSpace) + + String.format("(?:%s)*", encodingParams) + + base64Pattern + + String.format(beginOrEndFormat, "END", nameIgnoreSpace); + pattern = Pattern.compile(patternStr); + } + + private List pemEntries(String pem) { + Matcher matcher = pattern.matcher(pem + "\n"); // allow last newline to be omitted in value + List entries = new ArrayList<>(); + while (matcher.find()) { + String base64Str = matcher.group(1).replaceAll("\\s", ""); + entries.add(Base64.getDecoder().decode(base64Str)); + } + if (entries.isEmpty()) + throw new InvalidConfigurationException("No matching " + name + " entries in PEM file"); + return entries; + } + } +} diff --git a/clients/src/test/java/org/apache/kafka/common/network/CertStores.java b/clients/src/test/java/org/apache/kafka/common/network/CertStores.java index 972748d915827..4e13a909d082d 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/CertStores.java +++ b/clients/src/test/java/org/apache/kafka/common/network/CertStores.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.common.network; +import java.util.ArrayList; +import java.util.List; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestSslUtils; @@ -25,6 +27,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import org.apache.kafka.test.TestSslUtils.SslConfigsBuilder; public class CertStores { @@ -32,12 +35,15 @@ public class CertStores { SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, - SslConfigs.SSL_KEY_PASSWORD_CONFIG); + SslConfigs.SSL_KEY_PASSWORD_CONFIG, + SslConfigs.SSL_KEYSTORE_KEY_CONFIG, + SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG); public static final Set TRUSTSTORE_PROPS = Utils.mkSet( SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, - SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG); + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, + SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG); private final Map sslConfig; @@ -54,13 +60,30 @@ public CertStores(boolean server, String commonName, InetAddress hostAddress) th } private CertStores(boolean server, String commonName, TestSslUtils.CertificateBuilder certBuilder) throws Exception { + this(server, commonName, "RSA", certBuilder, false); + } + + private CertStores(boolean server, String commonName, String keyAlgorithm, TestSslUtils.CertificateBuilder certBuilder, boolean usePem) throws Exception { String name = server ? "server" : "client"; Mode mode = server ? Mode.SERVER : Mode.CLIENT; - File truststoreFile = File.createTempFile(name + "TS", ".jks"); - sslConfig = TestSslUtils.createSslConfig(!server, true, mode, truststoreFile, name, commonName, certBuilder); + File truststoreFile = usePem ? null : File.createTempFile(name + "TS", ".jks"); + sslConfig = new SslConfigsBuilder(mode) + .useClientCert(!server) + .certAlias(name) + .cn(commonName) + .createNewTrustStore(truststoreFile) + .certBuilder(certBuilder) + .algorithm(keyAlgorithm) + .usePem(usePem) + .build(); } + public Map getTrustingConfig(CertStores truststoreConfig) { + return getTrustingConfig(truststoreConfig, false); + } + + public Map getTrustingConfig(CertStores truststoreConfig, boolean usePemCerts) { Map config = new HashMap<>(sslConfig); for (String propName : TRUSTSTORE_PROPS) { config.put(propName, truststoreConfig.sslConfig.get(propName)); @@ -87,4 +110,52 @@ public Map trustStoreProps() { } return props; } + + public static class Builder { + private final boolean isServer; + private String cn; + private List sanDns; + private InetAddress sanIp; + private String keyAlgorithm; + private boolean usePem; + + public Builder(boolean isServer) { + this.isServer = isServer; + this.sanDns = new ArrayList<>(); + this.keyAlgorithm = "RSA"; + } + + public Builder cn(String cn) { + this.cn = cn; + return this; + } + + public Builder addHostName(String hostname) { + this.sanDns.add(hostname); + return this; + } + + public Builder hostAddress(InetAddress hostAddress) { + this.sanIp = hostAddress; + return this; + } + + public Builder keyAlgorithm(String keyAlgorithm) { + this.keyAlgorithm = keyAlgorithm; + return this; + } + + public Builder usePem(boolean usePem) { + this.usePem = usePem; + return this; + } + + public CertStores build() throws Exception { + TestSslUtils.CertificateBuilder certBuilder = new TestSslUtils.CertificateBuilder() + .sanDnsNames(sanDns.toArray(new String[0])); + if (sanIp != null) + certBuilder = certBuilder.sanIpAddress(sanIp); + return new CertStores(isServer, cn, keyAlgorithm, certBuilder, usePem); + } + } } \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index 34461b262052f..60a308ad40a02 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -16,15 +16,16 @@ */ package org.apache.kafka.common.network; -import java.io.File; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.security.TestSecurityConfig; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory; import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.utils.Java; import org.apache.kafka.common.utils.LogContext; @@ -39,10 +40,8 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLParameters; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -54,16 +53,22 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLParameters; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; /** * Tests for the SSL transport layer. These use a test harness that runs a simple socket server that echos back responses. @@ -75,6 +80,7 @@ public class SslTransportLayerTest { private static Time time = Time.SYSTEM; private final String tlsProtocol; + private final boolean useInlinePem; private NioEchoServer server; private Selector selector; private CertStores serverCertStores; @@ -83,32 +89,40 @@ public class SslTransportLayerTest { private Map sslServerConfigs; private Map sslConfigOverrides; - @Parameterized.Parameters(name = "tlsProtocol={0}") + @Parameterized.Parameters(name = "tlsProtocol={0}, useInlinePem={1}") public static Collection data() { List values = new ArrayList<>(); - values.add(new Object[] {"TLSv1.2"}); + values.add(new Object[] {"TLSv1.2", false}); + values.add(new Object[] {"TLSv1.2", true}); if (Java.IS_JAVA11_COMPATIBLE) { - values.add(new Object[] {"TLSv1.3"}); + values.add(new Object[] {"TLSv1.3", false}); } return values; } - public SslTransportLayerTest(String tlsProtocol) { + public SslTransportLayerTest(String tlsProtocol, boolean useInlinePem) { this.tlsProtocol = tlsProtocol; + this.useInlinePem = useInlinePem; sslConfigOverrides = new HashMap<>(); sslConfigOverrides.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsProtocol); sslConfigOverrides.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Collections.singletonList(tlsProtocol)); } + private CertStores.Builder certBuilder(boolean isServer, String cn) { + return new CertStores.Builder(isServer) + .cn(cn) + .usePem(useInlinePem); + } @Before public void setup() throws Exception { // Create certificates for use by client and server. Add server cert to client truststore and vice versa. - serverCertStores = new CertStores(true, "server", "localhost"); - clientCertStores = new CertStores(false, "client", "localhost"); + serverCertStores = certBuilder(true, "server").addHostName("localhost").build(); + clientCertStores = certBuilder(false, "client").addHostName("localhost").build(); sslServerConfigs = getTrustingConfig(serverCertStores, clientCertStores); sslClientConfigs = getTrustingConfig(clientCertStores, serverCertStores); - sslServerConfigs.putAll(sslConfigOverrides); - sslClientConfigs.putAll(sslConfigOverrides); + sslServerConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, DefaultSslEngineFactory.class); + sslClientConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, DefaultSslEngineFactory.class); + LogContext logContext = new LogContext(); ChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false, logContext); channelBuilder.configure(sslClientConfigs); @@ -147,8 +161,8 @@ public void testValidEndpointIdentificationSanDns() throws Exception { @Test public void testValidEndpointIdentificationSanIp() throws Exception { String node = "0"; - serverCertStores = new CertStores(true, "server", InetAddress.getByName("127.0.0.1")); - clientCertStores = new CertStores(false, "client", InetAddress.getByName("127.0.0.1")); + serverCertStores = certBuilder(true, "server").hostAddress(InetAddress.getByName("127.0.0.1")).build(); + clientCertStores = certBuilder(false, "client").hostAddress(InetAddress.getByName("127.0.0.1")).build(); sslServerConfigs = getTrustingConfig(serverCertStores, clientCertStores); sslClientConfigs = getTrustingConfig(clientCertStores, serverCertStores); server = createEchoServer(SecurityProtocol.SSL); @@ -167,17 +181,12 @@ public void testValidEndpointIdentificationSanIp() throws Exception { @Test public void testValidEndpointIdentificationCN() throws Exception { String node = "0"; - serverCertStores = new CertStores(true, "localhost"); - clientCertStores = new CertStores(false, "localhost"); + serverCertStores = certBuilder(true, "localhost").build(); + clientCertStores = certBuilder(false, "localhost").build(); sslServerConfigs = getTrustingConfig(serverCertStores, clientCertStores); sslClientConfigs = getTrustingConfig(clientCertStores, serverCertStores); - server = createEchoServer(SecurityProtocol.SSL); sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS"); - createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); - selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - - NetworkTestUtils.checkClientConnection(selector, node, 100, 10); + verifySslConfigs(); } /** @@ -217,8 +226,8 @@ public void testClientEndpointNotValidated() throws Exception { String node = "0"; // Create client certificate with an invalid hostname - clientCertStores = new CertStores(false, "non-existent.com"); - serverCertStores = new CertStores(true, "localhost"); + clientCertStores = certBuilder(false, "non-existent.com").build(); + serverCertStores = certBuilder(true, "localhost").build(); sslServerConfigs = getTrustingConfig(serverCertStores, clientCertStores); sslClientConfigs = getTrustingConfig(clientCertStores, serverCertStores); @@ -252,18 +261,12 @@ protected TestSslTransportLayer newTransportLayer(String id, SelectionKey key, S @Test public void testInvalidEndpointIdentification() throws Exception { String node = "0"; - serverCertStores = new CertStores(true, "server", "notahost"); - clientCertStores = new CertStores(false, "client", "localhost"); + serverCertStores = certBuilder(true, "server").addHostName("notahost").build(); + clientCertStores = certBuilder(false, "client").addHostName("localhost").build(); sslServerConfigs = getTrustingConfig(serverCertStores, clientCertStores); sslClientConfigs = getTrustingConfig(clientCertStores, serverCertStores); sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS"); - server = createEchoServer(SecurityProtocol.SSL); - createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); - selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - - NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED); - server.verifyAuthenticationMetrics(0, 1); + verifySslConfigsWithHandshakeFailure(); } /** @@ -272,8 +275,8 @@ public void testInvalidEndpointIdentification() throws Exception { */ @Test public void testEndpointIdentificationDisabled() throws Exception { - serverCertStores = new CertStores(true, "server", "notahost"); - clientCertStores = new CertStores(false, "client", "localhost"); + serverCertStores = certBuilder(true, "server").addHostName("notahost").build(); + clientCertStores = certBuilder(false, "client").addHostName("localhost").build(); sslServerConfigs = getTrustingConfig(serverCertStores, clientCertStores); sslClientConfigs = getTrustingConfig(clientCertStores, serverCertStores); @@ -311,12 +314,7 @@ public void testEndpointIdentificationDisabled() throws Exception { public void testClientAuthenticationRequiredValidProvided() throws Exception { String node = "0"; sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); - server = createEchoServer(SecurityProtocol.SSL); - createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); - selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - - NetworkTestUtils.checkClientConnection(selector, node, 100, 10); + verifySslConfigs(); } /** @@ -340,9 +338,7 @@ public void testListenerConfigOverride() throws Exception { selector.close(); // Remove client auth, so connection should fail - sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); - sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); - sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG); + CertStores.KEYSTORE_PROPS.forEach(sslClientConfigs::remove); createSelector(sslClientConfigs); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED); @@ -369,13 +365,7 @@ public void testClientAuthenticationRequiredUntrustedProvided() throws Exception sslServerConfigs = serverCertStores.getUntrustingConfig(); sslServerConfigs.putAll(sslConfigOverrides); sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); - server = createEchoServer(SecurityProtocol.SSL); - createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); - selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - - NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED); - server.verifyAuthenticationMetrics(0, 1); + verifySslConfigsWithHandshakeFailure(); } /** @@ -386,17 +376,8 @@ public void testClientAuthenticationRequiredUntrustedProvided() throws Exception public void testClientAuthenticationRequiredNotProvided() throws Exception { String node = "0"; sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); - server = createEchoServer(SecurityProtocol.SSL); - - sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); - sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); - sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG); - createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); - selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - - NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED); - server.verifyAuthenticationMetrics(0, 1); + CertStores.KEYSTORE_PROPS.forEach(sslClientConfigs::remove); + verifySslConfigsWithHandshakeFailure(); } /** @@ -409,12 +390,7 @@ public void testClientAuthenticationDisabledUntrustedProvided() throws Exception sslServerConfigs = serverCertStores.getUntrustingConfig(); sslServerConfigs.putAll(sslConfigOverrides); sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); - server = createEchoServer(SecurityProtocol.SSL); - createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); - selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - - NetworkTestUtils.checkClientConnection(selector, node, 100, 10); + verifySslConfigs(); } /** @@ -425,16 +401,9 @@ public void testClientAuthenticationDisabledUntrustedProvided() throws Exception public void testClientAuthenticationDisabledNotProvided() throws Exception { String node = "0"; sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); - server = createEchoServer(SecurityProtocol.SSL); - sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); - sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); - sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG); - createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); - selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - - NetworkTestUtils.checkClientConnection(selector, node, 100, 10); + CertStores.KEYSTORE_PROPS.forEach(sslClientConfigs::remove); + verifySslConfigs(); } /** @@ -445,12 +414,7 @@ public void testClientAuthenticationDisabledNotProvided() throws Exception { public void testClientAuthenticationRequestedValidProvided() throws Exception { String node = "0"; sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "requested"); - server = createEchoServer(SecurityProtocol.SSL); - createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); - selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - - NetworkTestUtils.checkClientConnection(selector, node, 100, 10); + verifySslConfigs(); } /** @@ -461,16 +425,81 @@ public void testClientAuthenticationRequestedValidProvided() throws Exception { public void testClientAuthenticationRequestedNotProvided() throws Exception { String node = "0"; sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "requested"); - server = createEchoServer(SecurityProtocol.SSL); - sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); - sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); - sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG); - createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); - selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); + CertStores.KEYSTORE_PROPS.forEach(sslClientConfigs::remove); + verifySslConfigs(); + } - NetworkTestUtils.checkClientConnection(selector, node, 100, 10); + /** + * Tests key-pair created using DSA. + */ + @Test + public void testDsaKeyPair() throws Exception { + // DSA algorithms are not supported for TLSv1.3. + assumeTrue(tlsProtocol.equals("TLSv1.2")); + serverCertStores = certBuilder(true, "server").keyAlgorithm("DSA").build(); + clientCertStores = certBuilder(false, "client").keyAlgorithm("DSA").build(); + sslServerConfigs = getTrustingConfig(serverCertStores, clientCertStores); + sslClientConfigs = getTrustingConfig(clientCertStores, serverCertStores); + sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); + verifySslConfigs(); + } + + /** + * Tests key-pair created using EC. + */ + @Test + public void testECKeyPair() throws Exception { + serverCertStores = certBuilder(true, "server").keyAlgorithm("EC").build(); + clientCertStores = certBuilder(false, "client").keyAlgorithm("EC").build(); + sslServerConfigs = getTrustingConfig(serverCertStores, clientCertStores); + sslClientConfigs = getTrustingConfig(clientCertStores, serverCertStores); + sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); + verifySslConfigs(); + } + + /** + * Tests PEM key store and trust store files which don't have store passwords. + */ + @Test + public void testPemFiles() throws Exception { + TestSslUtils.convertToPem(sslServerConfigs, true, true); + TestSslUtils.convertToPem(sslClientConfigs, true, true); + sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); + verifySslConfigs(); + } + + /** + * Test with PEM key store files without key password for client key store. We don't allow this + * with PEM files since unprotected private key on disk is not safe. We do allow with inline + * PEM config since key config can be encrypted or externalized similar to other password configs. + */ + @Test + public void testPemFilesWithoutClientKeyPassword() throws Exception { + TestSslUtils.convertToPem(sslServerConfigs, !useInlinePem, true); + TestSslUtils.convertToPem(sslClientConfigs, !useInlinePem, false); + sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); + server = createEchoServer(SecurityProtocol.SSL); + if (useInlinePem) + verifySslConfigs(); + else + assertThrows(KafkaException.class, () -> createSelector(sslClientConfigs)); + } + + /** + * Test with PEM key store files without key password for server key store.We don't allow this + * with PEM files since unprotected private key on disk is not safe. We do allow with inline + * PEM config since key config can be encrypted or externalized similar to other password configs. + */ + @Test + public void testPemFilesWithoutServerKeyPassword() throws Exception { + TestSslUtils.convertToPem(sslServerConfigs, !useInlinePem, false); + TestSslUtils.convertToPem(sslClientConfigs, !useInlinePem, true); + + if (useInlinePem) + verifySslConfigs(); + else + assertThrows(KafkaException.class, () -> createEchoServer(SecurityProtocol.SSL)); } /** @@ -525,12 +554,7 @@ public void testNullTruststorePassword() throws Exception { sslClientConfigs.remove(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG); sslServerConfigs.remove(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG); - server = createEchoServer(SecurityProtocol.SSL); - createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); - selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - - NetworkTestUtils.checkClientConnection(selector, node, 100, 10); + verifySslConfigs(); } /** @@ -541,20 +565,19 @@ public void testNullTruststorePassword() throws Exception { public void testInvalidKeyPassword() throws Exception { String node = "0"; sslServerConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, new Password("invalid")); - server = createEchoServer(SecurityProtocol.SSL); - createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); - selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - - NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED); - server.verifyAuthenticationMetrics(0, 1); + if (useInlinePem) { + // We fail fast for PEM + assertThrows(InvalidConfigurationException.class, () -> createEchoServer(SecurityProtocol.SSL)); + return; + } + verifySslConfigsWithHandshakeFailure(); } /** * Tests that connection success with the default TLS version. */ @Test - public void testTLSDefaults() throws Exception { + public void testTlsDefaults() throws Exception { sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores); sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores); @@ -1022,7 +1045,7 @@ public void testServerKeystoreDynamicUpdate() throws Exception { oldClientSelector.connect(oldNode, addr, BUFFER_SIZE, BUFFER_SIZE); NetworkTestUtils.checkClientConnection(selector, oldNode, 100, 10); - CertStores newServerCertStores = new CertStores(true, "server", "localhost"); + CertStores newServerCertStores = certBuilder(true, "server").addHostName("localhost").build(); Map newKeystoreConfigs = newServerCertStores.keyStoreProps(); assertTrue("SslChannelBuilder not reconfigurable", serverChannelBuilder instanceof ListenerReconfigurable); ListenerReconfigurable reconfigurableBuilder = (ListenerReconfigurable) serverChannelBuilder; @@ -1043,7 +1066,7 @@ public void testServerKeystoreDynamicUpdate() throws Exception { // Verify that old client continues to work NetworkTestUtils.checkClientConnection(oldClientSelector, oldNode, 100, 10); - CertStores invalidCertStores = new CertStores(true, "server", "127.0.0.1"); + CertStores invalidCertStores = certBuilder(true, "server").addHostName("127.0.0.1").build(); Map invalidConfigs = getTrustingConfig(invalidCertStores, clientCertStores); verifyInvalidReconfigure(reconfigurableBuilder, invalidConfigs, "keystore with different SubjectAltName"); @@ -1078,8 +1101,16 @@ public void testServerKeystoreDynamicUpdateWithNewSubjectAltName() throws Except selector.close(); TestSslUtils.CertificateBuilder certBuilder = new TestSslUtils.CertificateBuilder().sanDnsNames("localhost", "*.example.com"); - File truststoreFile = new File((String) sslClientConfigs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); - Map newConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, truststoreFile, "server", "server", certBuilder); + String truststorePath = (String) sslClientConfigs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG); + File truststoreFile = truststorePath != null ? new File(truststorePath) : null; + TestSslUtils.SslConfigsBuilder builder = new TestSslUtils.SslConfigsBuilder(Mode.SERVER) + .useClientCert(false) + .certAlias("server") + .cn("server") + .certBuilder(certBuilder) + .createNewTrustStore(truststoreFile) + .usePem(useInlinePem); + Map newConfigs = builder.build(); Map newKeystoreConfigs = new HashMap<>(); for (String propName : CertStores.KEYSTORE_PROPS) { newKeystoreConfigs.put(propName, newConfigs.get(propName)); @@ -1097,7 +1128,9 @@ public void testServerKeystoreDynamicUpdateWithNewSubjectAltName() throws Except NetworkTestUtils.checkClientConnection(selector, node2, 100, 10); TestSslUtils.CertificateBuilder invalidBuilder = new TestSslUtils.CertificateBuilder().sanDnsNames("localhost"); - Map invalidConfig = TestSslUtils.createSslConfig(false, false, Mode.SERVER, truststoreFile, "server", "server", invalidBuilder); + if (!useInlinePem) + builder.useExistingTrustStore(truststoreFile); + Map invalidConfig = builder.certBuilder(invalidBuilder).build(); Map invalidKeystoreConfigs = new HashMap<>(); for (String propName : CertStores.KEYSTORE_PROPS) { invalidKeystoreConfigs.put(propName, invalidConfig.get(propName)); @@ -1131,7 +1164,7 @@ public void testServerTruststoreDynamicUpdate() throws Exception { oldClientSelector.connect(oldNode, addr, BUFFER_SIZE, BUFFER_SIZE); NetworkTestUtils.checkClientConnection(selector, oldNode, 100, 10); - CertStores newClientCertStores = new CertStores(true, "client", "localhost"); + CertStores newClientCertStores = certBuilder(true, "client").addHostName("localhost").build(); sslClientConfigs = getTrustingConfig(newClientCertStores, serverCertStores); Map newTruststoreConfigs = newClientCertStores.trustStoreProps(); assertTrue("SslChannelBuilder not reconfigurable", serverChannelBuilder instanceof ListenerReconfigurable); @@ -1174,13 +1207,7 @@ public void testServerTruststoreDynamicUpdate() throws Exception { public void testCustomClientSslEngineFactory() throws Exception { String node = "0"; sslClientConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, TestSslUtils.TestSslEngineFactory.class); - - server = createEchoServer(SecurityProtocol.SSL); - createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); - selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - - NetworkTestUtils.checkClientConnection(selector, node, 100, 10); + verifySslConfigs(); } /** @@ -1190,13 +1217,7 @@ public void testCustomClientSslEngineFactory() throws Exception { public void testCustomServerSslEngineFactory() throws Exception { String node = "0"; sslServerConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, TestSslUtils.TestSslEngineFactory.class); - - server = createEchoServer(SecurityProtocol.SSL); - createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); - selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - - NetworkTestUtils.checkClientConnection(selector, node, 100, 10); + verifySslConfigs(); } /** @@ -1207,13 +1228,7 @@ public void testCustomClientAndServerSslEngineFactory() throws Exception { String node = "0"; sslClientConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, TestSslUtils.TestSslEngineFactory.class); sslServerConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, TestSslUtils.TestSslEngineFactory.class); - - server = createEchoServer(SecurityProtocol.SSL); - createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); - selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - - NetworkTestUtils.checkClientConnection(selector, node, 100, 10); + verifySslConfigs(); } /** @@ -1268,6 +1283,25 @@ private Map getTrustingConfig(CertStores certStores, CertStores return configs; } + private void verifySslConfigs() throws Exception { + server = createEchoServer(SecurityProtocol.SSL); + createSelector(sslClientConfigs); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); + String node = "0"; + selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); + NetworkTestUtils.checkClientConnection(selector, node, 100, 10); + } + + public void verifySslConfigsWithHandshakeFailure() throws Exception { + server = createEchoServer(SecurityProtocol.SSL); + createSelector(sslClientConfigs); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); + String node = "0"; + selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED); + server.verifyAuthenticationMetrics(0, 1); + } + @FunctionalInterface private interface FailureAction { FailureAction NO_OP = () -> { }; diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java new file mode 100644 index 0000000000000..c8c0566015de2 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.ssl; + +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Test; + +import java.security.KeyStore; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; + +public class DefaultSslEngineFactoryTest { + + /* + * Key and certificates were extracted using openssl from a key store file created with 100 years validity using: + * + * openssl pkcs12 -in server.keystore.p12 -nodes -nocerts -out test.key.pem -passin pass:key-password + * openssl pkcs12 -in server.keystore.p12 -nodes -nokeys -out test.certchain.pem -passin pass:key-password + * openssl pkcs12 -in server.keystore.p12 -nodes -out test.keystore.pem -passin pass:key-password + * openssl pkcs8 -topk8 -v1 pbeWithSHA1And3-KeyTripleDES-CBC -in test.key.pem -out test.key.encrypted.pem -passout pass:key-password + */ + + private static final String CA1 = "-----BEGIN CERTIFICATE-----\n" + + "MIIC0zCCAbugAwIBAgIEStdXHTANBgkqhkiG9w0BAQsFADASMRAwDgYDVQQDEwdU\n" + + "ZXN0Q0ExMCAXDTIwMDkyODA5MDI0MFoYDzIxMjAwOTA0MDkwMjQwWjASMRAwDgYD\n" + + "VQQDEwdUZXN0Q0ExMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAo3Gr\n" + + "WJAkjnvgcuIfjArDhNdtAlRTt094WMUXhYDibgGtd+CLcWqA+c4PEoK4oybnKZqU\n" + + "6MlDfPgesIK2YiNBuSVWMtZ2doageOBnd80Iwbg8DqGtQpUsvw8X5fOmuza+4inv\n" + + "/8IpiTizq8YjSMT4nYDmIjyyRCSNY4atjgMnskutJ0v6i69+ZAA520Y6nn2n4RD5\n" + + "8Yc+y7yCkbZXnYS5xBOFEExmtc0Xa7S9nM157xqKws9Z+rTKZYLrryaHI9JNcXgG\n" + + "kzQEH9fBePASeWfi9AGRvAyS2GMSIBOsihIDIha/mqQcJOGCEqTMtefIj2FaErO2\n" + + "bL9yU7OpW53iIC8y0QIDAQABoy8wLTAMBgNVHRMEBTADAQH/MB0GA1UdDgQWBBRf\n" + + "svKcoQ9ZBvjwyUSV2uMFzlkOWDANBgkqhkiG9w0BAQsFAAOCAQEAEE1ZG2MGE248\n" + + "glO83ROrHbxmnVWSQHt/JZANR1i362sY1ekL83wlhkriuvGVBlHQYWezIfo/4l9y\n" + + "JTHNX3Mrs9eWUkaDXADkHWj3AyLXN3nfeU307x1wA7OvI4YKpwvfb4aYS8RTPz9d\n" + + "JtrfR0r8aGTgsXvCe4SgwDBKv7bckctOwD3S7D/b6y3w7X0s7JCU5+8ZjgoYfcLE\n" + + "gNqQEaOwdT2LHCvxHmGn/2VGs/yatPQIYYuufe5i8yX7pp4Xbd2eD6LULYkHFs3x\n" + + "uJzMRI7BukmIIWuBbAkYI0atxLQIysnVFXdL9pBgvgso2nA3FgP/XeORhkyHVvtL\n" + + "REH2YTlftQ==\n" + + "-----END CERTIFICATE-----"; + + private static final String CA2 = "-----BEGIN CERTIFICATE-----\n" + + "MIIC0zCCAbugAwIBAgIEfk9e9DANBgkqhkiG9w0BAQsFADASMRAwDgYDVQQDEwdU\n" + + "ZXN0Q0EyMCAXDTIwMDkyODA5MDI0MVoYDzIxMjAwOTA0MDkwMjQxWjASMRAwDgYD\n" + + "VQQDEwdUZXN0Q0EyMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAvCh0\n" + + "UO5op9eHfz7mvZ7IySK7AOCTC56QYFJcU+hD6yk1wKg2qot7naI5ozAc8n7c4pMt\n" + + "LjI3D0VtC/oHC29R2HNMSWyHcxIXw8z127XeCLRkCqYWuVAl3nBuWfWVPObjKetH\n" + + "TWlQANYWAfk1VbS6wfzgp9cMaK7wQ+VoGEo4x3pjlrdlyg4k4O2yubcpWmJ2TjxS\n" + + "gg7TfKGizUVAvF9wUG9Q4AlCg4uuww5RN9w6vnzDKGhWJhkQ6pf/m1xB+WueFOeU\n" + + "aASGhGqCTqiz3p3M3M4OZzG3KptjQ/yb67x4T5U5RxqoiN4L57E7ZJLREpa6ZZNs\n" + + "ps/gQ8dR9Uo/PRyAkQIDAQABoy8wLTAMBgNVHRMEBTADAQH/MB0GA1UdDgQWBBRg\n" + + "IAOVH5LeE6nZmdScEE3JO/AhvTANBgkqhkiG9w0BAQsFAAOCAQEAHkk1iybwy/Lf\n" + + "iEQMVRy7XfuC008O7jfCUBMgUvE+oO2RadH5MmsXHG3YerdsDM90dui4JqQNZOUh\n" + + "kF8dIWPQHE0xDsR9jiUsemZFpVMN7DcvVZ3eFhbvJA8Q50rxcNGA+tn9xT/xdQ6z\n" + + "1eRq9IPoYcRexQ7s9mincM4T4lLm8GGcd7ZPHy8kw0Bp3E/enRHWaF5b8KbXezXD\n" + + "I3SEYUyRL2K3px4FImT4X9XQm2EX6EONlu4GRcJpD6RPc0zC7c9dwEnSo+0NnewR\n" + + "gjgO34CLzShB/kASLS9VQXcUC6bsggAVK2rWQMmy35SOEUufSuvg8kUFoyuTzfhn\n" + + "hL+PVwIu7g==\n" + + "-----END CERTIFICATE-----"; + + private static final String CERTCHAIN = "Bag Attributes\n" + + " friendlyName: server\n" + + " localKeyID: 54 69 6D 65 20 31 36 30 31 32 38 33 37 36 35 34 32 33 \n" + + "subject=/CN=TestBroker\n" + + "issuer=/CN=TestCA1\n" + + "-----BEGIN CERTIFICATE-----\n" + + "MIIC/zCCAeegAwIBAgIEatBnEzANBgkqhkiG9w0BAQsFADASMRAwDgYDVQQDEwdU\n" + + "ZXN0Q0ExMCAXDTIwMDkyODA5MDI0NFoYDzIxMjAwOTA0MDkwMjQ0WjAVMRMwEQYD\n" + + "VQQDEwpUZXN0QnJva2VyMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA\n" + + "pkw1AS71ej/iOMvzVgVL1dkQOYzI842NcPmx0yFFsue2umL8WVd3085NgWRb3SS1\n" + + "4X676t7zxjPGzYi7jwmA8stCrDt0NAPWd/Ko6ErsCs87CUs4u1Cinf+b3o9NF5u0\n" + + "UPYBQLF4Ir8T1jQ+tKiqsChGDt6urRAg1Cro5i7r10jN1uofY2tBs+r8mALhJ17c\n" + + "T5LKawXeYwNOQ86c5djClbcP0RrfcPyRyj1/Cp1axo28iO0fXFyO2Zf3a4vtt+Ih\n" + + "PW+A2tL+t3JTBd8g7Fl3ozzpcotAi7MDcZaYA9GiTP4DOiKUeDt6yMYQQr3VEqGa\n" + + "pXp4fKY+t9slqnAmcBZ4kQIDAQABo1gwVjAfBgNVHSMEGDAWgBRfsvKcoQ9ZBvjw\n" + + "yUSV2uMFzlkOWDAUBgNVHREEDTALgglsb2NhbGhvc3QwHQYDVR0OBBYEFGWt+27P\n" + + "INk/S5X+PRV/jW3WOhtaMA0GCSqGSIb3DQEBCwUAA4IBAQCLHCjFFvqa+0GcG9eq\n" + + "v1QWaXDohY5t5CCwD8Z+lT9wcSruTxDPwL7LrR36h++D6xJYfiw4iaRighoA40xP\n" + + "W6+0zGK/UtWV4t+ODTDzyAWgls5w+0R5ki6447qGqu5tXlW5DCHkkxWiozMnhNU2\n" + + "G3P/Drh7DhmADDBjtVLsu5M1sagF/xwTP/qCLMdChlJNdeqyLnAUa9SYG1eNZS/i\n" + + "wrCC8m9RUQb4+OlQuFtr0KhaaCkBXfmhigQAmh44zSyO+oa3qQDEavVFo/Mcui9o\n" + + "WBYetcgVbXPNoti+hQEMqmJYBHlLbhxMnkooGn2fa70f453Bdu/Xh6Yphi5NeCHn\n" + + "1I+y\n" + + "-----END CERTIFICATE-----\n" + + "Bag Attributes\n" + + " friendlyName: CN=TestCA1\n" + + "subject=/CN=TestCA1\n" + + "issuer=/CN=TestCA1\n" + + "-----BEGIN CERTIFICATE-----\n" + + "MIIC0zCCAbugAwIBAgIEStdXHTANBgkqhkiG9w0BAQsFADASMRAwDgYDVQQDEwdU\n" + + "ZXN0Q0ExMCAXDTIwMDkyODA5MDI0MFoYDzIxMjAwOTA0MDkwMjQwWjASMRAwDgYD\n" + + "VQQDEwdUZXN0Q0ExMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAo3Gr\n" + + "WJAkjnvgcuIfjArDhNdtAlRTt094WMUXhYDibgGtd+CLcWqA+c4PEoK4oybnKZqU\n" + + "6MlDfPgesIK2YiNBuSVWMtZ2doageOBnd80Iwbg8DqGtQpUsvw8X5fOmuza+4inv\n" + + "/8IpiTizq8YjSMT4nYDmIjyyRCSNY4atjgMnskutJ0v6i69+ZAA520Y6nn2n4RD5\n" + + "8Yc+y7yCkbZXnYS5xBOFEExmtc0Xa7S9nM157xqKws9Z+rTKZYLrryaHI9JNcXgG\n" + + "kzQEH9fBePASeWfi9AGRvAyS2GMSIBOsihIDIha/mqQcJOGCEqTMtefIj2FaErO2\n" + + "bL9yU7OpW53iIC8y0QIDAQABoy8wLTAMBgNVHRMEBTADAQH/MB0GA1UdDgQWBBRf\n" + + "svKcoQ9ZBvjwyUSV2uMFzlkOWDANBgkqhkiG9w0BAQsFAAOCAQEAEE1ZG2MGE248\n" + + "glO83ROrHbxmnVWSQHt/JZANR1i362sY1ekL83wlhkriuvGVBlHQYWezIfo/4l9y\n" + + "JTHNX3Mrs9eWUkaDXADkHWj3AyLXN3nfeU307x1wA7OvI4YKpwvfb4aYS8RTPz9d\n" + + "JtrfR0r8aGTgsXvCe4SgwDBKv7bckctOwD3S7D/b6y3w7X0s7JCU5+8ZjgoYfcLE\n" + + "gNqQEaOwdT2LHCvxHmGn/2VGs/yatPQIYYuufe5i8yX7pp4Xbd2eD6LULYkHFs3x\n" + + "uJzMRI7BukmIIWuBbAkYI0atxLQIysnVFXdL9pBgvgso2nA3FgP/XeORhkyHVvtL\n" + + "REH2YTlftQ==\n" + + "-----END CERTIFICATE-----"; + + private static final String KEY = "Bag Attributes\n" + + " friendlyName: server\n" + + " localKeyID: 54 69 6D 65 20 31 36 30 31 32 38 33 37 36 35 34 32 33\n" + + "Key Attributes: \n" + + "-----BEGIN PRIVATE KEY-----\n" + + "MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQCmTDUBLvV6P+I4\n" + + "y/NWBUvV2RA5jMjzjY1w+bHTIUWy57a6YvxZV3fTzk2BZFvdJLXhfrvq3vPGM8bN\n" + + "iLuPCYDyy0KsO3Q0A9Z38qjoSuwKzzsJSzi7UKKd/5vej00Xm7RQ9gFAsXgivxPW\n" + + "ND60qKqwKEYO3q6tECDUKujmLuvXSM3W6h9ja0Gz6vyYAuEnXtxPksprBd5jA05D\n" + + "zpzl2MKVtw/RGt9w/JHKPX8KnVrGjbyI7R9cXI7Zl/dri+234iE9b4Da0v63clMF\n" + + "3yDsWXejPOlyi0CLswNxlpgD0aJM/gM6IpR4O3rIxhBCvdUSoZqlenh8pj632yWq\n" + + "cCZwFniRAgMBAAECggEAOfC/XwQvf0KW3VciF0yNGZshbgvBUCp3p284J+ml0Smu\n" + + "ns4yQiaZl3B/zJ9c6nYJ8OEpNDIuGVac46vKPZIAHZf4SO4GFMFpji078IN6LmH5\n" + + "nclZoNn9brNKaYbgQ2N6teKgmRu8Uc7laHKXjnZd0jaWAkRP8/h0l7fDob+jaERj\n" + + "oJBx4ux2Z62TTCP6W4VY3KZgSL1p6dQswqlukPVytMeI2XEwWnO+w8ED0BxCxM4F\n" + + "K//dw7nUMGS9GUNkgyDcH1akYSCDzdBeymQBp2latBotVfGNK1hq9nC1iaxmRkJL\n" + + "sYjwVc24n37u+txOovy3daq2ySj9trF7ySAPVYkh4QKBgQDWeN/MR6cy1TLF2j3g\n" + + "eMMeM32LxXArIPsar+mft+uisKWk5LDpsKpph93sl0JjFi4x0t1mqw23h23I+B2c\n" + + "JWiPAHUG3FGvvkPPcfMUvd7pODyE2XaXi+36UZAH7qc94VZGJEb+sPITckSruREE\n" + + "QErWZyrbBRgvQXsmVme5B2/kRQKBgQDGf2HQH0KHl54O2r9vrhiQxWIIMSWlizJC\n" + + "hjboY6DkIsAMwnXp3wn3Bk4tSgeLk8DEVlmEaE3gvGpiIp0vQnSOlME2TXfEthdM\n" + + "uS3+BFXN4Vxxx/qjKL2WfZloyzdaaaF7s+LIwmXgLsFFCUSq+uLtBqfpH2Qv+paX\n" + + "Xqm7LN3V3QKBgH5ssj/Q3RZx5oQKqf7wMNRUteT2dbB2uI56s9SariQwzPPuevrG\n" + + "US30ETWt1ExkfsaP7kLfAi71fhnBaHLq+j+RnWp15REbrw1RtmC7q/L+W25UYjvj\n" + + "GF0+RxDl9V/cvOaL6+2mkIw2B5TSet1uqK7KEdEZp6/zgYyP0oSXhbWhAoGAdnlZ\n" + + "HCtMPjnUcPFHCZVTvDTTSihrW9805FfPNe0g/olvLy5xymEBRZtR1d41mq1ZhNY1\n" + + "H75RnS1YIbKfNrHnd6J5n7ulHJfCWFy+grp7rCIyVwcRJYkPf17/zXhdVW1uoLLB\n" + + "TSoaPDAr0tSxU4vjHa23UoEV/z0F3Nr3W2xwC1ECgYBHKjv6ekLhx7HbP797+Ai+\n" + + "wkHvS2L/MqEBxuHzcQ9G6Mj3ANAeyDB8YSC8qGtDQoEyukv2dO73lpodNgbR8P+Q\n" + + "PDBb6eyntAo2sSeo0jZkiXvDOfRaGuGVrxjuTfaqcVB33jC6BYfi61/3Sr5oG9Nd\n" + + "tDGh1HlOIRm1jD9KQNVZ/Q==\n" + + "-----END PRIVATE KEY-----"; + + private static final String ENCRYPTED_KEY = "-----BEGIN ENCRYPTED PRIVATE KEY-----\n" + + "MIIE6jAcBgoqhkiG9w0BDAEDMA4ECGyAEWAXlaXzAgIIAASCBMgt7QD1Bbz7MAHI\n" + + "Ni0eTrwNiuAPluHirLXzsV57d1O9i4EXVp5nzRy6753cjXbGXARbBeaJD+/+jbZp\n" + + "CBZTHMG8rTCfbsg5kMqxT6XuuqWlKLKc4gaq+QNgHHleKqnpwZQmOQ+awKWEK/Ow\n" + + "Z0KxXqkp+b4/qJK3MqKZDsJtVdyUhO0tLVxd+BHDg9B93oExc87F16h3R0+T4rxE\n" + + "Tvz2c2upBqva49AbLDxpWXLCJC8CRkxM+KHrPkYjpNx3jCjtwiiXfzJCWjuCkVrL\n" + + "2F4bqvpYPIseoPtMvWaplNtoPwhpzBB/hoJ+R+URr4XHX3Y+bz6k6iQnhoCOIviy\n" + + "oEEUvWtKnaEEKSauR+Wyj3MoeB64g9NWMEHv7+SQeA4WqlgV2s4txwRxFGKyKLPq\n" + + "caMSpfxvYujtSh0DOv9GI3cVHPM8WsebCz9cNrbKSR8/8JufcoonTitwF/4vm1Et\n" + + "AdmCuH9JIYVvmFKFVxY9SvRAvo43OQaPmJQHMUa4yDfMtpTSgmB/7HFgxtksYs++\n" + + "Gbrq6F/hon+0bLx+bMz2FK635UU+iVno+qaScKWN3BFqDl+KnZprBhLSXTT3aHmp\n" + + "fisQit/HWp71a0Vzq85WwI4ucMKNc8LemlwNBxWLLiJDp7sNPLb5dIl8yIwSEIgd\n" + + "vC5px9KWEdt3GxTUEqtIeBmagbBhahcv+c9Dq924DLI+Slv6TJKZpIcMqUECgzvi\n" + + "hb8gegyEscBEcDSzl0ojlFVz4Va5eZS/linTjNJhnkx8BKLn/QFco7FpEE6uOmQ3\n" + + "0kF64M2Rv67cJbYVrhD46TgIzH3Y/FOMSi1zFHQ14nVXWMu0yAlBX+QGk7Xl+/aF\n" + + "BIq+i9WcBqbttR3CwyeTnIFXkdC66iTZYhDl9HT6yMcazql2Or2TjIIWr6tfNWH/\n" + + "5dWSEHYM5m8F2/wF0ANWJyR1oPr4ckcUsfl5TfOWVj5wz4QVF6EGV7FxEnQHrdx0\n" + + "6rXThRKFjqxUubsNt1yUEwdlTNz2UFhobGF9MmFeB97BZ6T4v8G825de/Caq9FzO\n" + + "yMFFCRcGC7gIzMXRPEjHIvBdTThm9rbNzKPXHqw0LHG478yIqzxvraCYTRw/4eWN\n" + + "Q+hyOL/5T5QNXHpR8Udp/7sptw7HfRnecQ/Vz9hOKShQq3h4Sz6eQMQm7P9qGo/N\n" + + "bltEAIECRVcNYLN8LuEORfeecNcV3BX+4BBniFtdD2bIRsWC0ZUsGf14Yhr4P1OA\n" + + "PtMJzy99mrcq3h+o+hEW6bhIj1gA88JSMJ4iRuwTLRKE81w7EyziScDsotYKvDPu\n" + + "w4+PFbQO3fr/Zga3LgYis8/DMqZoWjVCjAeVoypuOZreieZYC/BgBS8qSUAmDPKq\n" + + "jK+T5pwMMchfXbkV80LTu1kqLfKWdE0AmZfGy8COE/NNZ/FeiWZPdwu2Ix6u/RoY\n" + + "LTjNy4YLIBdVELFXaFJF2GfzLpnwrW5tyNPVVrGmUoiyOzgx8gMyCLGavGtduyoY\n" + + "tBiUTmd05Ugscn4Rz9X30S4NbnjL/h+bWl1m6/M+9FHEe85FPxmt/GRmJPbFPMR5\n" + + "q5EgQGkt4ifiaP6qvyFulwvVwx+m0bf1q6Vb/k3clIyLMcVZWFE1TqNH2Ife46AE\n" + + "2I39ZnGTt0mbWskpHBA=\n" + + "-----END ENCRYPTED PRIVATE KEY-----"; + + private static final Password KEY_PASSWORD = new Password("key-password"); + + private DefaultSslEngineFactory factory = new DefaultSslEngineFactory(); + Map configs = new HashMap<>(); + + @Before + public void setUp() { + factory = new DefaultSslEngineFactory(); + configs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); + } + + @Test + public void testPemTrustStoreConfigWithOneCert() throws Exception { + configs.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, pemAsConfigValue(CA1)); + configs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, DefaultSslEngineFactory.PEM_TYPE); + factory.configure(configs); + + KeyStore trustStore = factory.truststore(); + List aliases = Collections.list(trustStore.aliases()); + assertEquals(Collections.singletonList("kafka0"), aliases); + assertNotNull("Certificate not loaded", trustStore.getCertificate("kafka0")); + assertNull("Unexpected private key", trustStore.getKey("kafka0", null)); + } + + @Test + public void testPemTrustStoreConfigWithMultipleCerts() throws Exception { + configs.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, pemAsConfigValue(CA1, CA2)); + configs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, DefaultSslEngineFactory.PEM_TYPE); + factory.configure(configs); + + KeyStore trustStore = factory.truststore(); + List aliases = Collections.list(trustStore.aliases()); + assertEquals(Arrays.asList("kafka0", "kafka1"), aliases); + assertNotNull("Certificate not loaded", trustStore.getCertificate("kafka0")); + assertNull("Unexpected private key", trustStore.getKey("kafka0", null)); + assertNotNull("Certificate not loaded", trustStore.getCertificate("kafka1")); + assertNull("Unexpected private key", trustStore.getKey("kafka1", null)); + } + + @Test + public void testPemKeyStoreConfigNoPassword() throws Exception { + verifyPemKeyStoreConfig(KEY, null); + } + + @Test + public void testPemKeyStoreConfigWithKeyPassword() throws Exception { + verifyPemKeyStoreConfig(ENCRYPTED_KEY, KEY_PASSWORD); + } + + @Test + public void testTrailingNewLines() throws Exception { + verifyPemKeyStoreConfig(ENCRYPTED_KEY + "\n\n", KEY_PASSWORD); + } + + @Test + public void testLeadingNewLines() throws Exception { + verifyPemKeyStoreConfig("\n\n" + ENCRYPTED_KEY, KEY_PASSWORD); + } + + @Test + public void testCarriageReturnLineFeed() throws Exception { + verifyPemKeyStoreConfig(ENCRYPTED_KEY.replaceAll("\n", "\r\n"), KEY_PASSWORD); + } + + private void verifyPemKeyStoreConfig(String keyFileName, Password keyPassword) throws Exception { + configs.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, pemAsConfigValue(keyFileName)); + configs.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, pemAsConfigValue(CERTCHAIN)); + configs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword); + configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, DefaultSslEngineFactory.PEM_TYPE); + factory.configure(configs); + + KeyStore keyStore = factory.keystore(); + List aliases = Collections.list(keyStore.aliases()); + assertEquals(Collections.singletonList("kafka"), aliases); + assertNotNull("Certificate not loaded", keyStore.getCertificate("kafka")); + assertNotNull("Private key not loaded", + keyStore.getKey("kafka", keyPassword == null ? null : keyPassword.value().toCharArray())); + } + + @Test + public void testPemTrustStoreFile() throws Exception { + configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, pemFilePath(CA1)); + configs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, DefaultSslEngineFactory.PEM_TYPE); + factory.configure(configs); + + KeyStore trustStore = factory.truststore(); + List aliases = Collections.list(trustStore.aliases()); + assertEquals(Collections.singletonList("kafka0"), aliases); + assertNotNull("Certificate not found", trustStore.getCertificate("kafka0")); + assertNull("Unexpected private key", trustStore.getKey("kafka0", null)); + } + + @Test + public void testPemKeyStoreFileNoKeyPassword() throws Exception { + configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, + pemFilePath(pemAsConfigValue(KEY, CERTCHAIN).value())); + configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, DefaultSslEngineFactory.PEM_TYPE); + assertThrows(InvalidConfigurationException.class, () -> factory.configure(configs)); + } + + @Test + public void testPemKeyStoreFileWithKeyPassword() throws Exception { + configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, + pemFilePath(pemAsConfigValue(ENCRYPTED_KEY, CERTCHAIN).value())); + configs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, KEY_PASSWORD); + configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, DefaultSslEngineFactory.PEM_TYPE); + factory.configure(configs); + + KeyStore keyStore = factory.keystore(); + List aliases = Collections.list(keyStore.aliases()); + assertEquals(Collections.singletonList("kafka"), aliases); + assertNotNull("Certificate not found", keyStore.getCertificate("kafka")); + assertNotNull("Private key not found", keyStore.getKey("kafka", KEY_PASSWORD.value().toCharArray())); + } + + private String pemFilePath(String pem) throws Exception { + return TestUtils.tempFile(pem).getAbsolutePath(); + } + + private Password pemAsConfigValue(String... pemValues) throws Exception { + StringBuilder builder = new StringBuilder(); + for (String pem : pemValues) { + builder.append(pem); + builder.append("\n"); + } + return new Password(builder.toString().trim()); + } +} diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java index 269c7a201718f..2f09087e5959d 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java @@ -36,6 +36,9 @@ import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.security.auth.SslEngineFactory; +import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.FileBasedStore; +import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.PemStore; +import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.SecurityStore; import org.apache.kafka.common.security.ssl.mock.TestKeyManagerFactory; import org.apache.kafka.common.security.ssl.mock.TestProviderCreator; import org.apache.kafka.common.security.ssl.mock.TestTrustManagerFactory; @@ -313,6 +316,40 @@ public void testReconfigurationWithoutKeystore() throws Exception { } } + @Test + public void testPemReconfiguration() throws Exception { + Map sslConfig = sslConfigsBuilder(Mode.SERVER) + .createNewTrustStore(null) + .usePem(true) + .build(); + SslFactory sslFactory = new SslFactory(Mode.SERVER); + sslFactory.configure(sslConfig); + SslEngineFactory sslEngineFactory = sslFactory.sslEngineFactory(); + assertNotNull("SslEngineFactory not created", sslEngineFactory); + + sslConfig.put("some.config", "some.value"); + sslFactory.reconfigure(sslConfig); + assertSame("SslEngineFactory recreated unnecessarily", sslEngineFactory, sslFactory.sslEngineFactory()); + + sslConfig.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, + new Password(((Password) sslConfig.get(SslConfigs.SSL_KEYSTORE_KEY_CONFIG)).value() + " ")); + sslFactory.reconfigure(sslConfig); + assertNotSame("SslEngineFactory not recreated", sslEngineFactory, sslFactory.sslEngineFactory()); + sslEngineFactory = sslFactory.sslEngineFactory(); + + sslConfig.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, + new Password(((Password) sslConfig.get(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG)).value() + " ")); + sslFactory.reconfigure(sslConfig); + assertNotSame("SslEngineFactory not recreated", sslEngineFactory, sslFactory.sslEngineFactory()); + sslEngineFactory = sslFactory.sslEngineFactory(); + + sslConfig.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, + new Password(((Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG)).value() + " ")); + sslFactory.reconfigure(sslConfig); + assertNotSame("SslEngineFactory not recreated", sslEngineFactory, sslFactory.sslEngineFactory()); + sslEngineFactory = sslFactory.sslEngineFactory(); + } + @Test public void testKeyStoreTrustStoreValidation() throws Exception { File trustStoreFile = File.createTempFile("truststore", ".jks"); @@ -351,16 +388,27 @@ public void testUntrustedKeyStoreValidationFails() throws Exception { @Test public void testKeystoreVerifiableUsingTruststore() throws Exception { - File trustStoreFile1 = File.createTempFile("truststore1", ".jks"); + verifyKeystoreVerifiableUsingTruststore(false); + } + + @Test + public void testPemKeystoreVerifiableUsingTruststore() throws Exception { + verifyKeystoreVerifiableUsingTruststore(true); + } + + private void verifyKeystoreVerifiableUsingTruststore(boolean usePem) throws Exception { + File trustStoreFile1 = usePem ? null : File.createTempFile("truststore1", ".jks"); Map sslConfig1 = sslConfigsBuilder(Mode.SERVER) .createNewTrustStore(trustStoreFile1) + .usePem(usePem) .build(); SslFactory sslFactory = new SslFactory(Mode.SERVER, null, true); sslFactory.configure(sslConfig1); - File trustStoreFile2 = File.createTempFile("truststore2", ".jks"); + File trustStoreFile2 = usePem ? null : File.createTempFile("truststore2", ".jks"); Map sslConfig2 = sslConfigsBuilder(Mode.SERVER) .createNewTrustStore(trustStoreFile2) + .usePem(usePem) .build(); // Verify that `createSSLContext` fails even if certificate from new keystore is trusted by // the new truststore, if certificate is not trusted by the existing truststore on the `SslFactory`. @@ -376,23 +424,35 @@ public void testKeystoreVerifiableUsingTruststore() throws Exception { @Test public void testCertificateEntriesValidation() throws Exception { - File trustStoreFile = File.createTempFile("truststore", ".jks"); + verifyCertificateEntriesValidation(false); + } + + @Test + public void testPemCertificateEntriesValidation() throws Exception { + verifyCertificateEntriesValidation(true); + } + + private void verifyCertificateEntriesValidation(boolean usePem) throws Exception { + File trustStoreFile = usePem ? null : File.createTempFile("truststore", ".jks"); Map serverSslConfig = sslConfigsBuilder(Mode.SERVER) .createNewTrustStore(trustStoreFile) + .usePem(usePem) .build(); + File newTrustStoreFile = usePem ? null : File.createTempFile("truststore", ".jks"); Map newCnConfig = sslConfigsBuilder(Mode.SERVER) - .createNewTrustStore(File.createTempFile("truststore", ".jks")) + .createNewTrustStore(newTrustStoreFile) .cn("Another CN") + .usePem(usePem) .build(); - KeyStore ks1 = sslKeyStore(serverSslConfig).get(); - KeyStore ks2 = sslKeyStore(serverSslConfig).get(); + KeyStore ks1 = sslKeyStore(serverSslConfig); + KeyStore ks2 = sslKeyStore(serverSslConfig); assertEquals(SslFactory.CertificateEntries.create(ks1), SslFactory.CertificateEntries.create(ks2)); // Use different alias name, validation should succeed ks2.setCertificateEntry("another", ks1.getCertificate("localhost")); assertEquals(SslFactory.CertificateEntries.create(ks1), SslFactory.CertificateEntries.create(ks2)); - KeyStore ks3 = sslKeyStore(newCnConfig).get(); + KeyStore ks3 = sslKeyStore(newCnConfig); assertNotEquals(SslFactory.CertificateEntries.create(ks1), SslFactory.CertificateEntries.create(ks3)); } @@ -461,13 +521,24 @@ public void testInvalidSslEngineFactory() throws Exception { sslFactory.configure(clientSslConfig); } - private DefaultSslEngineFactory.SecurityStore sslKeyStore(Map sslConfig) { - return new DefaultSslEngineFactory.SecurityStore( - (String) sslConfig.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), - (String) sslConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), - (Password) sslConfig.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), - (Password) sslConfig.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG) - ); + private KeyStore sslKeyStore(Map sslConfig) { + SecurityStore store; + if (sslConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG) != null) { + store = new FileBasedStore( + (String) sslConfig.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), + (String) sslConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), + (Password) sslConfig.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), + (Password) sslConfig.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG), + true + ); + } else { + store = new PemStore( + (Password) sslConfig.get(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG), + (Password) sslConfig.get(SslConfigs.SSL_KEYSTORE_KEY_CONFIG), + (Password) sslConfig.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG) + ); + } + return store.get(); } private TestSslUtils.SslConfigsBuilder sslConfigsBuilder(Mode mode) { diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java index 906bc6e77f10f..81282421bf7b0 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java @@ -17,34 +17,8 @@ package org.apache.kafka.test; import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.network.Mode; - -import java.io.File; -import java.io.IOException; -import java.io.EOFException; -import java.io.InputStream; -import java.io.OutputStream; -import java.math.BigInteger; -import java.net.InetAddress; - -import javax.net.ssl.SSLEngine; -import javax.net.ssl.TrustManagerFactory; - -import java.nio.file.Files; -import java.nio.file.Paths; -import java.security.GeneralSecurityException; -import java.security.Key; -import java.security.KeyPair; -import java.security.KeyPairGenerator; -import java.security.KeyStore; -import java.security.NoSuchAlgorithmException; -import java.security.SecureRandom; -import java.security.Security; -import java.security.cert.Certificate; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; - import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.network.Mode; import org.apache.kafka.common.security.auth.SslEngineFactory; import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory; import org.bouncycastle.asn1.DEROctetString; @@ -61,19 +35,61 @@ import org.bouncycastle.crypto.params.AsymmetricKeyParameter; import org.bouncycastle.crypto.util.PrivateKeyFactory; import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.openssl.PKCS8Generator; +import org.bouncycastle.openssl.jcajce.JcaMiscPEMGenerator; +import org.bouncycastle.openssl.jcajce.JcaPKCS8Generator; +import org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8EncryptorBuilder; import org.bouncycastle.operator.ContentSigner; import org.bouncycastle.operator.DefaultDigestAlgorithmIdentifierFinder; import org.bouncycastle.operator.DefaultSignatureAlgorithmIdentifierFinder; +import org.bouncycastle.operator.bc.BcContentSignerBuilder; +import org.bouncycastle.operator.bc.BcDSAContentSignerBuilder; +import org.bouncycastle.operator.bc.BcECContentSignerBuilder; import org.bouncycastle.operator.bc.BcRSAContentSignerBuilder; +import org.bouncycastle.util.io.pem.PemWriter; +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.math.BigInteger; +import java.net.InetAddress; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.GeneralSecurityException; +import java.security.Key; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.KeyStore; +import java.security.NoSuchAlgorithmException; +import java.security.PrivateKey; +import java.security.SecureRandom; +import java.security.Security; +import java.security.cert.Certificate; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; +import java.util.Enumeration; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Properties; import java.util.Set; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.TrustManagerFactory; + +import static org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.PEM_TYPE; + public class TestSslUtils { public static final String TRUST_STORE_PASSWORD = "TrustStorePassword"; @@ -98,7 +114,7 @@ public static X509Certificate generateCertificate(String dn, KeyPair pair, public static KeyPair generateKeyPair(String algorithm) throws NoSuchAlgorithmException { KeyPairGenerator keyGen = KeyPairGenerator.getInstance(algorithm); - keyGen.initialize(2048); + keyGen.initialize(algorithm.equals("EC") ? 256 : 2048); return keyGen.genKeyPair(); } @@ -115,15 +131,6 @@ private static void saveKeyStore(KeyStore ks, String filename, } } - public static void createKeyStore(String filename, - Password password, String alias, - Key privateKey, Certificate cert) throws GeneralSecurityException, IOException { - KeyStore ks = createEmptyKeyStore(); - ks.setKeyEntry(alias, privateKey, password.value().toCharArray(), - new Certificate[]{cert}); - saveKeyStore(ks, filename, password); - } - /** * Creates a keystore with a single key and saves it to a file. * @@ -199,6 +206,156 @@ public static Map createSslConfig(boolean useClientCert, boolea return builder.build(); } + public static void convertToPem(Map sslProps, boolean writeToFile, boolean encryptPrivateKey) throws Exception { + String tsPath = (String) sslProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG); + String tsType = (String) sslProps.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG); + Password tsPassword = (Password) sslProps.remove(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG); + Password trustCerts = (Password) sslProps.remove(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG); + if (trustCerts == null && tsPath != null) { + trustCerts = exportCertificates(tsPath, tsPassword, tsType); + } + if (trustCerts != null) { + if (tsPath == null) { + tsPath = File.createTempFile("truststore", ".pem").getPath(); + sslProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, tsPath); + } + sslProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, PEM_TYPE); + if (writeToFile) + writeToFile(tsPath, trustCerts); + else { + sslProps.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, trustCerts); + sslProps.remove(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG); + } + } + + String ksPath = (String) sslProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); + Password certChain = (Password) sslProps.remove(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG); + Password key = (Password) sslProps.remove(SslConfigs.SSL_KEYSTORE_KEY_CONFIG); + if (certChain == null && ksPath != null) { + String ksType = (String) sslProps.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG); + Password ksPassword = (Password) sslProps.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); + Password keyPassword = (Password) sslProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG); + certChain = exportCertificates(ksPath, ksPassword, ksType); + Password pemKeyPassword = encryptPrivateKey ? keyPassword : null; + key = exportPrivateKey(ksPath, ksPassword, keyPassword, ksType, pemKeyPassword); + if (!encryptPrivateKey) + sslProps.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG); + } else if (!encryptPrivateKey) { + + } + + if (certChain != null) { + if (ksPath == null) { + ksPath = File.createTempFile("keystore", ".pem").getPath(); + sslProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ksPath); + } + sslProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, PEM_TYPE); + if (writeToFile) + writeToFile(ksPath, key, certChain); + else { + sslProps.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, key); + sslProps.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, certChain); + sslProps.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); + } + } + } + + private static void writeToFile(String path, Password... entries) throws IOException { + try (FileOutputStream out = new FileOutputStream(path)) { + for (Password entry: entries) { + out.write(entry.value().getBytes(StandardCharsets.UTF_8)); + } + } + } + + public static void convertToPemWithoutFiles(Properties sslProps) throws Exception { + String tsPath = sslProps.getProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG); + if (tsPath != null) { + Password trustCerts = exportCertificates(tsPath, + (Password) sslProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), + sslProps.getProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)); + sslProps.remove(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG); + sslProps.remove(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG); + sslProps.setProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, PEM_TYPE); + sslProps.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, trustCerts); + } + String ksPath = sslProps.getProperty(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); + if (ksPath != null) { + String ksType = sslProps.getProperty(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG); + Password ksPassword = (Password) sslProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); + Password keyPassword = (Password) sslProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG); + Password certChain = exportCertificates(ksPath, ksPassword, ksType); + Password key = exportPrivateKey(ksPath, ksPassword, keyPassword, ksType, keyPassword); + sslProps.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); + sslProps.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); + sslProps.setProperty(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, PEM_TYPE); + sslProps.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, certChain); + sslProps.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, key); + } + } + + public static Password exportCertificates(String storePath, Password storePassword, String storeType) throws Exception { + StringBuilder builder = new StringBuilder(); + try (FileInputStream in = new FileInputStream(storePath)) { + KeyStore ks = KeyStore.getInstance(storeType); + ks.load(in, storePassword.value().toCharArray()); + Enumeration aliases = ks.aliases(); + if (!aliases.hasMoreElements()) + throw new IllegalArgumentException("No certificates found in file " + storePath); + while (aliases.hasMoreElements()) { + String alias = aliases.nextElement(); + Certificate[] certs = ks.getCertificateChain(alias); + if (certs != null) { + for (Certificate cert : certs) { + builder.append(pem(cert)); + } + } else { + builder.append(pem(ks.getCertificate(alias))); + } + } + } + return new Password(builder.toString()); + } + + public static Password exportPrivateKey(String storePath, + Password storePassword, + Password keyPassword, + String storeType, + Password pemKeyPassword) throws Exception { + try (FileInputStream in = new FileInputStream(storePath)) { + KeyStore ks = KeyStore.getInstance(storeType); + ks.load(in, storePassword.value().toCharArray()); + String alias = ks.aliases().nextElement(); + return new Password(pem((PrivateKey) ks.getKey(alias, keyPassword.value().toCharArray()), pemKeyPassword)); + } + } + + static String pem(Certificate cert) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (PemWriter pemWriter = new PemWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8.name()))) { + pemWriter.writeObject(new JcaMiscPEMGenerator(cert)); + } + return new String(out.toByteArray(), StandardCharsets.UTF_8); + } + + static String pem(PrivateKey privateKey, Password password) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (PemWriter pemWriter = new PemWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8.name()))) { + if (password == null) { + pemWriter.writeObject(new JcaPKCS8Generator(privateKey, null)); + } else { + JceOpenSSLPKCS8EncryptorBuilder encryptorBuilder = new JceOpenSSLPKCS8EncryptorBuilder(PKCS8Generator.PBE_SHA1_3DES); + encryptorBuilder.setPasssword(password.value().toCharArray()); + try { + pemWriter.writeObject(new JcaPKCS8Generator(privateKey, encryptorBuilder.build())); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + return new String(out.toByteArray(), StandardCharsets.UTF_8); + } + public static class CertificateBuilder { private final int days; private final String algorithm; @@ -233,7 +390,17 @@ public X509Certificate generate(String dn, KeyPair keyPair) throws CertificateEx AlgorithmIdentifier digAlgId = new DefaultDigestAlgorithmIdentifierFinder().find(sigAlgId); AsymmetricKeyParameter privateKeyAsymKeyParam = PrivateKeyFactory.createKey(keyPair.getPrivate().getEncoded()); SubjectPublicKeyInfo subPubKeyInfo = SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded()); - ContentSigner sigGen = new BcRSAContentSignerBuilder(sigAlgId, digAlgId).build(privateKeyAsymKeyParam); + BcContentSignerBuilder signerBuilder; + String keyAlgorithm = keyPair.getPublic().getAlgorithm(); + if (keyAlgorithm.equals("RSA")) + signerBuilder = new BcRSAContentSignerBuilder(sigAlgId, digAlgId); + else if (keyAlgorithm.equals("DSA")) + signerBuilder = new BcDSAContentSignerBuilder(sigAlgId, digAlgId); + else if (keyAlgorithm.equals("EC")) + signerBuilder = new BcECContentSignerBuilder(sigAlgId, digAlgId); + else + throw new IllegalArgumentException("Unsupported algorithm " + keyAlgorithm); + ContentSigner sigGen = signerBuilder.build(privateKeyAsymKeyParam); X500Name name = new X500Name(dn); Date from = new Date(); Date to = new Date(from.getTime() + days * 86400000L); @@ -259,12 +426,13 @@ public static class SslConfigsBuilder { boolean createTrustStore; File trustStoreFile; Password trustStorePassword; - File keyStoreFile; Password keyStorePassword; Password keyPassword; String certAlias; String cn; + String algorithm; CertificateBuilder certBuilder; + boolean usePem; public SslConfigsBuilder(Mode mode) { this.mode = mode; @@ -275,6 +443,8 @@ public SslConfigsBuilder(Mode mode) { this.certBuilder = new CertificateBuilder(); this.cn = "localhost"; this.certAlias = mode.name().toLowerCase(Locale.ROOT); + this.algorithm = "RSA"; + this.createTrustStore = true; } public SslConfigsBuilder tlsProtocol(String tlsProtocol) { @@ -294,11 +464,6 @@ public SslConfigsBuilder useExistingTrustStore(File trustStoreFile) { return this; } - public SslConfigsBuilder createNewKeyStore(File keyStoreFile) { - this.keyStoreFile = keyStoreFile; - return this; - } - public SslConfigsBuilder useClientCert(boolean useClientCert) { this.useClientCert = useClientCert; return this; @@ -314,27 +479,43 @@ public SslConfigsBuilder cn(String cn) { return this; } + public SslConfigsBuilder algorithm(String algorithm) { + this.algorithm = algorithm; + return this; + } + public SslConfigsBuilder certBuilder(CertificateBuilder certBuilder) { this.certBuilder = certBuilder; return this; } + public SslConfigsBuilder usePem(boolean usePem) { + this.usePem = usePem; + return this; + } + public Map build() throws IOException, GeneralSecurityException { + if (usePem) { + return buildPem(); + } else + return buildJks(); + } + + private Map buildJks() throws IOException, GeneralSecurityException { Map certs = new HashMap<>(); File keyStoreFile = null; if (mode == Mode.CLIENT && useClientCert) { keyStoreFile = File.createTempFile("clientKS", ".jks"); - KeyPair cKP = generateKeyPair("RSA"); + KeyPair cKP = generateKeyPair(algorithm); X509Certificate cCert = certBuilder.generate("CN=" + cn + ", O=A client", cKP); - createKeyStore(keyStoreFile.getPath(), keyStorePassword, "client", cKP.getPrivate(), cCert); + createKeyStore(keyStoreFile.getPath(), keyStorePassword, keyPassword, "client", cKP.getPrivate(), cCert); certs.put(certAlias, cCert); - keyStoreFile.deleteOnExit(); } else if (mode == Mode.SERVER) { keyStoreFile = File.createTempFile("serverKS", ".jks"); - KeyPair sKP = generateKeyPair("RSA"); + KeyPair sKP = generateKeyPair(algorithm); X509Certificate sCert = certBuilder.generate("CN=" + cn + ", O=A server", sKP); - createKeyStore(keyStoreFile.getPath(), keyStorePassword, keyStorePassword, "server", sKP.getPrivate(), sCert); + createKeyStore(keyStoreFile.getPath(), keyStorePassword, keyPassword, "server", sKP.getPrivate(), sCert); certs.put(certAlias, sCert); keyStoreFile.deleteOnExit(); } @@ -367,6 +548,31 @@ public Map build() throws IOException, GeneralSecurityException return sslConfigs; } + + private Map buildPem() throws IOException, GeneralSecurityException { + if (!createTrustStore) { + throw new IllegalArgumentException("PEM configs cannot be created with existing trust stores"); + } + + Map sslConfigs = new HashMap<>(); + sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsProtocol); + sslConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Collections.singletonList(tlsProtocol)); + + if (mode != Mode.CLIENT || useClientCert) { + KeyPair keyPair = generateKeyPair(algorithm); + X509Certificate cert = certBuilder.generate("CN=" + cn + ", O=A " + mode.name().toLowerCase(Locale.ROOT), keyPair); + + Password privateKeyPem = new Password(pem(keyPair.getPrivate(), keyPassword)); + Password certPem = new Password(pem(cert)); + sslConfigs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, PEM_TYPE); + sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, PEM_TYPE); + sslConfigs.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, privateKeyPem); + sslConfigs.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, certPem); + sslConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword); + sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, certPem); + } + return sslConfigs; + } } public static final class TestSslEngineFactory implements SslEngineFactory { diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala index 35d4c8d6d4dd5..4334bc4ebd01d 100644 --- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala @@ -20,6 +20,7 @@ import kafka.utils.JaasTestUtils import kafka.zk.ConfigEntityChangeNotificationZNode import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.security.scram.internals.ScramMechanism +import org.apache.kafka.test.TestSslUtils import scala.jdk.CollectionConverters._ import org.junit.Before @@ -36,6 +37,10 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path) // Create broker credentials before starting brokers createScramCredentials(zkConnect, kafkaPrincipal.getName, kafkaPassword) + TestSslUtils.convertToPemWithoutFiles(serverConfig) + TestSslUtils.convertToPemWithoutFiles(producerConfig) + TestSslUtils.convertToPemWithoutFiles(consumerConfig) + TestSslUtils.convertToPemWithoutFiles(consumerConfig) } override def createPrivilegedAdminClient() = createScramAdminClient(kafkaClientSaslMechanism, kafkaPrincipal.getName, kafkaPassword) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 2c390c4b700aa..604c9de877453 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -206,7 +206,11 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet } def verifySslConfig(prefix: String, expectedProps: Properties, configDesc: Config): Unit = { - KEYSTORE_PROPS.forEach { configName => + // Validate file-based SSL keystore configs + val keyStoreProps = new util.HashSet[String](KEYSTORE_PROPS) + keyStoreProps.remove(SSL_KEYSTORE_KEY_CONFIG) + keyStoreProps.remove(SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG) + keyStoreProps.forEach { configName => val desc = configEntry(configDesc, s"$prefix$configName") val isSensitive = configName.contains("password") verifyConfig(configName, desc, isSensitive, isReadOnly = prefix.nonEmpty, if (prefix.isEmpty) invalidSslProperties else sslProperties1) diff --git a/docs/security.html b/docs/security.html index f0a1e5f7a2373..f70e13f3e5bca 100644 --- a/docs/security.html +++ b/docs/security.html @@ -282,6 +282,22 @@
Host Name Verification

For some tooling assistance on this topic, please check out the easyRSA project which has extensive scripting in place to help with these steps.

+
SSL key and certificates in PEM format
+ From 2.7.0 onwards, SSL key and trust stores can be configured for Kafka brokers and clients directly in the configuration in PEM format. + This avoids the need to store separate files on the file system and benefits from password protection features of Kafka configuration. + PEM may also be used as the store type for file-based key and trust stores in addition to JKS and PKCS12. To configure PEM key store directly in the + broker or client configuration, private key in PEM format should be provided in ssl.keystore.key and the certificate chain in PEM format + should be provided in ssl.keystore.certificate.chain. To configure trust store, trust certificates, e.g. public certificate of CA, + should be provided in ssl.truststore.certificates. Since PEM is typically stored as multi-line base-64 strings, the configuration value + can be included in Kafka configuration as multi-line strings with lines terminating in backslash ('\') for line continuation. + +

Store password configs ssl.keystore.password and ssl.truststore.password are not used for PEM. + If private key is encrypted using a password, the key password must be provided in ssl.key.password. Private keys may be provided + in unencrypted form without a password when PEM is specified directly in the config value. In production deployments, configs should be encrypted or + externalized using password protection feature in Kafka in this case. Note that the default SSL engine factory has limited capabilities for decryption + of encrypted private keys when external tools like OpenSSL are used for encryption. Third party libraries like BouncyCastle may be integrated witn a + custom SslEngineFactory to support a wider range of encrypted private keys.

+
  • Common Pitfalls in Production

    The above paragraphs show the process to create your own CA and use it to sign certificates for your cluster. From b203a8373333ba3c64ce14ea397f39c0147f646e Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Tue, 29 Sep 2020 11:50:51 +0100 Subject: [PATCH 2/3] Add new SSL configs to KafkaConfig --- core/src/main/scala/kafka/server/KafkaConfig.scala | 9 +++++++++ .../api/SaslScramSslEndToEndAuthorizationTest.scala | 10 ++++++++-- core/src/test/scala/unit/kafka/KafkaConfigTest.scala | 11 ++++++++++- .../scala/unit/kafka/server/KafkaConfigTest.scala | 3 +++ 4 files changed, 30 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 76b93dabc228f..3d8a9b9bdd2f9 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -522,9 +522,12 @@ object KafkaConfig { val SslKeystoreLocationProp = SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG val SslKeystorePasswordProp = SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG val SslKeyPasswordProp = SslConfigs.SSL_KEY_PASSWORD_CONFIG + val SslKeystoreKeyProp = SslConfigs.SSL_KEYSTORE_KEY_CONFIG + val SslKeystoreCertificateChainProp = SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG val SslTruststoreTypeProp = SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG val SslTruststoreLocationProp = SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG val SslTruststorePasswordProp = SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG + val SslTruststoreCertificatesProp = SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG val SslKeyManagerAlgorithmProp = SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG val SslTrustManagerAlgorithmProp = SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG val SslEndpointIdentificationAlgorithmProp = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG @@ -927,9 +930,12 @@ object KafkaConfig { val SslKeystoreLocationDoc = SslConfigs.SSL_KEYSTORE_LOCATION_DOC val SslKeystorePasswordDoc = SslConfigs.SSL_KEYSTORE_PASSWORD_DOC val SslKeyPasswordDoc = SslConfigs.SSL_KEY_PASSWORD_DOC + val SslKeystoreKeyDoc = SslConfigs.SSL_KEYSTORE_KEY_DOC + val SslKeystoreCertificateChainDoc = SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG val SslTruststoreTypeDoc = SslConfigs.SSL_TRUSTSTORE_TYPE_DOC val SslTruststorePasswordDoc = SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC val SslTruststoreLocationDoc = SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC + val SslTruststoreCertificatesDoc = SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG val SslKeyManagerAlgorithmDoc = SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC val SslTrustManagerAlgorithmDoc = SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC val SslEndpointIdentificationAlgorithmDoc = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC @@ -1195,9 +1201,12 @@ object KafkaConfig { .define(SslKeystoreLocationProp, STRING, null, MEDIUM, SslKeystoreLocationDoc) .define(SslKeystorePasswordProp, PASSWORD, null, MEDIUM, SslKeystorePasswordDoc) .define(SslKeyPasswordProp, PASSWORD, null, MEDIUM, SslKeyPasswordDoc) + .define(SslKeystoreKeyProp, PASSWORD, null, MEDIUM, SslKeystoreKeyDoc) + .define(SslKeystoreCertificateChainProp, PASSWORD, null, MEDIUM, SslKeystoreCertificateChainDoc) .define(SslTruststoreTypeProp, STRING, Defaults.SslTruststoreType, MEDIUM, SslTruststoreTypeDoc) .define(SslTruststoreLocationProp, STRING, null, MEDIUM, SslTruststoreLocationDoc) .define(SslTruststorePasswordProp, PASSWORD, null, MEDIUM, SslTruststorePasswordDoc) + .define(SslTruststoreCertificatesProp, PASSWORD, null, MEDIUM, SslTruststoreCertificatesDoc) .define(SslKeyManagerAlgorithmProp, STRING, Defaults.SslKeyManagerAlgorithm, MEDIUM, SslKeyManagerAlgorithmDoc) .define(SslTrustManagerAlgorithmProp, STRING, Defaults.SslTrustManagerAlgorithm, MEDIUM, SslTrustManagerAlgorithmDoc) .define(SslEndpointIdentificationAlgorithmProp, STRING, Defaults.SslEndpointIdentificationAlgorithm, LOW, SslEndpointIdentificationAlgorithmDoc) diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala index 4334bc4ebd01d..eec16239e147d 100644 --- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala @@ -16,6 +16,8 @@ */ package kafka.api +import java.util.Properties + import kafka.utils.JaasTestUtils import kafka.zk.ConfigEntityChangeNotificationZNode import org.apache.kafka.common.security.auth.KafkaPrincipal @@ -37,10 +39,14 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path) // Create broker credentials before starting brokers createScramCredentials(zkConnect, kafkaPrincipal.getName, kafkaPassword) - TestSslUtils.convertToPemWithoutFiles(serverConfig) TestSslUtils.convertToPemWithoutFiles(producerConfig) TestSslUtils.convertToPemWithoutFiles(consumerConfig) - TestSslUtils.convertToPemWithoutFiles(consumerConfig) + TestSslUtils.convertToPemWithoutFiles(adminClientConfig) + } + + override def configureListeners(props: collection.Seq[Properties]): Unit = { + props.foreach(TestSslUtils.convertToPemWithoutFiles) + super.configureListeners(props) } override def createPrivilegedAdminClient() = createScramAdminClient(kafkaClientSaslMechanism, kafkaPrincipal.getName, kafkaPassword) diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala index 4a388e8055b94..e0d058115e774 100644 --- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala @@ -84,14 +84,23 @@ class KafkaTest { val propertiesFile = prepareDefaultConfig() val config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "ssl.keystore.password=keystore_password", "--override", "ssl.key.password=key_password", - "--override", "ssl.truststore.password=truststore_password"))) + "--override", "ssl.truststore.password=truststore_password", + "--override", "ssl.keystore.certificate.chain=certificate_chain", + "--override", "ssl.keystore.key=private_key", + "--override", "ssl.truststore.certificates=truststore_certificates"))) assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeyPasswordProp).toString) assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeystorePasswordProp).toString) assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslTruststorePasswordProp).toString) + assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeystoreKeyProp).toString) + assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeystoreCertificateChainProp).toString) + assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslTruststoreCertificatesProp).toString) assertEquals("key_password", config.getPassword(KafkaConfig.SslKeyPasswordProp).value) assertEquals("keystore_password", config.getPassword(KafkaConfig.SslKeystorePasswordProp).value) assertEquals("truststore_password", config.getPassword(KafkaConfig.SslTruststorePasswordProp).value) + assertEquals("private_key", config.getPassword(KafkaConfig.SslKeystoreKeyProp).value) + assertEquals("certificate_chain", config.getPassword(KafkaConfig.SslKeystoreCertificateChainProp).value) + assertEquals("truststore_certificates", config.getPassword(KafkaConfig.SslTruststoreCertificatesProp).value) } @Test diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 72380a7fff17f..fdacd813552d9 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -740,9 +740,12 @@ class KafkaConfigTest { case KafkaConfig.SslKeystoreLocationProp => // ignore string case KafkaConfig.SslKeystorePasswordProp => // ignore string case KafkaConfig.SslKeyPasswordProp => // ignore string + case KafkaConfig.SslKeystoreCertificateChainProp => // ignore string + case KafkaConfig.SslKeystoreKeyProp => // ignore string case KafkaConfig.SslTruststoreTypeProp => // ignore string case KafkaConfig.SslTruststorePasswordProp => // ignore string case KafkaConfig.SslTruststoreLocationProp => // ignore string + case KafkaConfig.SslTruststoreCertificatesProp => // ignore string case KafkaConfig.SslKeyManagerAlgorithmProp => case KafkaConfig.SslTrustManagerAlgorithmProp => case KafkaConfig.SslClientAuthProp => // ignore string From 19b83fa8e94c335677c02ef9e11c3cb582fab9d2 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Tue, 6 Oct 2020 15:34:14 +0100 Subject: [PATCH 3/3] Address review comments --- .../kafka/common/config/SslConfigs.java | 4 +- .../kafka/common/network/CertStores.java | 4 -- .../common/network/SslTransportLayerTest.java | 39 ++++++------------- .../org/apache/kafka/test/TestSslUtils.java | 2 - .../main/scala/kafka/server/KafkaConfig.scala | 4 +- 5 files changed, 16 insertions(+), 37 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java index 6e9e17638ffc3..55a58accf5c7d 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java @@ -97,7 +97,7 @@ public class SslConfigs { + "key password must be specified using 'ssl.key.password'"; public static final String SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG = "ssl.keystore.certificate.chain"; - public static final String SSL_KEYSTORE_CERTIFICATE_DOC = "Certificate chain in the format specified by 'ssl.keystore.type'. " + public static final String SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC = "Certificate chain in the format specified by 'ssl.keystore.type'. " + "Default SSL engine factory supports only PEM format with a list of X.509 certificates"; public static final String SSL_TRUSTSTORE_CERTIFICATES_CONFIG = "ssl.truststore.certificates"; @@ -170,7 +170,7 @@ public static void addClientSslSupport(ConfigDef config) { .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC) .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC) .define(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_KEY_DOC) - .define(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_CERTIFICATE_DOC) + .define(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC) .define(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_DOC) .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC) .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC) diff --git a/clients/src/test/java/org/apache/kafka/common/network/CertStores.java b/clients/src/test/java/org/apache/kafka/common/network/CertStores.java index 4e13a909d082d..3230da2c8340a 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/CertStores.java +++ b/clients/src/test/java/org/apache/kafka/common/network/CertStores.java @@ -80,10 +80,6 @@ private CertStores(boolean server, String commonName, String keyAlgorithm, TestS public Map getTrustingConfig(CertStores truststoreConfig) { - return getTrustingConfig(truststoreConfig, false); - } - - public Map getTrustingConfig(CertStores truststoreConfig, boolean usePemCerts) { Map config = new HashMap<>(sslConfig); for (String propName : TRUSTSTORE_PROPS) { config.put(propName, truststoreConfig.sslConfig.get(propName)); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index 60a308ad40a02..ccfb2a9f599f4 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -180,7 +180,6 @@ public void testValidEndpointIdentificationSanIp() throws Exception { */ @Test public void testValidEndpointIdentificationCN() throws Exception { - String node = "0"; serverCertStores = certBuilder(true, "localhost").build(); clientCertStores = certBuilder(false, "localhost").build(); sslServerConfigs = getTrustingConfig(serverCertStores, clientCertStores); @@ -260,7 +259,6 @@ protected TestSslTransportLayer newTransportLayer(String id, SelectionKey key, S */ @Test public void testInvalidEndpointIdentification() throws Exception { - String node = "0"; serverCertStores = certBuilder(true, "server").addHostName("notahost").build(); clientCertStores = certBuilder(false, "client").addHostName("localhost").build(); sslServerConfigs = getTrustingConfig(serverCertStores, clientCertStores); @@ -312,7 +310,6 @@ public void testEndpointIdentificationDisabled() throws Exception { */ @Test public void testClientAuthenticationRequiredValidProvided() throws Exception { - String node = "0"; sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); verifySslConfigs(); } @@ -361,7 +358,6 @@ public void testListenerConfigOverride() throws Exception { */ @Test public void testClientAuthenticationRequiredUntrustedProvided() throws Exception { - String node = "0"; sslServerConfigs = serverCertStores.getUntrustingConfig(); sslServerConfigs.putAll(sslConfigOverrides); sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); @@ -374,7 +370,6 @@ public void testClientAuthenticationRequiredUntrustedProvided() throws Exception */ @Test public void testClientAuthenticationRequiredNotProvided() throws Exception { - String node = "0"; sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); CertStores.KEYSTORE_PROPS.forEach(sslClientConfigs::remove); verifySslConfigsWithHandshakeFailure(); @@ -386,7 +381,6 @@ public void testClientAuthenticationRequiredNotProvided() throws Exception { */ @Test public void testClientAuthenticationDisabledUntrustedProvided() throws Exception { - String node = "0"; sslServerConfigs = serverCertStores.getUntrustingConfig(); sslServerConfigs.putAll(sslConfigOverrides); sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); @@ -399,7 +393,6 @@ public void testClientAuthenticationDisabledUntrustedProvided() throws Exception */ @Test public void testClientAuthenticationDisabledNotProvided() throws Exception { - String node = "0"; sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); CertStores.KEYSTORE_PROPS.forEach(sslClientConfigs::remove); @@ -412,7 +405,6 @@ public void testClientAuthenticationDisabledNotProvided() throws Exception { */ @Test public void testClientAuthenticationRequestedValidProvided() throws Exception { - String node = "0"; sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "requested"); verifySslConfigs(); } @@ -423,7 +415,6 @@ public void testClientAuthenticationRequestedValidProvided() throws Exception { */ @Test public void testClientAuthenticationRequestedNotProvided() throws Exception { - String node = "0"; sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "requested"); CertStores.KEYSTORE_PROPS.forEach(sslClientConfigs::remove); @@ -506,7 +497,7 @@ public void testPemFilesWithoutServerKeyPassword() throws Exception { * Tests that an invalid SecureRandom implementation cannot be configured */ @Test - public void testInvalidSecureRandomImplementation() throws Exception { + public void testInvalidSecureRandomImplementation() { try (SslChannelBuilder channelBuilder = newClientChannelBuilder()) { sslClientConfigs.put(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, "invalid"); channelBuilder.configure(sslClientConfigs); @@ -520,7 +511,7 @@ public void testInvalidSecureRandomImplementation() throws Exception { * Tests that channels cannot be created if truststore cannot be loaded */ @Test - public void testInvalidTruststorePassword() throws Exception { + public void testInvalidTruststorePassword() { try (SslChannelBuilder channelBuilder = newClientChannelBuilder()) { sslClientConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "invalid"); channelBuilder.configure(sslClientConfigs); @@ -534,7 +525,7 @@ public void testInvalidTruststorePassword() throws Exception { * Tests that channels cannot be created if keystore cannot be loaded */ @Test - public void testInvalidKeystorePassword() throws Exception { + public void testInvalidKeystorePassword() { try (SslChannelBuilder channelBuilder = newClientChannelBuilder()) { sslClientConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "invalid"); channelBuilder.configure(sslClientConfigs); @@ -550,7 +541,6 @@ public void testInvalidKeystorePassword() throws Exception { */ @Test public void testNullTruststorePassword() throws Exception { - String node = "0"; sslClientConfigs.remove(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG); sslServerConfigs.remove(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG); @@ -563,7 +553,6 @@ public void testNullTruststorePassword() throws Exception { */ @Test public void testInvalidKeyPassword() throws Exception { - String node = "0"; sslServerConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, new Password("invalid")); if (useInlinePem) { // We fail fast for PEM @@ -594,15 +583,15 @@ public void testTlsDefaults() throws Exception { server.verifyAuthenticationMetrics(1, 0); selector.close(); - checkAuthentiationFailed("1", "TLSv1.1"); + checkAuthenticationFailed("1", "TLSv1.1"); server.verifyAuthenticationMetrics(1, 1); - checkAuthentiationFailed("2", "TLSv1"); + checkAuthenticationFailed("2", "TLSv1"); server.verifyAuthenticationMetrics(1, 2); } /** Checks connection failed using the specified {@code tlsVersion}. */ - private void checkAuthentiationFailed(String node, String tlsVersion) throws IOException { + private void checkAuthenticationFailed(String node, String tlsVersion) throws IOException { sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList(tlsVersion)); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); @@ -621,7 +610,7 @@ public void testUnsupportedTLSVersion() throws Exception { sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2")); server = createEchoServer(SecurityProtocol.SSL); - checkAuthentiationFailed("0", "TLSv1.1"); + checkAuthenticationFailed("0", "TLSv1.1"); server.verifyAuthenticationMetrics(0, 1); } @@ -639,7 +628,7 @@ public void testUnsupportedCiphers() throws Exception { sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[1])); createSelector(sslClientConfigs); - checkAuthentiationFailed("1", tlsProtocol); + checkAuthenticationFailed("1", tlsProtocol); server.verifyAuthenticationMetrics(0, 1); } @@ -1014,7 +1003,7 @@ public void testInterBrokerSslConfigValidation() throws Exception { * fails if certs from keystore are not trusted. */ @Test(expected = KafkaException.class) - public void testInterBrokerSslConfigValidationFailure() throws Exception { + public void testInterBrokerSslConfigValidationFailure() { SecurityProtocol securityProtocol = SecurityProtocol.SSL; sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); TestSecurityConfig config = new TestSecurityConfig(sslServerConfigs); @@ -1205,7 +1194,6 @@ public void testServerTruststoreDynamicUpdate() throws Exception { */ @Test public void testCustomClientSslEngineFactory() throws Exception { - String node = "0"; sslClientConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, TestSslUtils.TestSslEngineFactory.class); verifySslConfigs(); } @@ -1215,7 +1203,6 @@ public void testCustomClientSslEngineFactory() throws Exception { */ @Test public void testCustomServerSslEngineFactory() throws Exception { - String node = "0"; sslServerConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, TestSslUtils.TestSslEngineFactory.class); verifySslConfigs(); } @@ -1225,7 +1212,6 @@ public void testCustomServerSslEngineFactory() throws Exception { */ @Test public void testCustomClientAndServerSslEngineFactory() throws Exception { - String node = "0"; sslClientConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, TestSslUtils.TestSslEngineFactory.class); sslServerConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, TestSslUtils.TestSslEngineFactory.class); verifySslConfigs(); @@ -1235,7 +1221,7 @@ public void testCustomClientAndServerSslEngineFactory() throws Exception { * Tests invalid ssl.engine.factory plugin class */ @Test(expected = KafkaException.class) - public void testInvalidSslEngineFactory() throws Exception { + public void testInvalidSslEngineFactory() { sslClientConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, String.class); createSelector(sslClientConfigs); } @@ -1336,8 +1322,7 @@ protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id String host, ChannelMetadataRegistry metadataRegistry) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); SSLEngine sslEngine = sslFactory.createSslEngine(host, socketChannel.socket().getPort()); - TestSslTransportLayer transportLayer = newTransportLayer(id, key, sslEngine); - return transportLayer; + return newTransportLayer(id, key, sslEngine); } protected TestSslTransportLayer newTransportLayer(String id, SelectionKey key, SSLEngine sslEngine) throws IOException { @@ -1363,7 +1348,7 @@ class TestSslTransportLayer extends SslTransportLayer { private final AtomicLong numFlushesRemaining; private final AtomicInteger numDelayedFlushesRemaining; - public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException { + public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) { super(channelId, key, sslEngine, new DefaultChannelMetadataRegistry()); this.netReadBufSize = new ResizeableBufferSize(netReadBufSizeOverride); this.netWriteBufSize = new ResizeableBufferSize(netWriteBufSizeOverride); diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java index 81282421bf7b0..fc72d3da68030 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java @@ -240,8 +240,6 @@ public static void convertToPem(Map sslProps, boolean writeToFil key = exportPrivateKey(ksPath, ksPassword, keyPassword, ksType, pemKeyPassword); if (!encryptPrivateKey) sslProps.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG); - } else if (!encryptPrivateKey) { - } if (certChain != null) { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3d8a9b9bdd2f9..0e31d6e55d2a7 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -931,11 +931,11 @@ object KafkaConfig { val SslKeystorePasswordDoc = SslConfigs.SSL_KEYSTORE_PASSWORD_DOC val SslKeyPasswordDoc = SslConfigs.SSL_KEY_PASSWORD_DOC val SslKeystoreKeyDoc = SslConfigs.SSL_KEYSTORE_KEY_DOC - val SslKeystoreCertificateChainDoc = SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG + val SslKeystoreCertificateChainDoc = SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC val SslTruststoreTypeDoc = SslConfigs.SSL_TRUSTSTORE_TYPE_DOC val SslTruststorePasswordDoc = SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC val SslTruststoreLocationDoc = SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC - val SslTruststoreCertificatesDoc = SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG + val SslTruststoreCertificatesDoc = SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_DOC val SslKeyManagerAlgorithmDoc = SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC val SslTrustManagerAlgorithmDoc = SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC val SslEndpointIdentificationAlgorithmDoc = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC