Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.clients" />
</subpackage>
<subpackage name="ssl">
<allow pkg="javax.crypto" />
</subpackage>
<subpackage name="scram">
<allow pkg="javax.crypto" />
</subpackage>
Expand Down
4 changes: 2 additions & 2 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>

<suppress checks="CyclomaticComplexity"
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor).java"/>
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory).java"/>

<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
Expand Down Expand Up @@ -96,7 +96,7 @@
files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>

<suppress checks="NPathComplexity"
files="MemoryRecordsTest|MetricsTest"/>
files="MemoryRecordsTest|MetricsTest|TestSslUtils"/>

<suppress checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.config;

import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.utils.Java;
import org.apache.kafka.common.utils.Utils;
Expand Down Expand Up @@ -90,17 +91,31 @@ public class SslConfigs {
+ "This is optional for client.";
public static final String DEFAULT_SSL_KEYSTORE_TYPE = "JKS";

public static final String SSL_KEYSTORE_KEY_CONFIG = "ssl.keystore.key";
public static final String SSL_KEYSTORE_KEY_DOC = "Private key in the format specified by 'ssl.keystore.type'. "
+ "Default SSL engine factory supports only PEM format with PKCS#8 keys. If the key is encrypted, "
+ "key password must be specified using 'ssl.key.password'";

public static final String SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG = "ssl.keystore.certificate.chain";
public static final String SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC = "Certificate chain in the format specified by 'ssl.keystore.type'. "
+ "Default SSL engine factory supports only PEM format with a list of X.509 certificates";

public static final String SSL_TRUSTSTORE_CERTIFICATES_CONFIG = "ssl.truststore.certificates";
public static final String SSL_TRUSTSTORE_CERTIFICATES_DOC = "Trusted certificates in the format specified by 'ssl.truststore.type'. "
+ "Default SSL engine factory supports only PEM format with X.509 certificates.";

public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location";
public static final String SSL_KEYSTORE_LOCATION_DOC = "The location of the key store file. "
+ "This is optional for client and can be used for two-way authentication for client.";

public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "ssl.keystore.password";
public static final String SSL_KEYSTORE_PASSWORD_DOC = "The store password for the key store file. "
+ "This is optional for client and only needed if ssl.keystore.location is configured. ";
+ "This is optional for client and only needed if 'ssl.keystore.location' is configured. "
+ " Key store password is not supported for PEM format.";

public static final String SSL_KEY_PASSWORD_CONFIG = "ssl.key.password";
public static final String SSL_KEY_PASSWORD_DOC = "The password of the private key in the key store file. "
+ "This is optional for client.";
public static final String SSL_KEY_PASSWORD_DOC = "The password of the private key in the key store file or"
+ "the PEM key specified in `ssl.keystore.key'. This is required for clients only if two-way authentication is configured.";

public static final String SSL_TRUSTSTORE_TYPE_CONFIG = "ssl.truststore.type";
public static final String SSL_TRUSTSTORE_TYPE_DOC = "The file format of the trust store file.";
Expand All @@ -110,7 +125,9 @@ public class SslConfigs {
public static final String SSL_TRUSTSTORE_LOCATION_DOC = "The location of the trust store file. ";

public static final String SSL_TRUSTSTORE_PASSWORD_CONFIG = "ssl.truststore.password";
public static final String SSL_TRUSTSTORE_PASSWORD_DOC = "The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled.";
public static final String SSL_TRUSTSTORE_PASSWORD_DOC = "The password for the trust store file. "
+ "If a password is not set, trust store file configured will still be used, but integrity checking is disabled. "
+ "Trust store password is not supported for PEM format.";

public static final String SSL_KEYMANAGER_ALGORITHM_CONFIG = "ssl.keymanager.algorithm";
public static final String SSL_KEYMANAGER_ALGORITHM_DOC = "The algorithm used by key manager factory for SSL connections. "
Expand Down Expand Up @@ -152,6 +169,9 @@ public static void addClientSslSupport(ConfigDef config) {
.define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC)
.define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC)
.define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC)
.define(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_KEY_DOC)
.define(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC)
.define(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_DOC)
.define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
.define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC)
.define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC)
Expand All @@ -169,7 +189,10 @@ public static void addClientSslSupport(ConfigDef config) {
SslConfigs.SSL_KEY_PASSWORD_CONFIG,
SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG,
SslConfigs.SSL_KEYSTORE_KEY_CONFIG,
SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG);

public static final Set<String> NON_RECONFIGURABLE_CONFIGS = Utils.mkSet(
BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public void configure(Map<String, ?> configs) throws KafkaException {
sslPrincipalMapper = SslPrincipalMapper.fromRules(sslPrincipalMappingRules);
this.sslFactory = new SslFactory(mode, null, isInterBrokerListener);
this.sslFactory.configure(this.configs);
} catch (KafkaException e) {
throw e;
} catch (Exception e) {
throw new KafkaException(e);
}
Expand Down
Loading