From 34a23c1c9cc4c7a8de4b47d910dc4bc79c8f0eac Mon Sep 17 00:00:00 2001 From: Andor Molnar Date: Wed, 12 Oct 2022 16:49:33 +0200 Subject: [PATCH 1/6] HBASE-27347. Copied FileChangeWatcher class and tests --- .../hadoop/hbase/io/FileChangeWatcher.java | 251 ++++++++++++++++ .../hbase/io/TestFileChangeWatcher.java | 279 ++++++++++++++++++ 2 files changed, 530 insertions(+) create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java new file mode 100644 index 000000000000..77e0e4e750ce --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/FileChangeWatcher.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.io.IOException; +import java.nio.file.ClosedWatchServiceException; +import java.nio.file.FileSystem; +import java.nio.file.Path; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.function.Consumer; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.server.ZooKeeperThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Instances of this class can be used to watch a directory for file changes. When a file is added + * to, deleted from, or is modified in the given directory, the callback provided by the user will + * be called from a background thread. Some things to keep in mind: + * + *

+ * This file has been copied from the Apache ZooKeeper project. + * @see Base + * revision + */ +@InterfaceAudience.Private +public final class FileChangeWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcher.class); + + public enum State { + NEW, // object created but start() not called yet + STARTING, // start() called but background thread has not entered main loop + RUNNING, // background thread is running + STOPPING, // stop() called but background thread has not exited main loop + STOPPED // stop() called and background thread has exited, or background thread crashed + } + + private final WatcherThread watcherThread; + private State state; // protected by synchronized(this) + + /** + * Creates a watcher that watches dirPath and invokes callback on + * changes. + * @param dirPath the directory to watch. + * @param callback the callback to invoke with events. event.kind() will return the + * type of event, and event.context() will return the filename + * relative to dirPath. + * @throws IOException if there is an error creating the WatchService. + */ + public FileChangeWatcher(Path dirPath, Consumer> callback) throws IOException { + FileSystem fs = dirPath.getFileSystem(); + WatchService watchService = fs.newWatchService(); + + LOG.debug("Registering with watch service: {}", dirPath); + + dirPath.register(watchService, + new WatchEvent.Kind[] { StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY, + StandardWatchEventKinds.OVERFLOW }); + state = State.NEW; + this.watcherThread = new WatcherThread(watchService, callback); + this.watcherThread.setDaemon(true); + } + + /** + * Returns the current {@link FileChangeWatcher.State}. + * @return the current state. + */ + public synchronized State getState() { + return state; + } + + /** + * Blocks until the current state becomes desiredState. Currently only used by tests, + * thus package-private. + * @param desiredState the desired state. + * @throws InterruptedException if the current thread gets interrupted. + */ + synchronized void waitForState(State desiredState) throws InterruptedException { + while (this.state != desiredState) { + this.wait(); + } + } + + /** + * Sets the state to newState. + * @param newState the new state. + */ + private synchronized void setState(State newState) { + state = newState; + this.notifyAll(); + } + + /** + * Atomically sets the state to update if and only if the state is currently + * expected. + * @param expected the expected state. + * @param update the new state. + * @return true if the update succeeds, or false if the current state does not equal + * expected. + */ + private synchronized boolean compareAndSetState(State expected, State update) { + if (state == expected) { + setState(update); + return true; + } else { + return false; + } + } + + /** + * Atomically sets the state to update if and only if the state is currently one of + * expectedStates. + * @param expectedStates the expected states. + * @param update the new state. + * @return true if the update succeeds, or false if the current state does not equal any of the + * expectedStates. + */ + private synchronized boolean compareAndSetState(State[] expectedStates, State update) { + for (State expected : expectedStates) { + if (state == expected) { + setState(update); + return true; + } + } + return false; + } + + /** + * Tells the background thread to start. Does not wait for it to be running. Calling this method + * more than once has no effect. + */ + public void start() { + if (!compareAndSetState(State.NEW, State.STARTING)) { + // If previous state was not NEW, start() has already been called. + return; + } + this.watcherThread.start(); + } + + /** + * Tells the background thread to stop. Does not wait for it to exit. + */ + public void stop() { + if (compareAndSetState(new State[] { State.RUNNING, State.STARTING }, State.STOPPING)) { + watcherThread.interrupt(); + } + } + + /** + * Inner class that implements the watcher thread logic. + */ + private class WatcherThread extends ZooKeeperThread { + + private static final String THREAD_NAME = "FileChangeWatcher"; + + final WatchService watchService; + final Consumer> callback; + + WatcherThread(WatchService watchService, Consumer> callback) { + super(THREAD_NAME); + this.watchService = watchService; + this.callback = callback; + } + + @Override + public void run() { + try { + LOG.info("{} thread started", getName()); + if ( + !compareAndSetState(FileChangeWatcher.State.STARTING, FileChangeWatcher.State.RUNNING) + ) { + // stop() called shortly after start(), before + // this thread started running. + FileChangeWatcher.State state = FileChangeWatcher.this.getState(); + if (state != FileChangeWatcher.State.STOPPING) { + throw new IllegalStateException("Unexpected state: " + state); + } + return; + } + runLoop(); + } catch (Exception e) { + LOG.warn("Error in runLoop()", e); + throw e; + } finally { + try { + watchService.close(); + } catch (IOException e) { + LOG.warn("Error closing watch service", e); + } + LOG.info("{} thread finished", getName()); + FileChangeWatcher.this.setState(FileChangeWatcher.State.STOPPED); + } + } + + private void runLoop() { + while (FileChangeWatcher.this.getState() == FileChangeWatcher.State.RUNNING) { + WatchKey key; + try { + key = watchService.take(); + } catch (InterruptedException | ClosedWatchServiceException e) { + LOG.debug("{} was interrupted and is shutting down...", getName()); + break; + } + for (WatchEvent event : key.pollEvents()) { + LOG.debug("Got file changed event: {} with context: {}", event.kind(), event.context()); + try { + callback.accept(event); + } catch (Throwable e) { + LOG.error("Error from callback", e); + } + } + boolean isKeyValid = key.reset(); + if (!isKeyValid) { + // This is likely a problem, it means that file reloading is broken, probably because the + // directory we are watching was deleted or otherwise became inaccessible (unmounted, + // permissions + // changed, ???). + // For now, we log an error and exit the watcher thread. + LOG.error("Watch key no longer valid, maybe the directory is inaccessible?"); + break; + } + } + } + } +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java new file mode 100644 index 000000000000..cee368596292 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestFileChangeWatcher.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtil; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This file has been copied from the Apache ZooKeeper project. + * @see Base + * revision + */ +@Category({ IOTests.class, SmallTests.class }) +public class TestFileChangeWatcher { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestFileChangeWatcher.class); + + private static File tempDir; + private static File tempFile; + + private static final Logger LOG = LoggerFactory.getLogger(TestFileChangeWatcher.class); + private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil(); + + private static final long FS_TIMEOUT = 30000L; + + @BeforeClass + public static void createTempFile() throws IOException { + tempDir = new File(UTIL.getDataTestDir(TestFileChangeWatcher.class.getSimpleName()).toString()) + .getCanonicalFile(); + FileUtils.forceMkdir(tempDir); + tempFile = File.createTempFile("zk_test_", "", tempDir); + } + + @AfterClass + public static void cleanupTempDir() { + UTIL.cleanupTestDir(); + } + + @Test + public void testCallbackWorksOnFileChanges() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + LOG.info("Got an update: {} {}", event.kind(), event.context()); + // Filter out the extra ENTRY_CREATE events that are + // sometimes seen at the start. Even though we create the watcher + // after the file exists, sometimes we still get a create event. + if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { + return; + } + synchronized (events) { + events.add(event); + events.notifyAll(); + } + }); + watcher.start(); + watcher.waitForState(FileChangeWatcher.State.RUNNING); + Thread.sleep(1000L); // TODO hack + for (int i = 0; i < 3; i++) { + LOG.info("Modifying file, attempt {}", (i + 1)); + FileUtils.writeStringToFile(tempFile, "Hello world " + i + "\n", StandardCharsets.UTF_8, + true); + synchronized (events) { + if (events.size() < i + 1) { + events.wait(FS_TIMEOUT); + } + assertEquals("Wrong number of events", i + 1, events.size()); + WatchEvent event = events.get(i); + assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind()); + assertEquals(tempFile.getName(), event.context().toString()); + } + } + } finally { + if (watcher != null) { + watcher.stop(); + watcher.waitForState(FileChangeWatcher.State.STOPPED); + } + } + } + + @Test + public void testCallbackWorksOnFileTouched() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + LOG.info("Got an update: {} {}", event.kind(), event.context()); + // Filter out the extra ENTRY_CREATE events that are + // sometimes seen at the start. Even though we create the watcher + // after the file exists, sometimes we still get a create event. + if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { + return; + } + synchronized (events) { + events.add(event); + events.notifyAll(); + } + }); + watcher.start(); + watcher.waitForState(FileChangeWatcher.State.RUNNING); + Thread.sleep(1000L); // TODO hack + LOG.info("Touching file"); + FileUtils.touch(tempFile); + synchronized (events) { + if (events.isEmpty()) { + events.wait(FS_TIMEOUT); + } + assertFalse(events.isEmpty()); + WatchEvent event = events.get(0); + assertEquals(StandardWatchEventKinds.ENTRY_MODIFY, event.kind()); + assertEquals(tempFile.getName(), event.context().toString()); + } + } finally { + if (watcher != null) { + watcher.stop(); + watcher.waitForState(FileChangeWatcher.State.STOPPED); + } + } + } + + @Test + public void testCallbackWorksOnFileAdded() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + LOG.info("Got an update: {} {}", event.kind(), event.context()); + synchronized (events) { + events.add(event); + events.notifyAll(); + } + }); + watcher.start(); + watcher.waitForState(FileChangeWatcher.State.RUNNING); + Thread.sleep(1000L); // TODO hack + File tempFile2 = File.createTempFile("zk_test_", "", tempDir); + tempFile2.deleteOnExit(); + synchronized (events) { + if (events.isEmpty()) { + events.wait(FS_TIMEOUT); + } + assertFalse(events.isEmpty()); + WatchEvent event = events.get(0); + assertEquals(StandardWatchEventKinds.ENTRY_CREATE, event.kind()); + assertEquals(tempFile2.getName(), event.context().toString()); + } + } finally { + if (watcher != null) { + watcher.stop(); + watcher.waitForState(FileChangeWatcher.State.STOPPED); + } + } + } + + @Test + public void testCallbackWorksOnFileDeleted() throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final List> events = new ArrayList<>(); + watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + LOG.info("Got an update: {} {}", event.kind(), event.context()); + // Filter out the extra ENTRY_CREATE events that are + // sometimes seen at the start. Even though we create the watcher + // after the file exists, sometimes we still get a create event. + if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { + return; + } + synchronized (events) { + events.add(event); + events.notifyAll(); + } + }); + watcher.start(); + watcher.waitForState(FileChangeWatcher.State.RUNNING); + Thread.sleep(1000L); // TODO hack + tempFile.delete(); + synchronized (events) { + if (events.isEmpty()) { + events.wait(FS_TIMEOUT); + } + assertFalse(events.isEmpty()); + WatchEvent event = events.get(0); + assertEquals(StandardWatchEventKinds.ENTRY_DELETE, event.kind()); + assertEquals(tempFile.getName(), event.context().toString()); + } + } finally { + if (watcher != null) { + watcher.stop(); + watcher.waitForState(FileChangeWatcher.State.STOPPED); + } + } + } + + @Test + public void testCallbackErrorDoesNotCrashWatcherThread() + throws IOException, InterruptedException { + FileChangeWatcher watcher = null; + try { + final AtomicInteger callCount = new AtomicInteger(0); + watcher = new FileChangeWatcher(tempDir.toPath(), event -> { + LOG.info("Got an update: {} {}", event.kind(), event.context()); + int oldValue; + synchronized (callCount) { + oldValue = callCount.getAndIncrement(); + callCount.notifyAll(); + } + if (oldValue == 0) { + throw new RuntimeException("This error should not crash the watcher thread"); + } + }); + watcher.start(); + watcher.waitForState(FileChangeWatcher.State.RUNNING); + Thread.sleep(1000L); // TODO hack + LOG.info("Modifying file"); + FileUtils.writeStringToFile(tempFile, "Hello world\n", StandardCharsets.UTF_8, true); + synchronized (callCount) { + while (callCount.get() == 0) { + callCount.wait(FS_TIMEOUT); + } + } + LOG.info("Modifying file again"); + FileUtils.writeStringToFile(tempFile, "Hello world again\n", StandardCharsets.UTF_8, true); + synchronized (callCount) { + if (callCount.get() == 1) { + callCount.wait(FS_TIMEOUT); + } + } + // The value of callCount can exceed 1 only if the callback thread + // survives the exception thrown by the first callback. + assertTrue(callCount.get() > 1); + } finally { + if (watcher != null) { + watcher.stop(); + watcher.waitForState(FileChangeWatcher.State.STOPPED); + } + } + } +} From 910b197a7696b742009cf422739e98fe79d2aa7a Mon Sep 17 00:00:00 2001 From: Andor Molnar Date: Wed, 9 Nov 2022 10:53:27 +0100 Subject: [PATCH 2/6] HBASE-27347. Add file change watcher for cert file reloading --- .../hadoop/hbase/ipc/NettyRpcClient.java | 6 +- .../hadoop/hbase/io/crypto/tls/X509Util.java | 90 ++++++++ .../hbase/io/crypto/tls/X509TestContext.java | 124 +++++++---- .../hadoop/hbase/ipc/NettyRpcServer.java | 20 +- .../security/TestNettyTLSIPCFileWatcher.java | 209 ++++++++++++++++++ 5 files changed, 407 insertions(+), 42 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestNettyTLSIPCFileWatcher.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java index 23eaa5a649fa..a8c092ea9c5f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java @@ -90,7 +90,7 @@ protected void closeInternal() { SslContext getSslContext() throws X509Exception, IOException { SslContext result = sslContextForClient.get(); if (result == null) { - result = X509Util.createSslContextForClient(conf); + result = X509Util.createSslContextForClient(conf, this::resetContext); if (!sslContextForClient.compareAndSet(null, result)) { // lost the race, another thread already set the value result = sslContextForClient.get(); @@ -98,4 +98,8 @@ SslContext getSslContext() throws X509Exception, IOException { } return result; } + + private void resetContext() { + sslContextForClient.set(null); + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java index 96aa66364bec..f1bdc27fd050 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java @@ -17,7 +17,12 @@ */ package org.apache.hadoop.hbase.io.crypto.tls; +import java.io.File; import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; import java.security.GeneralSecurityException; import java.security.KeyStore; import java.security.Security; @@ -25,6 +30,7 @@ import java.security.cert.X509CertSelector; import java.util.Arrays; import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.CertPathTrustManagerParameters; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; @@ -38,6 +44,7 @@ import org.apache.hadoop.hbase.exceptions.SSLContextException; import org.apache.hadoop.hbase.exceptions.TrustManagerException; import org.apache.hadoop.hbase.exceptions.X509Exception; +import org.apache.hadoop.hbase.io.FileChangeWatcher; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,6 +86,7 @@ public final class X509Util { CONFIG_PREFIX + "host-verification.reverse-dns.enabled"; private static final String TLS_ENABLED_PROTOCOLS = CONFIG_PREFIX + "enabledProtocols"; private static final String TLS_CIPHER_SUITES = CONFIG_PREFIX + "ciphersuites"; + public static final String TLS_CERT_RELOAD = CONFIG_PREFIX + "certReload"; public static final String DEFAULT_PROTOCOL = "TLSv1.2"; // @@ -192,6 +200,11 @@ static String[] getDefaultCipherSuitesForJavaVersion(String javaVersion) { public static SslContext createSslContextForClient(Configuration config) throws X509Exception, IOException { + return createSslContextForClient(config, null); + } + + public static SslContext createSslContextForClient(Configuration config, Runnable resetContext) + throws X509Exception, IOException { SslContextBuilder sslContextBuilder = SslContextBuilder.forClient(); @@ -204,6 +217,9 @@ public static SslContext createSslContextForClient(Configuration config) } else { sslContextBuilder .keyManager(createKeyManager(keyStoreLocation, keyStorePassword, keyStoreType)); + if (config.getBoolean(TLS_CERT_RELOAD, false)) { + newFileChangeWatcher(keyStoreLocation, resetContext); + } } String trustStoreLocation = config.get(TLS_CONFIG_TRUSTSTORE_LOCATION, ""); @@ -223,6 +239,9 @@ public static SslContext createSslContextForClient(Configuration config) sslContextBuilder .trustManager(createTrustManager(trustStoreLocation, trustStorePassword, trustStoreType, sslCrlEnabled, sslOcspEnabled, verifyServerHostname, allowReverseDnsLookup)); + if (config.getBoolean(TLS_CERT_RELOAD, false)) { + newFileChangeWatcher(trustStoreLocation, resetContext); + } } sslContextBuilder.enableOcsp(sslOcspEnabled); @@ -233,6 +252,11 @@ public static SslContext createSslContextForClient(Configuration config) } public static SslContext createSslContextForServer(Configuration config) + throws X509Exception, IOException { + return createSslContextForServer(config, null); + } + + public static SslContext createSslContextForServer(Configuration config, Runnable resetContext) throws X509Exception, IOException { String keyStoreLocation = config.get(TLS_CONFIG_KEYSTORE_LOCATION, ""); char[] keyStorePassword = config.getPassword(TLS_CONFIG_KEYSTORE_PASSWORD); @@ -247,6 +271,9 @@ public static SslContext createSslContextForServer(Configuration config) sslContextBuilder = SslContextBuilder .forServer(createKeyManager(keyStoreLocation, keyStorePassword, keyStoreType)); + if (config.getBoolean(TLS_CERT_RELOAD, false)) { + newFileChangeWatcher(keyStoreLocation, resetContext); + } String trustStoreLocation = config.get(TLS_CONFIG_TRUSTSTORE_LOCATION, ""); char[] trustStorePassword = config.getPassword(TLS_CONFIG_TRUSTSTORE_PASSWORD); @@ -267,6 +294,9 @@ public static SslContext createSslContextForServer(Configuration config) sslContextBuilder .trustManager(createTrustManager(trustStoreLocation, trustStorePassword, trustStoreType, sslCrlEnabled, sslOcspEnabled, verifyClientHostname, allowReverseDnsLookup)); + if (config.getBoolean(TLS_CERT_RELOAD, false)) { + newFileChangeWatcher(trustStoreLocation, resetContext); + } } sslContextBuilder.enableOcsp(sslOcspEnabled); @@ -277,6 +307,11 @@ public static SslContext createSslContextForServer(Configuration config) return sslContextBuilder.build(); } +// public static void enableCertFileReloading() { +// newFileChangeWatcher(keyStoreLocation, resetContext); +// +// } + /** * Creates a key manager by loading the key store from the given file of the given type, * optionally decrypting it using the given password. @@ -395,4 +430,59 @@ private static String[] getCipherSuites(Configuration config) { return cipherSuitesInput.split(","); } } + + private static void newFileChangeWatcher(String fileLocation, Runnable resetContext) throws IOException { + if (fileLocation == null || fileLocation.isEmpty() || resetContext == null) { + return; + } + final Path filePath = Paths.get(fileLocation).toAbsolutePath(); + Path parentPath = filePath.getParent(); + if (parentPath == null) { + throw new IOException( + "Key/trust store path does not have a parent: " + filePath); + } + AtomicReference fileWatcher = new AtomicReference<>(); + fileWatcher.set(new FileChangeWatcher( + parentPath, + watchEvent -> { + handleWatchEvent(filePath, watchEvent, resetContext, fileWatcher); + })); + fileWatcher.get().start(); + } + + /** + * Handler for watch events that let us know a file we may care about has changed on disk. + * + * @param filePath the path to the file we are watching for changes. + * @param event the WatchEvent. + */ + private static void handleWatchEvent(Path filePath, WatchEvent event, Runnable resetContext, + AtomicReference fileWatcher) { + boolean shouldResetContext = false; + Path dirPath = filePath.getParent(); + if (event.kind().equals(StandardWatchEventKinds.OVERFLOW)) { + // If we get notified about possibly missed events, reload the key store / trust store just to be sure. + shouldResetContext = true; + } else if (event.kind().equals(StandardWatchEventKinds.ENTRY_MODIFY) || + event.kind().equals(StandardWatchEventKinds.ENTRY_CREATE)) { + Path eventFilePath = dirPath.resolve((Path) event.context()); + if (filePath.equals(eventFilePath)) { + shouldResetContext = true; + } + } + // Note: we don't care about delete events + if (shouldResetContext) { + if (LOG.isDebugEnabled()) { + LOG.debug("Attempting to reset default SSL context after receiving watch event: " + + event.kind() + " with context: " + event.context()); + } + fileWatcher.getAndSet(null).stop(); + resetContext.run(); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring watch event and keeping previous default SSL context. Event kind: " + + event.kind() + " with context: " + event.context()); + } + } + } } diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java index 0185ebff0ec8..024aeb4e60d3 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java @@ -20,6 +20,7 @@ import static java.util.Objects.requireNonNull; import java.io.File; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.lang.invoke.MethodHandles; @@ -57,16 +58,16 @@ public final class X509TestContext { private final File tempDir; private final Configuration conf; - private final X509Certificate trustStoreCertificate; + private X509Certificate trustStoreCertificate; private final char[] trustStorePassword; - private final KeyPair trustStoreKeyPair; + private KeyPair trustStoreKeyPair; private File trustStoreJksFile; private File trustStorePemFile; private File trustStorePkcs12File; private File trustStoreBcfksFile; - private final KeyPair keyStoreKeyPair; - private final X509Certificate keyStoreCertificate; + private KeyPair keyStoreKeyPair; + private X509Certificate keyStoreCertificate; private final char[] keyStorePassword; private File keyStoreJksFile; private File keyStorePemFile; @@ -101,16 +102,8 @@ private X509TestContext(Configuration conf, File tempDir, KeyPair trustStoreKeyP this.keyStoreKeyPair = requireNonNull(keyStoreKeyPair); this.keyStorePassword = requireNonNull(keyStorePassword); - X500NameBuilder caNameBuilder = new X500NameBuilder(BCStyle.INSTANCE); - caNameBuilder.addRDN(BCStyle.CN, - MethodHandles.lookup().lookupClass().getCanonicalName() + " Root CA"); - trustStoreCertificate = - X509TestHelpers.newSelfSignedCACert(caNameBuilder.build(), trustStoreKeyPair); + createCertificates(); - X500NameBuilder nameBuilder = new X500NameBuilder(BCStyle.INSTANCE); - nameBuilder.addRDN(BCStyle.CN, - MethodHandles.lookup().lookupClass().getCanonicalName() + " Zookeeper Test"); - keyStoreCertificate = newCert(nameBuilder.build()); trustStorePkcs12File = null; trustStorePemFile = null; trustStoreJksFile = null; @@ -197,23 +190,26 @@ public File getTrustStoreFile(KeyStoreFileType storeFileType) throws IOException private File getTrustStoreJksFile() throws IOException { if (trustStoreJksFile == null) { - File trustStoreJksFile = File.createTempFile(TRUST_STORE_PREFIX, + trustStoreJksFile = File.createTempFile(TRUST_STORE_PREFIX, KeyStoreFileType.JKS.getDefaultFileExtension(), tempDir); trustStoreJksFile.deleteOnExit(); - try ( - final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreJksFile)) { - byte[] bytes = - X509TestHelpers.certToJavaTrustStoreBytes(trustStoreCertificate, trustStorePassword); - trustStoreOutputStream.write(bytes); - trustStoreOutputStream.flush(); - } catch (GeneralSecurityException e) { - throw new IOException(e); - } - this.trustStoreJksFile = trustStoreJksFile; + generateTrustStoreJksFile(); } return trustStoreJksFile; } + private void generateTrustStoreJksFile() throws IOException { + try ( + final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreJksFile)) { + byte[] bytes = + X509TestHelpers.certToJavaTrustStoreBytes(trustStoreCertificate, trustStorePassword); + trustStoreOutputStream.write(bytes); + trustStoreOutputStream.flush(); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + } + private File getTrustStorePemFile() throws IOException { if (trustStorePemFile == null) { File trustStorePemFile = File.createTempFile(TRUST_STORE_PREFIX, @@ -307,33 +303,32 @@ public File getKeyStoreFile(KeyStoreFileType storeFileType) throws IOException { private File getKeyStoreJksFile() throws IOException { if (keyStoreJksFile == null) { - File keyStoreJksFile = File.createTempFile(KEY_STORE_PREFIX, + keyStoreJksFile = File.createTempFile(KEY_STORE_PREFIX, KeyStoreFileType.JKS.getDefaultFileExtension(), tempDir); keyStoreJksFile.deleteOnExit(); - try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStoreJksFile)) { - byte[] bytes = X509TestHelpers.certAndPrivateKeyToJavaKeyStoreBytes(keyStoreCertificate, - keyStoreKeyPair.getPrivate(), keyStorePassword); - keyStoreOutputStream.write(bytes); - keyStoreOutputStream.flush(); - } catch (GeneralSecurityException e) { - throw new IOException(e); - } - this.keyStoreJksFile = keyStoreJksFile; + generateKeyStoreJksFile(); } return keyStoreJksFile; } + private void generateKeyStoreJksFile() throws IOException { + try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStoreJksFile)) { + byte[] bytes = X509TestHelpers.certAndPrivateKeyToJavaKeyStoreBytes(keyStoreCertificate, + keyStoreKeyPair.getPrivate(), keyStorePassword); + keyStoreOutputStream.write(bytes); + keyStoreOutputStream.flush(); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + } + private File getKeyStorePemFile() throws IOException { if (keyStorePemFile == null) { try { - File keyStorePemFile = File.createTempFile(KEY_STORE_PREFIX, + keyStorePemFile = File.createTempFile(KEY_STORE_PREFIX, KeyStoreFileType.PEM.getDefaultFileExtension(), tempDir); keyStorePemFile.deleteOnExit(); - FileUtils.writeStringToFile(keyStorePemFile, - X509TestHelpers.pemEncodeCertAndPrivateKey(keyStoreCertificate, - keyStoreKeyPair.getPrivate(), keyStorePassword), - StandardCharsets.US_ASCII, false); - this.keyStorePemFile = keyStorePemFile; + generateKeyStorePemFile(); } catch (OperatorCreationException e) { throw new IOException(e); } @@ -341,6 +336,13 @@ private File getKeyStorePemFile() throws IOException { return keyStorePemFile; } + private void generateKeyStorePemFile() throws IOException, OperatorCreationException { + FileUtils.writeStringToFile(keyStorePemFile, + X509TestHelpers.pemEncodeCertAndPrivateKey(keyStoreCertificate, + keyStoreKeyPair.getPrivate(), keyStorePassword), + StandardCharsets.US_ASCII, false); + } + private File getKeyStorePkcs12File() throws IOException { if (keyStorePkcs12File == null) { File keyStorePkcs12File = File.createTempFile(KEY_STORE_PREFIX, @@ -445,6 +447,48 @@ public X509TestContext cloneWithNewKeystoreCert(X509Certificate cert) { keyStoreKeyPair, keyStorePassword, cert); } + public void regenerateStores(X509KeyType keyStoreKeyType, + X509KeyType trustStoreKeyType, + KeyStoreFileType keyStoreFileType, + KeyStoreFileType trustStoreFileType) + throws GeneralSecurityException, IOException, OperatorCreationException { + + trustStoreKeyPair = X509TestHelpers.generateKeyPair(trustStoreKeyType); + keyStoreKeyPair = X509TestHelpers.generateKeyPair(keyStoreKeyType); + createCertificates(); + + switch (keyStoreFileType) { + case JKS: + generateKeyStoreJksFile(); + break; + case PEM: + generateKeyStorePemFile(); + break; + case BCFKS: + break; + } + + switch (trustStoreFileType) { + case JKS: + generateTrustStoreJksFile(); + break; + } + } + + private void createCertificates() + throws GeneralSecurityException, IOException, OperatorCreationException { + X500NameBuilder caNameBuilder = new X500NameBuilder(BCStyle.INSTANCE); + caNameBuilder.addRDN(BCStyle.CN, + MethodHandles.lookup().lookupClass().getCanonicalName() + " Root CA"); + trustStoreCertificate = + X509TestHelpers.newSelfSignedCACert(caNameBuilder.build(), trustStoreKeyPair); + + X500NameBuilder nameBuilder = new X500NameBuilder(BCStyle.INSTANCE); + nameBuilder.addRDN(BCStyle.CN, + MethodHandles.lookup().lookupClass().getCanonicalName() + " Zookeeper Test"); + keyStoreCertificate = newCert(nameBuilder.build()); + } + /** * Builder class, used for creating new instances of X509TestContext. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index 7de46bb4ea6e..061c3d749a78 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -25,6 +25,7 @@ import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseServerBase; @@ -85,6 +86,7 @@ public class NettyRpcServer extends RpcServer { private final Channel serverChannel; final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true); private final ByteBufAllocator channelAllocator; + private final AtomicReference sslContextForServer = new AtomicReference<>(); public NettyRpcServer(Server server, String name, List services, InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler, @@ -226,7 +228,7 @@ public int getNumOpenConnections() { private void initSSL(ChannelPipeline p, boolean supportPlaintext) throws X509Exception, IOException { - SslContext nettySslContext = X509Util.createSslContextForServer(conf); + SslContext nettySslContext = getSslContext(); if (supportPlaintext) { p.addLast("ssl", new OptionalSslHandler(nettySslContext)); @@ -236,4 +238,20 @@ private void initSSL(ChannelPipeline p, boolean supportPlaintext) LOG.debug("SSL handler added for channel: {}", p.channel()); } } + + SslContext getSslContext() throws X509Exception, IOException { + SslContext result = sslContextForServer.get(); + if (result == null) { + result = X509Util.createSslContextForServer(conf, this::resetContext); + if (!sslContextForServer.compareAndSet(null, result)) { + // lost the race, another thread already set the value + result = sslContextForServer.get(); + } + } + return result; + } + + private void resetContext() { + sslContextForServer.set(null); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestNettyTLSIPCFileWatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestNettyTLSIPCFileWatcher.java new file mode 100644 index 000000000000..b5cb130cb811 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestNettyTLSIPCFileWatcher.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseServerBase; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType; +import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType; +import org.apache.hadoop.hbase.io.crypto.tls.X509TestContext; +import org.apache.hadoop.hbase.io.crypto.tls.X509TestContextProvider; +import org.apache.hadoop.hbase.io.crypto.tls.X509Util; +import org.apache.hadoop.hbase.ipc.AbstractRpcClient; +import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl; +import org.apache.hadoop.hbase.ipc.NettyRpcClient; +import org.apache.hadoop.hbase.ipc.NettyRpcServer; +import org.apache.hadoop.hbase.ipc.RpcScheduler; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.net.Address; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.operator.OperatorCreationException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.GeneralSecurityException; +import java.security.Security; +import java.util.List; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@Category({ RPCTests.class, MediumTests.class }) +public class TestNettyTLSIPCFileWatcher { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestNettyTLSIPCFileWatcher.class); + + private static final Configuration CONF = HBaseConfiguration.create(); + private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil(CONF); + private static HBaseServerBase SERVER; + private static X509TestContextProvider PROVIDER; + private static NettyEventLoopGroupConfig EVENT_LOOP_GROUP_CONFIG; + + private X509TestContext x509TestContext; + + @BeforeClass + public static void setUpBeforeClass() throws IOException { + Security.addProvider(new BouncyCastleProvider()); + File dir = new File(UTIL.getDataTestDir(TestNettyTlsIPC.class.getSimpleName()).toString()) + .getCanonicalFile(); + FileUtils.forceMkdir(dir); + // server must enable tls + CONF.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_ENABLED, true); + PROVIDER = new X509TestContextProvider(CONF, dir); + EVENT_LOOP_GROUP_CONFIG = + NettyEventLoopGroupConfig.setup(CONF, TestNettyTlsIPC.class.getSimpleName()); + SERVER = mock(HBaseServerBase.class); + when(SERVER.getEventLoopGroupConfig()).thenReturn(EVENT_LOOP_GROUP_CONFIG); + } + + @AfterClass + public static void tearDownAfterClass() throws InterruptedException { + Security.removeProvider(BouncyCastleProvider.PROVIDER_NAME); + EVENT_LOOP_GROUP_CONFIG.group().shutdownGracefully().sync(); + UTIL.cleanupTestDir(); + } + + @Before + public void setUp() throws IOException { + x509TestContext = PROVIDER.get(X509KeyType.RSA, X509KeyType.RSA, "keyPa$$word".toCharArray()); + x509TestContext.setConfigurations(KeyStoreFileType.JKS, KeyStoreFileType.JKS); + CONF.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, false); + CONF.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, true); + CONF.setBoolean(X509Util.TLS_CERT_RELOAD, true); + } + + @After + public void tearDown() { + x509TestContext.clearConfigurations(); + x509TestContext.getConf().unset(X509Util.TLS_CONFIG_OCSP); + x509TestContext.getConf().unset(X509Util.TLS_CONFIG_CLR); + x509TestContext.getConf().unset(X509Util.TLS_CONFIG_PROTOCOL); + System.clearProperty("com.sun.net.ssl.checkRevocation"); + System.clearProperty("com.sun.security.enableCRLDP"); + Security.setProperty("ocsp.enable", Boolean.FALSE.toString()); + Security.setProperty("com.sun.security.enableCRLDP", Boolean.FALSE.toString()); + } + + @Test + public void testReplaceServerKeystore() + throws IOException, ServiceException, GeneralSecurityException, OperatorCreationException { + Configuration clientConf = new Configuration(CONF); + RpcServer rpcServer = createRpcServer("testRpcServer", + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), + new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); + + try { + rpcServer.start(); + + try (AbstractRpcClient client = new NettyRpcClient(clientConf)) { + TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = + newBlockingStub(client, rpcServer.getListenerAddress()); + HBaseRpcController pcrc = new HBaseRpcControllerImpl(); + String message = "hello"; + assertEquals(message, + stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); + assertNull(pcrc.cellScanner()); + } + + // Replace keystore + x509TestContext.regenerateStores(X509KeyType.RSA, X509KeyType.RSA, + KeyStoreFileType.JKS, KeyStoreFileType.JKS); + + try (AbstractRpcClient client = new NettyRpcClient(clientConf)) { + TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = + newBlockingStub(client, rpcServer.getListenerAddress()); + HBaseRpcController pcrc = new HBaseRpcControllerImpl(); + String message = "hello"; + assertEquals(message, + stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); + assertNull(pcrc.cellScanner()); + } + + } finally { + rpcServer.stop(); + } + } + + @Test + public void testReplaceClientAndServerKeystore() + throws GeneralSecurityException, IOException, OperatorCreationException, ServiceException { + Configuration clientConf = new Configuration(CONF); + RpcServer rpcServer = createRpcServer("testRpcServer", + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), + new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); + + try { + rpcServer.start(); + + try (AbstractRpcClient client = new NettyRpcClient(clientConf)) { + TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = + newBlockingStub(client, rpcServer.getListenerAddress()); + HBaseRpcController pcrc = new HBaseRpcControllerImpl(); + String message = "hello"; + assertEquals(message, + stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); + assertNull(pcrc.cellScanner()); + + // Replace keystore and cancel client connections + x509TestContext.regenerateStores(X509KeyType.RSA, X509KeyType.RSA, + KeyStoreFileType.JKS, KeyStoreFileType.JKS); + client.cancelConnections( + ServerName.valueOf(Address.fromSocketAddress(rpcServer.getListenerAddress()), 0L)); + + assertEquals(message, + stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); + assertNull(pcrc.cellScanner()); + } + } finally { + rpcServer.stop(); + } + } + + private RpcServer createRpcServer(String name, List services, + InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException { + return new NettyRpcServer(SERVER, name, services, bindAddress, conf, scheduler, true); + } + + +} From be6aaa31704613fe187b75bb5c6f16b4e7b8f5d5 Mon Sep 17 00:00:00 2001 From: Andor Molnar Date: Wed, 9 Nov 2022 11:11:51 +0100 Subject: [PATCH 3/6] HBASE-27347. Keep state of file watchers in client/server --- .../hadoop/hbase/ipc/NettyRpcClient.java | 23 ++++++-- .../hadoop/hbase/io/crypto/tls/X509Util.java | 54 ++++++------------- .../hadoop/hbase/ipc/NettyRpcServer.java | 22 ++++++-- 3 files changed, 52 insertions(+), 47 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java index a8c092ea9c5f..8fcddf034df5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.ipc; +import java.io.File; import java.io.IOException; import java.net.SocketAddress; import java.util.concurrent.atomic.AtomicReference; @@ -25,6 +26,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.exceptions.X509Exception; +import org.apache.hadoop.hbase.io.FileChangeWatcher; import org.apache.hadoop.hbase.io.crypto.tls.X509Util; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; @@ -49,6 +51,8 @@ public class NettyRpcClient extends AbstractRpcClient { private final boolean shutdownGroupWhenClose; private final AtomicReference sslContextForClient = new AtomicReference<>(); + private final AtomicReference keyStoreWatcher = new AtomicReference<>(); + private final AtomicReference trustStoreWatcher = new AtomicReference<>(); public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, MetricsConnection metrics) { @@ -85,21 +89,30 @@ protected void closeInternal() { if (shutdownGroupWhenClose) { group.shutdownGracefully(); } + FileChangeWatcher ks = keyStoreWatcher.getAndSet(null); + if (ks != null) { + ks.stop(); + } + FileChangeWatcher ts = trustStoreWatcher.getAndSet(null); + if (ts != null) { + ts.stop(); + } } SslContext getSslContext() throws X509Exception, IOException { SslContext result = sslContextForClient.get(); if (result == null) { - result = X509Util.createSslContextForClient(conf, this::resetContext); + result = X509Util.createSslContextForClient(conf); if (!sslContextForClient.compareAndSet(null, result)) { // lost the race, another thread already set the value result = sslContextForClient.get(); + } else if (keyStoreWatcher.get() == null && + trustStoreWatcher.get() == null && + conf.getBoolean(X509Util.TLS_CERT_RELOAD, false)) { + X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher, + () -> sslContextForClient.set(null)); } } return result; } - - private void resetContext() { - sslContextForClient.set(null); - } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java index f1bdc27fd050..06eef6201101 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java @@ -200,11 +200,6 @@ static String[] getDefaultCipherSuitesForJavaVersion(String javaVersion) { public static SslContext createSslContextForClient(Configuration config) throws X509Exception, IOException { - return createSslContextForClient(config, null); - } - - public static SslContext createSslContextForClient(Configuration config, Runnable resetContext) - throws X509Exception, IOException { SslContextBuilder sslContextBuilder = SslContextBuilder.forClient(); @@ -217,9 +212,6 @@ public static SslContext createSslContextForClient(Configuration config, Runnabl } else { sslContextBuilder .keyManager(createKeyManager(keyStoreLocation, keyStorePassword, keyStoreType)); - if (config.getBoolean(TLS_CERT_RELOAD, false)) { - newFileChangeWatcher(keyStoreLocation, resetContext); - } } String trustStoreLocation = config.get(TLS_CONFIG_TRUSTSTORE_LOCATION, ""); @@ -239,9 +231,6 @@ public static SslContext createSslContextForClient(Configuration config, Runnabl sslContextBuilder .trustManager(createTrustManager(trustStoreLocation, trustStorePassword, trustStoreType, sslCrlEnabled, sslOcspEnabled, verifyServerHostname, allowReverseDnsLookup)); - if (config.getBoolean(TLS_CERT_RELOAD, false)) { - newFileChangeWatcher(trustStoreLocation, resetContext); - } } sslContextBuilder.enableOcsp(sslOcspEnabled); @@ -252,11 +241,6 @@ public static SslContext createSslContextForClient(Configuration config, Runnabl } public static SslContext createSslContextForServer(Configuration config) - throws X509Exception, IOException { - return createSslContextForServer(config, null); - } - - public static SslContext createSslContextForServer(Configuration config, Runnable resetContext) throws X509Exception, IOException { String keyStoreLocation = config.get(TLS_CONFIG_KEYSTORE_LOCATION, ""); char[] keyStorePassword = config.getPassword(TLS_CONFIG_KEYSTORE_PASSWORD); @@ -268,12 +252,8 @@ public static SslContext createSslContextForServer(Configuration config, Runnabl } SslContextBuilder sslContextBuilder; - sslContextBuilder = SslContextBuilder .forServer(createKeyManager(keyStoreLocation, keyStorePassword, keyStoreType)); - if (config.getBoolean(TLS_CERT_RELOAD, false)) { - newFileChangeWatcher(keyStoreLocation, resetContext); - } String trustStoreLocation = config.get(TLS_CONFIG_TRUSTSTORE_LOCATION, ""); char[] trustStorePassword = config.getPassword(TLS_CONFIG_TRUSTSTORE_PASSWORD); @@ -294,9 +274,6 @@ public static SslContext createSslContextForServer(Configuration config, Runnabl sslContextBuilder .trustManager(createTrustManager(trustStoreLocation, trustStorePassword, trustStoreType, sslCrlEnabled, sslOcspEnabled, verifyClientHostname, allowReverseDnsLookup)); - if (config.getBoolean(TLS_CERT_RELOAD, false)) { - newFileChangeWatcher(trustStoreLocation, resetContext); - } } sslContextBuilder.enableOcsp(sslOcspEnabled); @@ -307,10 +284,15 @@ public static SslContext createSslContextForServer(Configuration config, Runnabl return sslContextBuilder.build(); } -// public static void enableCertFileReloading() { -// newFileChangeWatcher(keyStoreLocation, resetContext); -// -// } + public static void enableCertFileReloading(Configuration config, + AtomicReference keystoreWatcher, + AtomicReference trustStoreWatcher, + Runnable resetContext) throws IOException { + String keyStoreLocation = config.get(TLS_CONFIG_KEYSTORE_LOCATION, ""); + keystoreWatcher.set(newFileChangeWatcher(keyStoreLocation, resetContext)); + String trustStoreLocation = config.get(TLS_CONFIG_TRUSTSTORE_LOCATION, ""); + trustStoreWatcher.set(newFileChangeWatcher(trustStoreLocation, resetContext)); + } /** * Creates a key manager by loading the key store from the given file of the given type, @@ -431,9 +413,9 @@ private static String[] getCipherSuites(Configuration config) { } } - private static void newFileChangeWatcher(String fileLocation, Runnable resetContext) throws IOException { + private static FileChangeWatcher newFileChangeWatcher(String fileLocation, Runnable resetContext) throws IOException { if (fileLocation == null || fileLocation.isEmpty() || resetContext == null) { - return; + return null; } final Path filePath = Paths.get(fileLocation).toAbsolutePath(); Path parentPath = filePath.getParent(); @@ -441,13 +423,13 @@ private static void newFileChangeWatcher(String fileLocation, Runnable resetCont throw new IOException( "Key/trust store path does not have a parent: " + filePath); } - AtomicReference fileWatcher = new AtomicReference<>(); - fileWatcher.set(new FileChangeWatcher( + FileChangeWatcher fileChangeWatcher = new FileChangeWatcher( parentPath, watchEvent -> { - handleWatchEvent(filePath, watchEvent, resetContext, fileWatcher); - })); - fileWatcher.get().start(); + handleWatchEvent(filePath, watchEvent, resetContext); + }); + fileChangeWatcher.start(); + return fileChangeWatcher; } /** @@ -456,8 +438,7 @@ private static void newFileChangeWatcher(String fileLocation, Runnable resetCont * @param filePath the path to the file we are watching for changes. * @param event the WatchEvent. */ - private static void handleWatchEvent(Path filePath, WatchEvent event, Runnable resetContext, - AtomicReference fileWatcher) { + private static void handleWatchEvent(Path filePath, WatchEvent event, Runnable resetContext) { boolean shouldResetContext = false; Path dirPath = filePath.getParent(); if (event.kind().equals(StandardWatchEventKinds.OVERFLOW)) { @@ -476,7 +457,6 @@ private static void handleWatchEvent(Path filePath, WatchEvent event, Runnabl LOG.debug("Attempting to reset default SSL context after receiving watch event: " + event.kind() + " with context: " + event.context()); } - fileWatcher.getAndSet(null).stop(); resetContext.run(); } else { if (LOG.isDebugEnabled()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index 061c3d749a78..f2434ddf0e0b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HBaseServerBase; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.exceptions.X509Exception; +import org.apache.hadoop.hbase.io.FileChangeWatcher; import org.apache.hadoop.hbase.io.crypto.tls.X509Util; import org.apache.hadoop.hbase.security.HBasePolicyProvider; import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; @@ -87,6 +88,8 @@ public class NettyRpcServer extends RpcServer { final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true); private final ByteBufAllocator channelAllocator; private final AtomicReference sslContextForServer = new AtomicReference<>(); + private final AtomicReference keyStoreWatcher = new AtomicReference<>(); + private final AtomicReference trustStoreWatcher = new AtomicReference<>(); public NettyRpcServer(Server server, String name, List services, InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler, @@ -194,6 +197,14 @@ public synchronized void stop() { return; } LOG.info("Stopping server on " + this.serverChannel.localAddress()); + FileChangeWatcher ks = keyStoreWatcher.getAndSet(null); + if (ks != null) { + ks.stop(); + } + FileChangeWatcher ts = trustStoreWatcher.getAndSet(null); + if (ts != null) { + ts.stop(); + } if (authTokenSecretMgr != null) { authTokenSecretMgr.stop(); authTokenSecretMgr = null; @@ -242,16 +253,17 @@ private void initSSL(ChannelPipeline p, boolean supportPlaintext) SslContext getSslContext() throws X509Exception, IOException { SslContext result = sslContextForServer.get(); if (result == null) { - result = X509Util.createSslContextForServer(conf, this::resetContext); + result = X509Util.createSslContextForServer(conf); if (!sslContextForServer.compareAndSet(null, result)) { // lost the race, another thread already set the value result = sslContextForServer.get(); + } else if (keyStoreWatcher.get() == null && + trustStoreWatcher.get() == null && + conf.getBoolean(X509Util.TLS_CERT_RELOAD, false)) { + X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher, + () -> sslContextForServer.set(null)); } } return result; } - - private void resetContext() { - sslContextForServer.set(null); - } } From dd88c3b196897a54dcc672e9fe40b28475197ad5 Mon Sep 17 00:00:00 2001 From: Andor Molnar Date: Wed, 9 Nov 2022 11:29:31 +0100 Subject: [PATCH 4/6] HBASE-27347. Finish modification on test context class --- .../hbase/io/crypto/tls/X509TestContext.java | 122 +++++++++++------- 1 file changed, 75 insertions(+), 47 deletions(-) diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java index 024aeb4e60d3..ae79be3f3cf2 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java @@ -212,55 +212,64 @@ private void generateTrustStoreJksFile() throws IOException { private File getTrustStorePemFile() throws IOException { if (trustStorePemFile == null) { - File trustStorePemFile = File.createTempFile(TRUST_STORE_PREFIX, + trustStorePemFile = File.createTempFile(TRUST_STORE_PREFIX, KeyStoreFileType.PEM.getDefaultFileExtension(), tempDir); trustStorePemFile.deleteOnExit(); - FileUtils.writeStringToFile(trustStorePemFile, - X509TestHelpers.pemEncodeX509Certificate(trustStoreCertificate), StandardCharsets.US_ASCII, - false); - this.trustStorePemFile = trustStorePemFile; + generateTrustStorePemFile(); } return trustStorePemFile; } + private void generateTrustStorePemFile() throws IOException { + FileUtils.writeStringToFile(trustStorePemFile, + X509TestHelpers.pemEncodeX509Certificate(trustStoreCertificate), StandardCharsets.US_ASCII, + false); + } + private File getTrustStorePkcs12File() throws IOException { if (trustStorePkcs12File == null) { - File trustStorePkcs12File = File.createTempFile(TRUST_STORE_PREFIX, + trustStorePkcs12File = File.createTempFile(TRUST_STORE_PREFIX, KeyStoreFileType.PKCS12.getDefaultFileExtension(), tempDir); trustStorePkcs12File.deleteOnExit(); - try (final FileOutputStream trustStoreOutputStream = - new FileOutputStream(trustStorePkcs12File)) { - byte[] bytes = - X509TestHelpers.certToPKCS12TrustStoreBytes(trustStoreCertificate, trustStorePassword); - trustStoreOutputStream.write(bytes); - trustStoreOutputStream.flush(); - } catch (GeneralSecurityException e) { - throw new IOException(e); - } - this.trustStorePkcs12File = trustStorePkcs12File; + generateTrustStorePkcs12File(); } return trustStorePkcs12File; } + private void generateTrustStorePkcs12File() throws IOException { + try (final FileOutputStream trustStoreOutputStream = + new FileOutputStream(trustStorePkcs12File)) { + byte[] bytes = + X509TestHelpers.certToPKCS12TrustStoreBytes(trustStoreCertificate, trustStorePassword); + trustStoreOutputStream.write(bytes); + trustStoreOutputStream.flush(); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + } + private File getTrustStoreBcfksFile() throws IOException { if (trustStoreBcfksFile == null) { - File trustStoreBcfksFile = File.createTempFile(TRUST_STORE_PREFIX, + trustStoreBcfksFile = File.createTempFile(TRUST_STORE_PREFIX, KeyStoreFileType.BCFKS.getDefaultFileExtension(), tempDir); trustStoreBcfksFile.deleteOnExit(); - try ( - final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreBcfksFile)) { - byte[] bytes = - X509TestHelpers.certToBCFKSTrustStoreBytes(trustStoreCertificate, trustStorePassword); - trustStoreOutputStream.write(bytes); - trustStoreOutputStream.flush(); - } catch (GeneralSecurityException e) { - throw new IOException(e); - } - this.trustStoreBcfksFile = trustStoreBcfksFile; + generateTrustStoreBcfksFile(); } return trustStoreBcfksFile; } + private void generateTrustStoreBcfksFile() throws IOException { + try ( + final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreBcfksFile)) { + byte[] bytes = + X509TestHelpers.certToBCFKSTrustStoreBytes(trustStoreCertificate, trustStorePassword); + trustStoreOutputStream.write(bytes); + trustStoreOutputStream.flush(); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + } + public X509Certificate getKeyStoreCertificate() { return keyStoreCertificate; } @@ -345,40 +354,46 @@ private void generateKeyStorePemFile() throws IOException, OperatorCreationExcep private File getKeyStorePkcs12File() throws IOException { if (keyStorePkcs12File == null) { - File keyStorePkcs12File = File.createTempFile(KEY_STORE_PREFIX, + keyStorePkcs12File = File.createTempFile(KEY_STORE_PREFIX, KeyStoreFileType.PKCS12.getDefaultFileExtension(), tempDir); keyStorePkcs12File.deleteOnExit(); - try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStorePkcs12File)) { - byte[] bytes = X509TestHelpers.certAndPrivateKeyToPKCS12Bytes(keyStoreCertificate, - keyStoreKeyPair.getPrivate(), keyStorePassword); - keyStoreOutputStream.write(bytes); - keyStoreOutputStream.flush(); - } catch (GeneralSecurityException e) { - throw new IOException(e); - } - this.keyStorePkcs12File = keyStorePkcs12File; + generateKeyStorePkcs12File(); } return keyStorePkcs12File; } + private void generateKeyStorePkcs12File() throws IOException { + try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStorePkcs12File)) { + byte[] bytes = X509TestHelpers.certAndPrivateKeyToPKCS12Bytes(keyStoreCertificate, + keyStoreKeyPair.getPrivate(), keyStorePassword); + keyStoreOutputStream.write(bytes); + keyStoreOutputStream.flush(); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + } + private File getKeyStoreBcfksFile() throws IOException { if (keyStoreBcfksFile == null) { - File keyStoreBcfksFile = File.createTempFile(KEY_STORE_PREFIX, + keyStoreBcfksFile = File.createTempFile(KEY_STORE_PREFIX, KeyStoreFileType.BCFKS.getDefaultFileExtension(), tempDir); keyStoreBcfksFile.deleteOnExit(); - try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStoreBcfksFile)) { - byte[] bytes = X509TestHelpers.certAndPrivateKeyToBCFKSBytes(keyStoreCertificate, - keyStoreKeyPair.getPrivate(), keyStorePassword); - keyStoreOutputStream.write(bytes); - keyStoreOutputStream.flush(); - } catch (GeneralSecurityException e) { - throw new IOException(e); - } - this.keyStoreBcfksFile = keyStoreBcfksFile; + generateKeyStoreBcfksFile(); } return keyStoreBcfksFile; } + private void generateKeyStoreBcfksFile() throws IOException { + try (final FileOutputStream keyStoreOutputStream = new FileOutputStream(keyStoreBcfksFile)) { + byte[] bytes = X509TestHelpers.certAndPrivateKeyToBCFKSBytes(keyStoreCertificate, + keyStoreKeyPair.getPrivate(), keyStorePassword); + keyStoreOutputStream.write(bytes); + keyStoreOutputStream.flush(); + } catch (GeneralSecurityException e) { + throw new IOException(e); + } + } + /** * Sets the SSL system properties such that the given X509Util object can be used to create SSL * Contexts that will use the trust store and key store files created by this test context. @@ -465,6 +480,10 @@ public void regenerateStores(X509KeyType keyStoreKeyType, generateKeyStorePemFile(); break; case BCFKS: + generateKeyStoreBcfksFile(); + break; + case PKCS12: + generateKeyStorePkcs12File(); break; } @@ -472,6 +491,15 @@ public void regenerateStores(X509KeyType keyStoreKeyType, case JKS: generateTrustStoreJksFile(); break; + case PEM: + generateTrustStorePemFile(); + break; + case PKCS12: + generateTrustStorePkcs12File(); + break; + case BCFKS: + generateTrustStoreBcfksFile(); + break; } } From a7e4d6385685b17eac63ed37b66773a04ebf0782 Mon Sep 17 00:00:00 2001 From: Andor Molnar Date: Wed, 9 Nov 2022 12:21:46 +0100 Subject: [PATCH 5/6] HBASE-27347. Make tests parameterized, spotless apply --- .../hadoop/hbase/ipc/NettyRpcClient.java | 8 +- .../hadoop/hbase/io/crypto/tls/X509Util.java | 39 +++++---- .../hbase/io/crypto/tls/X509TestContext.java | 18 ++-- .../hadoop/hbase/ipc/NettyRpcServer.java | 7 +- .../security/TestNettyTLSIPCFileWatcher.java | 85 ++++++++++++------- 5 files changed, 89 insertions(+), 68 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java index 8fcddf034df5..231caa40a89e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.ipc; -import java.io.File; import java.io.IOException; import java.net.SocketAddress; import java.util.concurrent.atomic.AtomicReference; @@ -106,9 +105,10 @@ SslContext getSslContext() throws X509Exception, IOException { if (!sslContextForClient.compareAndSet(null, result)) { // lost the race, another thread already set the value result = sslContextForClient.get(); - } else if (keyStoreWatcher.get() == null && - trustStoreWatcher.get() == null && - conf.getBoolean(X509Util.TLS_CERT_RELOAD, false)) { + } else if ( + keyStoreWatcher.get() == null && trustStoreWatcher.get() == null + && conf.getBoolean(X509Util.TLS_CERT_RELOAD, false) + ) { X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher, () -> sslContextForClient.set(null)); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java index 06eef6201101..aa321ed6ad37 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.io.crypto.tls; -import java.io.File; import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; @@ -285,9 +284,9 @@ public static SslContext createSslContextForServer(Configuration config) } public static void enableCertFileReloading(Configuration config, - AtomicReference keystoreWatcher, - AtomicReference trustStoreWatcher, - Runnable resetContext) throws IOException { + AtomicReference keystoreWatcher, + AtomicReference trustStoreWatcher, Runnable resetContext) + throws IOException { String keyStoreLocation = config.get(TLS_CONFIG_KEYSTORE_LOCATION, ""); keystoreWatcher.set(newFileChangeWatcher(keyStoreLocation, resetContext)); String trustStoreLocation = config.get(TLS_CONFIG_TRUSTSTORE_LOCATION, ""); @@ -413,28 +412,25 @@ private static String[] getCipherSuites(Configuration config) { } } - private static FileChangeWatcher newFileChangeWatcher(String fileLocation, Runnable resetContext) throws IOException { + private static FileChangeWatcher newFileChangeWatcher(String fileLocation, Runnable resetContext) + throws IOException { if (fileLocation == null || fileLocation.isEmpty() || resetContext == null) { return null; } final Path filePath = Paths.get(fileLocation).toAbsolutePath(); Path parentPath = filePath.getParent(); if (parentPath == null) { - throw new IOException( - "Key/trust store path does not have a parent: " + filePath); + throw new IOException("Key/trust store path does not have a parent: " + filePath); } - FileChangeWatcher fileChangeWatcher = new FileChangeWatcher( - parentPath, - watchEvent -> { - handleWatchEvent(filePath, watchEvent, resetContext); - }); + FileChangeWatcher fileChangeWatcher = new FileChangeWatcher(parentPath, watchEvent -> { + handleWatchEvent(filePath, watchEvent, resetContext); + }); fileChangeWatcher.start(); return fileChangeWatcher; } /** * Handler for watch events that let us know a file we may care about has changed on disk. - * * @param filePath the path to the file we are watching for changes. * @param event the WatchEvent. */ @@ -442,10 +438,13 @@ private static void handleWatchEvent(Path filePath, WatchEvent event, Runnabl boolean shouldResetContext = false; Path dirPath = filePath.getParent(); if (event.kind().equals(StandardWatchEventKinds.OVERFLOW)) { - // If we get notified about possibly missed events, reload the key store / trust store just to be sure. + // If we get notified about possibly missed events, reload the key store / trust store just to + // be sure. shouldResetContext = true; - } else if (event.kind().equals(StandardWatchEventKinds.ENTRY_MODIFY) || - event.kind().equals(StandardWatchEventKinds.ENTRY_CREATE)) { + } else if ( + event.kind().equals(StandardWatchEventKinds.ENTRY_MODIFY) + || event.kind().equals(StandardWatchEventKinds.ENTRY_CREATE) + ) { Path eventFilePath = dirPath.resolve((Path) event.context()); if (filePath.equals(eventFilePath)) { shouldResetContext = true; @@ -454,14 +453,14 @@ private static void handleWatchEvent(Path filePath, WatchEvent event, Runnabl // Note: we don't care about delete events if (shouldResetContext) { if (LOG.isDebugEnabled()) { - LOG.debug("Attempting to reset default SSL context after receiving watch event: " + - event.kind() + " with context: " + event.context()); + LOG.debug("Attempting to reset default SSL context after receiving watch event: " + + event.kind() + " with context: " + event.context()); } resetContext.run(); } else { if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring watch event and keeping previous default SSL context. Event kind: " + - event.kind() + " with context: " + event.context()); + LOG.debug("Ignoring watch event and keeping previous default SSL context. Event kind: " + + event.kind() + " with context: " + event.context()); } } } diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java index ae79be3f3cf2..ad4ffe0ab5a4 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/crypto/tls/X509TestContext.java @@ -20,7 +20,6 @@ import static java.util.Objects.requireNonNull; import java.io.File; -import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.lang.invoke.MethodHandles; @@ -199,8 +198,7 @@ private File getTrustStoreJksFile() throws IOException { } private void generateTrustStoreJksFile() throws IOException { - try ( - final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreJksFile)) { + try (final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStoreJksFile)) { byte[] bytes = X509TestHelpers.certToJavaTrustStoreBytes(trustStoreCertificate, trustStorePassword); trustStoreOutputStream.write(bytes); @@ -237,8 +235,8 @@ private File getTrustStorePkcs12File() throws IOException { } private void generateTrustStorePkcs12File() throws IOException { - try (final FileOutputStream trustStoreOutputStream = - new FileOutputStream(trustStorePkcs12File)) { + try ( + final FileOutputStream trustStoreOutputStream = new FileOutputStream(trustStorePkcs12File)) { byte[] bytes = X509TestHelpers.certToPKCS12TrustStoreBytes(trustStoreCertificate, trustStorePassword); trustStoreOutputStream.write(bytes); @@ -347,8 +345,8 @@ private File getKeyStorePemFile() throws IOException { private void generateKeyStorePemFile() throws IOException, OperatorCreationException { FileUtils.writeStringToFile(keyStorePemFile, - X509TestHelpers.pemEncodeCertAndPrivateKey(keyStoreCertificate, - keyStoreKeyPair.getPrivate(), keyStorePassword), + X509TestHelpers.pemEncodeCertAndPrivateKey(keyStoreCertificate, keyStoreKeyPair.getPrivate(), + keyStorePassword), StandardCharsets.US_ASCII, false); } @@ -462,10 +460,8 @@ public X509TestContext cloneWithNewKeystoreCert(X509Certificate cert) { keyStoreKeyPair, keyStorePassword, cert); } - public void regenerateStores(X509KeyType keyStoreKeyType, - X509KeyType trustStoreKeyType, - KeyStoreFileType keyStoreFileType, - KeyStoreFileType trustStoreFileType) + public void regenerateStores(X509KeyType keyStoreKeyType, X509KeyType trustStoreKeyType, + KeyStoreFileType keyStoreFileType, KeyStoreFileType trustStoreFileType) throws GeneralSecurityException, IOException, OperatorCreationException { trustStoreKeyPair = X509TestHelpers.generateKeyPair(trustStoreKeyType); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index f2434ddf0e0b..4b8aa28ad124 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -257,9 +257,10 @@ SslContext getSslContext() throws X509Exception, IOException { if (!sslContextForServer.compareAndSet(null, result)) { // lost the race, another thread already set the value result = sslContextForServer.get(); - } else if (keyStoreWatcher.get() == null && - trustStoreWatcher.get() == null && - conf.getBoolean(X509Util.TLS_CERT_RELOAD, false)) { + } else if ( + keyStoreWatcher.get() == null && trustStoreWatcher.get() == null + && conf.getBoolean(X509Util.TLS_CERT_RELOAD, false) + ) { X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher, () -> sslContextForServer.set(null)); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestNettyTLSIPCFileWatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestNettyTLSIPCFileWatcher.java index b5cb130cb811..72fc7141680a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestNettyTLSIPCFileWatcher.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestNettyTLSIPCFileWatcher.java @@ -17,6 +17,20 @@ */ package org.apache.hadoop.hbase.security; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.GeneralSecurityException; +import java.security.Security; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -38,13 +52,9 @@ import org.apache.hadoop.hbase.ipc.RpcScheduler; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.net.Address; -import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos; -import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; -import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.bouncycastle.operator.OperatorCreationException; import org.junit.After; @@ -54,19 +64,16 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.security.GeneralSecurityException; -import java.security.Security; -import java.util.List; -import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; -import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos; + +@RunWith(Parameterized.class) @Category({ RPCTests.class, MediumTests.class }) public class TestNettyTLSIPCFileWatcher { @@ -82,6 +89,23 @@ public class TestNettyTLSIPCFileWatcher { private X509TestContext x509TestContext; + @Parameterized.Parameter(0) + public X509KeyType keyType; + + @Parameterized.Parameter(1) + public KeyStoreFileType storeFileType; + + @Parameterized.Parameters(name = "{index}: keyType={0}, storeFileType={1}") + public static List data() { + List params = new ArrayList<>(); + for (X509KeyType caKeyType : X509KeyType.values()) { + for (KeyStoreFileType ks : KeyStoreFileType.values()) { + params.add(new Object[] { caKeyType, ks }); + } + } + return params; + } + @BeforeClass public static void setUpBeforeClass() throws IOException { Security.addProvider(new BouncyCastleProvider()); @@ -106,8 +130,8 @@ public static void tearDownAfterClass() throws InterruptedException { @Before public void setUp() throws IOException { - x509TestContext = PROVIDER.get(X509KeyType.RSA, X509KeyType.RSA, "keyPa$$word".toCharArray()); - x509TestContext.setConfigurations(KeyStoreFileType.JKS, KeyStoreFileType.JKS); + x509TestContext = PROVIDER.get(keyType, keyType, "keyPa$$word".toCharArray()); + x509TestContext.setConfigurations(storeFileType, storeFileType); CONF.setBoolean(X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, false); CONF.setBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, true); CONF.setBoolean(X509Util.TLS_CERT_RELOAD, true); @@ -142,13 +166,13 @@ public void testReplaceServerKeystore() HBaseRpcController pcrc = new HBaseRpcControllerImpl(); String message = "hello"; assertEquals(message, - stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); + stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()) + .getMessage()); assertNull(pcrc.cellScanner()); } // Replace keystore - x509TestContext.regenerateStores(X509KeyType.RSA, X509KeyType.RSA, - KeyStoreFileType.JKS, KeyStoreFileType.JKS); + x509TestContext.regenerateStores(keyType, keyType, storeFileType, storeFileType); try (AbstractRpcClient client = new NettyRpcClient(clientConf)) { TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = @@ -156,7 +180,8 @@ public void testReplaceServerKeystore() HBaseRpcController pcrc = new HBaseRpcControllerImpl(); String message = "hello"; assertEquals(message, - stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); + stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()) + .getMessage()); assertNull(pcrc.cellScanner()); } @@ -182,17 +207,18 @@ public void testReplaceClientAndServerKeystore() HBaseRpcController pcrc = new HBaseRpcControllerImpl(); String message = "hello"; assertEquals(message, - stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); + stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()) + .getMessage()); assertNull(pcrc.cellScanner()); // Replace keystore and cancel client connections - x509TestContext.regenerateStores(X509KeyType.RSA, X509KeyType.RSA, - KeyStoreFileType.JKS, KeyStoreFileType.JKS); + x509TestContext.regenerateStores(keyType, keyType, storeFileType, storeFileType); client.cancelConnections( ServerName.valueOf(Address.fromSocketAddress(rpcServer.getListenerAddress()), 0L)); assertEquals(message, - stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); + stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()) + .getMessage()); assertNull(pcrc.cellScanner()); } } finally { @@ -200,10 +226,9 @@ public void testReplaceClientAndServerKeystore() } } - private RpcServer createRpcServer(String name, List services, - InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException { + private RpcServer createRpcServer(String name, + List services, InetSocketAddress bindAddress, + Configuration conf, RpcScheduler scheduler) throws IOException { return new NettyRpcServer(SERVER, name, services, bindAddress, conf, scheduler, true); } - - } From 5aa518ab8cde2e63be8d4fca2c3ce43f2192f27a Mon Sep 17 00:00:00 2001 From: Andor Molnar Date: Thu, 10 Nov 2022 15:19:20 +0100 Subject: [PATCH 6/6] HBASE-27347. Address review comments --- .../hadoop/hbase/io/crypto/tls/X509Util.java | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java index aa321ed6ad37..f120b457b5a3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/crypto/tls/X509Util.java @@ -283,16 +283,6 @@ public static SslContext createSslContextForServer(Configuration config) return sslContextBuilder.build(); } - public static void enableCertFileReloading(Configuration config, - AtomicReference keystoreWatcher, - AtomicReference trustStoreWatcher, Runnable resetContext) - throws IOException { - String keyStoreLocation = config.get(TLS_CONFIG_KEYSTORE_LOCATION, ""); - keystoreWatcher.set(newFileChangeWatcher(keyStoreLocation, resetContext)); - String trustStoreLocation = config.get(TLS_CONFIG_TRUSTSTORE_LOCATION, ""); - trustStoreWatcher.set(newFileChangeWatcher(trustStoreLocation, resetContext)); - } - /** * Creates a key manager by loading the key store from the given file of the given type, * optionally decrypting it using the given password. @@ -412,6 +402,24 @@ private static String[] getCipherSuites(Configuration config) { } } + /** + * Enable certificate file reloading by creating FileWatchers for keystore and truststore. + * AtomicReferences will be set with the new instances. resetContext - if not null - will be + * called when the file has been modified. + * @param keystoreWatcher Reference to keystoreFileWatcher. + * @param trustStoreWatcher Reference to truststoreFileWatcher. + * @param resetContext Callback for file changes. + */ + public static void enableCertFileReloading(Configuration config, + AtomicReference keystoreWatcher, + AtomicReference trustStoreWatcher, Runnable resetContext) + throws IOException { + String keyStoreLocation = config.get(TLS_CONFIG_KEYSTORE_LOCATION, ""); + keystoreWatcher.set(newFileChangeWatcher(keyStoreLocation, resetContext)); + String trustStoreLocation = config.get(TLS_CONFIG_TRUSTSTORE_LOCATION, ""); + trustStoreWatcher.set(newFileChangeWatcher(trustStoreLocation, resetContext)); + } + private static FileChangeWatcher newFileChangeWatcher(String fileLocation, Runnable resetContext) throws IOException { if (fileLocation == null || fileLocation.isEmpty() || resetContext == null) { @@ -452,15 +460,15 @@ private static void handleWatchEvent(Path filePath, WatchEvent event, Runnabl } // Note: we don't care about delete events if (shouldResetContext) { - if (LOG.isDebugEnabled()) { - LOG.debug("Attempting to reset default SSL context after receiving watch event: " - + event.kind() + " with context: " + event.context()); - } + LOG.info( + "Attempting to reset default SSL context after receiving watch event: {} with context: {}", + event.kind(), event.context()); resetContext.run(); } else { if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring watch event and keeping previous default SSL context. Event kind: " - + event.kind() + " with context: " + event.context()); + LOG.debug( + "Ignoring watch event and keeping previous default SSL context. Event kind: {} with context: {}", + event.kind(), event.context()); } } }