Skip to content

Commit 7be8bd8

Browse files
KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651) (apache#9345)
Adds support for SSL key and trust stores to be specified in PEM format either as files or directly as configuration values. Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
1 parent 05f9803 commit 7be8bd8

File tree

16 files changed

+1286
-268
lines changed

16 files changed

+1286
-268
lines changed

checkstyle/import-control.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@
109109
<allow pkg="org.apache.kafka.common.requests" />
110110
<allow pkg="org.apache.kafka.clients" />
111111
</subpackage>
112+
<subpackage name="ssl">
113+
<allow pkg="javax.crypto" />
114+
</subpackage>
112115
<subpackage name="scram">
113116
<allow pkg="javax.crypto" />
114117
</subpackage>

checkstyle/suppressions.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
6464

6565
<suppress checks="CyclomaticComplexity"
66-
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor).java"/>
66+
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory).java"/>
6767

6868
<suppress checks="JavaNCSS"
6969
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
@@ -96,7 +96,7 @@
9696
files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>
9797

9898
<suppress checks="NPathComplexity"
99-
files="MemoryRecordsTest|MetricsTest"/>
99+
files="MemoryRecordsTest|MetricsTest|TestSslUtils"/>
100100

101101
<suppress checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
102102
files="Murmur3Test.java"/>

clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.kafka.common.config;
1818

19+
import org.apache.kafka.common.config.ConfigDef.Type;
1920
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
2021
import org.apache.kafka.common.utils.Java;
2122
import org.apache.kafka.common.utils.Utils;
@@ -90,17 +91,31 @@ public class SslConfigs {
9091
+ "This is optional for client.";
9192
public static final String DEFAULT_SSL_KEYSTORE_TYPE = "JKS";
9293

94+
public static final String SSL_KEYSTORE_KEY_CONFIG = "ssl.keystore.key";
95+
public static final String SSL_KEYSTORE_KEY_DOC = "Private key in the format specified by 'ssl.keystore.type'. "
96+
+ "Default SSL engine factory supports only PEM format with PKCS#8 keys. If the key is encrypted, "
97+
+ "key password must be specified using 'ssl.key.password'";
98+
99+
public static final String SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG = "ssl.keystore.certificate.chain";
100+
public static final String SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC = "Certificate chain in the format specified by 'ssl.keystore.type'. "
101+
+ "Default SSL engine factory supports only PEM format with a list of X.509 certificates";
102+
103+
public static final String SSL_TRUSTSTORE_CERTIFICATES_CONFIG = "ssl.truststore.certificates";
104+
public static final String SSL_TRUSTSTORE_CERTIFICATES_DOC = "Trusted certificates in the format specified by 'ssl.truststore.type'. "
105+
+ "Default SSL engine factory supports only PEM format with X.509 certificates.";
106+
93107
public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location";
94108
public static final String SSL_KEYSTORE_LOCATION_DOC = "The location of the key store file. "
95109
+ "This is optional for client and can be used for two-way authentication for client.";
96110

97111
public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "ssl.keystore.password";
98112
public static final String SSL_KEYSTORE_PASSWORD_DOC = "The store password for the key store file. "
99-
+ "This is optional for client and only needed if ssl.keystore.location is configured. ";
113+
+ "This is optional for client and only needed if 'ssl.keystore.location' is configured. "
114+
+ " Key store password is not supported for PEM format.";
100115

101116
public static final String SSL_KEY_PASSWORD_CONFIG = "ssl.key.password";
102-
public static final String SSL_KEY_PASSWORD_DOC = "The password of the private key in the key store file. "
103-
+ "This is optional for client.";
117+
public static final String SSL_KEY_PASSWORD_DOC = "The password of the private key in the key store file or"
118+
+ "the PEM key specified in `ssl.keystore.key'. This is required for clients only if two-way authentication is configured.";
104119

105120
public static final String SSL_TRUSTSTORE_TYPE_CONFIG = "ssl.truststore.type";
106121
public static final String SSL_TRUSTSTORE_TYPE_DOC = "The file format of the trust store file.";
@@ -110,7 +125,9 @@ public class SslConfigs {
110125
public static final String SSL_TRUSTSTORE_LOCATION_DOC = "The location of the trust store file. ";
111126

112127
public static final String SSL_TRUSTSTORE_PASSWORD_CONFIG = "ssl.truststore.password";
113-
public static final String SSL_TRUSTSTORE_PASSWORD_DOC = "The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled.";
128+
public static final String SSL_TRUSTSTORE_PASSWORD_DOC = "The password for the trust store file. "
129+
+ "If a password is not set, trust store file configured will still be used, but integrity checking is disabled. "
130+
+ "Trust store password is not supported for PEM format.";
114131

115132
public static final String SSL_KEYMANAGER_ALGORITHM_CONFIG = "ssl.keymanager.algorithm";
116133
public static final String SSL_KEYMANAGER_ALGORITHM_DOC = "The algorithm used by key manager factory for SSL connections. "
@@ -152,6 +169,9 @@ public static void addClientSslSupport(ConfigDef config) {
152169
.define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC)
153170
.define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC)
154171
.define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC)
172+
.define(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_KEY_DOC)
173+
.define(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC)
174+
.define(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_DOC)
155175
.define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
156176
.define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC)
157177
.define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC)
@@ -169,7 +189,10 @@ public static void addClientSslSupport(ConfigDef config) {
169189
SslConfigs.SSL_KEY_PASSWORD_CONFIG,
170190
SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
171191
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
172-
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
192+
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
193+
SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG,
194+
SslConfigs.SSL_KEYSTORE_KEY_CONFIG,
195+
SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG);
173196

174197
public static final Set<String> NON_RECONFIGURABLE_CONFIGS = Utils.mkSet(
175198
BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,

clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ public void configure(Map<String, ?> configs) throws KafkaException {
7070
sslPrincipalMapper = SslPrincipalMapper.fromRules(sslPrincipalMappingRules);
7171
this.sslFactory = new SslFactory(mode, null, isInterBrokerListener);
7272
this.sslFactory.configure(this.configs);
73+
} catch (KafkaException e) {
74+
throw e;
7375
} catch (Exception e) {
7476
throw new KafkaException(e);
7577
}

0 commit comments

Comments
 (0)