From da43b86e9edab39547d837c157f81ee0b50f9f2c Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Thu, 4 Apr 2019 22:54:36 +0100 Subject: [PATCH 1/3] KAFKA-8190; Don't update keystore modification time during validation --- .../org/apache/kafka/common/security/ssl/SslFactory.java | 4 ++-- .../apache/kafka/common/security/ssl/SslFactoryTest.java | 7 +++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java index b9b52037c52a0..33aa56ac9c0c9 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java @@ -316,7 +316,7 @@ static class SecurityStore { private final String path; private final Password password; private final Password keyPassword; - private Long fileLastModifiedMs; + private final Long fileLastModifiedMs; SecurityStore(String type, String path, Password password, Password keyPassword) { Objects.requireNonNull(type, "type must not be null"); @@ -324,6 +324,7 @@ static class SecurityStore { this.path = path; this.password = password; this.keyPassword = keyPassword; + fileLastModifiedMs = lastModifiedMs(path); } /** @@ -338,7 +339,6 @@ KeyStore load() { // If a password is not set access to the truststore is still available, but integrity checking is disabled. char[] passwordChars = password != null ? password.value().toCharArray() : null; ks.load(in, passwordChars); - fileLastModifiedMs = lastModifiedMs(path); log.debug("Loaded key store with path {} modification time {}", path, fileLastModifiedMs == null ? null : new Date(fileLastModifiedMs)); diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java index bfe34c98382be..6c4a2391c8b89 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java @@ -120,6 +120,13 @@ public void testReconfiguration() throws Exception { assertNotSame("SSL context not recreated", sslContext, sslFactory.sslContext()); sslContext = sslFactory.sslContext(); + // Verify that context is recreated after validation on reconfigure() if config is not changed, but keystore file was modified + keyStoreFile.setLastModified(System.currentTimeMillis() + 15000); + sslFactory.validateReconfiguration(sslConfig); + sslFactory.reconfigure(sslConfig); + assertNotSame("SSL context not recreated", sslContext, sslFactory.sslContext()); + sslContext = sslFactory.sslContext(); + // Verify that the context is not recreated if modification time cannot be determined keyStoreFile.setLastModified(System.currentTimeMillis() + 20000); Files.delete(keyStoreFile.toPath()); From 43ac6985172c34d8f627cece471da033c429dbea Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Fri, 5 Apr 2019 10:06:00 +0100 Subject: [PATCH 2/3] Update DynamicBrokerReconfigurationTest --- .../DynamicBrokerReconfigurationTest.scala | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index e8aa081925c26..7299075f91310 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -253,6 +253,28 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet alterSslKeystore(adminClient, sslPropertiesCopy, SecureInternal) verifyProduceConsume(producer, consumer, 10, topic2) + // Verify that keystores can be updated using same file name. + val reusableProps = sslProperties2.clone().asInstanceOf[Properties] + val reusableFile = File.createTempFile("keystore", ".jks") + reusableProps.setProperty(SSL_KEYSTORE_LOCATION_CONFIG, reusableFile.getPath) + Files.copy(new File(sslProperties1.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)).toPath, + reusableFile.toPath, StandardCopyOption.REPLACE_EXISTING) + alterSslKeystore(adminClient, reusableProps, SecureExternal) + val producer3 = ProducerBuilder().trustStoreProps(sslProperties2).maxRetries(0).build() + verifyAuthenticationFailure(producer3) + // Now alter using same file name. We can't check if the update has completed by comparing config on + // the broker, so we wait for producer operation to succeed to verify that the update has been performed. + Files.copy(new File(sslProperties2.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)).toPath, + reusableFile.toPath, StandardCopyOption.REPLACE_EXISTING) + alterSslKeystore(adminClient, reusableProps, SecureExternal) + TestUtils.waitUntilTrue(() => { + try { + producer3.partitionsFor(topic).size() == numPartitions + } catch { + case _: Exception => false + } + }, "Keystore not updated") + // Verify that all messages sent with retries=0 while keystores were being altered were consumed stopAndVerifyProduceConsume(producerThread, consumerThread) } From d0597b5da45ddeba4efdd06347a285a90257f371 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Fri, 5 Apr 2019 13:56:53 +0100 Subject: [PATCH 3/3] Update file modification time to avoid transient failures, log at info level --- .../apache/kafka/common/security/ssl/SslFactory.java | 12 +++++++++--- .../server/DynamicBrokerReconfigurationTest.scala | 1 + 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java index 33aa56ac9c0c9..0c5094df4a5da 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java @@ -139,6 +139,7 @@ else if (clientAuthConfig.equals("requested")) (Password) configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); try { this.sslContext = createSSLContext(keystore, truststore); + log.debug("Created SSL context with keystore {} truststore {}", keystore, truststore); } catch (Exception e) { throw new KafkaException(e); } @@ -173,6 +174,7 @@ public void reconfigure(Map configs) throws KafkaException { SecurityStore keystore = newKeystore != null ? newKeystore : this.keystore; SecurityStore truststore = newTruststore != null ? newTruststore : this.truststore; this.sslContext = createSSLContext(keystore, truststore); + log.info("Created new SSL context with keystore {} truststore {}", keystore, truststore); this.keystore = keystore; this.truststore = truststore; } catch (Exception e) { @@ -339,9 +341,6 @@ KeyStore load() { // If a password is not set access to the truststore is still available, but integrity checking is disabled. char[] passwordChars = password != null ? password.value().toCharArray() : null; ks.load(in, passwordChars); - - log.debug("Loaded key store with path {} modification time {}", path, - fileLastModifiedMs == null ? null : new Date(fileLastModifiedMs)); return ks; } catch (GeneralSecurityException | IOException e) { throw new KafkaException("Failed to load SSL keystore " + path + " of type " + type, e); @@ -361,6 +360,13 @@ boolean modified() { Long modifiedMs = lastModifiedMs(path); return modifiedMs != null && !Objects.equals(modifiedMs, this.fileLastModifiedMs); } + + @Override + public String toString() { + return "SecurityStore(" + + "path=" + path + + ", modificationTime=" + (fileLastModifiedMs == null ? null : new Date(fileLastModifiedMs)) + ")"; + } } /** diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 7299075f91310..fb7b539954576 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -266,6 +266,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet // the broker, so we wait for producer operation to succeed to verify that the update has been performed. Files.copy(new File(sslProperties2.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)).toPath, reusableFile.toPath, StandardCopyOption.REPLACE_EXISTING) + reusableFile.setLastModified(System.currentTimeMillis() + 1000) alterSslKeystore(adminClient, reusableProps, SecureExternal) TestUtils.waitUntilTrue(() => { try {