From 3bbfb862c6dc1e9370da26a41c467ca49450acd3 Mon Sep 17 00:00:00 2001 From: divneet-kaur Date: Thu, 18 Jul 2024 11:20:24 -0700 Subject: [PATCH 1/3] HBASE-28428 : Zookeeper ConnectionRegistry APIs should have timeout --- .../hbase/client/ZKConnectionRegistry.java | 16 ++++++-- .../hbase/zookeeper/ReadOnlyZKClient.java | 39 ++++++++++++++++++- .../hbase/zookeeper/TestReadOnlyZKClient.java | 27 ++++++++++++- 3 files changed, 76 insertions(+), 6 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java index 6ef3d1c947df..a6afef38dc8f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java @@ -72,11 +72,19 @@ class ZKConnectionRegistry implements ConnectionRegistry { private final ReadOnlyZKClient zk; private final ZNodePaths znodePaths; + private final Configuration conf; + private final int zkRegistryAsyncTimeout; + public static final String ZK_REGISTRY_ASYNC_GET_TIMEOUT = "zookeeper.registry.async.get.timeout"; + public static final int DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT = 60000; // 1 min // User not used, but for rpc based registry we need it ZKConnectionRegistry(Configuration conf, User ignored) { this.znodePaths = new ZNodePaths(conf); - this.zk = new ReadOnlyZKClient(conf); + this.zk = new ReadOnlyZKClient(conf, AsyncConnectionImpl.RETRY_TIMER); + this.conf = conf; + this.zkRegistryAsyncTimeout = + conf.getInt(ZK_REGISTRY_ASYNC_GET_TIMEOUT, DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT); + // this.zk = new ReadOnlyZKClient(conf); if (NEEDS_LOG_WARN) { synchronized (WARN_LOCK) { if (NEEDS_LOG_WARN) { @@ -94,7 +102,7 @@ private interface Converter { private CompletableFuture getAndConvert(String path, Converter converter) { CompletableFuture future = new CompletableFuture<>(); - addListener(zk.get(path), (data, error) -> { + addListener(zk.get(path, this.zkRegistryAsyncTimeout), (data, error) -> { if (error != null) { future.completeExceptionally(error); return; @@ -228,8 +236,8 @@ public CompletableFuture getMetaRegionLocations() { return tracedFuture(() -> { CompletableFuture future = new CompletableFuture<>(); addListener( - zk.list(znodePaths.baseZNode).thenApply(children -> children.stream() - .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())), + zk.list(znodePaths.baseZNode, this.zkRegistryAsyncTimeout).thenApply(children -> children + .stream().filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())), (metaReplicaZNodes, error) -> { if (error != null) { future.completeExceptionally(error); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java index 979094fda80b..0148d10139ae 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java @@ -42,6 +42,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; +import org.apache.hbase.thirdparty.io.netty.util.TimerTask; + /** * A very simple read only zookeeper implementation without watcher support. */ @@ -75,6 +78,8 @@ public final class ReadOnlyZKClient implements Closeable { private final int keepAliveTimeMs; + private HashedWheelTimer retryTimer; + private static abstract class Task implements Delayed { protected long time = System.nanoTime(); @@ -123,7 +128,7 @@ private String getId() { return String.format("0x%08x", System.identityHashCode(this)); } - public ReadOnlyZKClient(Configuration conf) { + public ReadOnlyZKClient(Configuration conf, HashedWheelTimer retryTimer) { // We might use a different ZK for client access String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf); if (clientZkQuorumServers != null) { @@ -136,6 +141,7 @@ public ReadOnlyZKClient(Configuration conf) { this.retryIntervalMs = conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS); this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS); + this.retryTimer = retryTimer; LOG.debug( "Connect {} to {} with session timeout={}ms, retries {}, " + "retry interval {}ms, keepAlive={}ms", @@ -253,6 +259,23 @@ public void closed(IOException e) { } } + private static TimerTask getTimerTask(final long timeoutMs, final CompletableFuture future, + final String api) { + return timeout -> { + if (!future.isDone()) { + future.completeExceptionally(new DoNotRetryIOException( + "Zookeeper " + api + " could not be completed in " + timeoutMs + " ms")); + } + }; + } + + public CompletableFuture get(final String path, final long timeoutMs) { + CompletableFuture future = get(path); + TimerTask timerTask = getTimerTask(timeoutMs, future, "GET"); + retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS); + return future; + } + public CompletableFuture get(String path) { if (closed.get()) { return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); @@ -269,6 +292,13 @@ protected void doExec(ZooKeeper zk) { return future; } + public CompletableFuture exists(String path, long timeoutMs) { + CompletableFuture future = exists(path); + TimerTask timerTask = getTimerTask(timeoutMs, future, "EXISTS"); + retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS); + return future; + } + public CompletableFuture exists(String path) { if (closed.get()) { return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); @@ -284,6 +314,13 @@ protected void doExec(ZooKeeper zk) { return future; } + public CompletableFuture> list(String path, long timeoutMs) { + CompletableFuture> future = list(path); + TimerTask timerTask = getTimerTask(timeoutMs, future, "LIST"); + retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS); + return future; + } + public CompletableFuture> list(String path) { if (closed.get()) { return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java index 0681ed68c5f9..692b20e586bc 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java @@ -43,6 +43,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseZKTestingUtility; @@ -51,6 +52,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ZKTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -63,6 +65,9 @@ import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; + @Category({ ZKTests.class, MediumTests.class }) public class TestReadOnlyZKClient { @@ -79,6 +84,10 @@ public class TestReadOnlyZKClient { private static int CHILDREN = 5; private static ReadOnlyZKClient RO_ZK; + private static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( + new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true) + .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), + 10, TimeUnit.MILLISECONDS); @BeforeClass public static void setUp() throws Exception { @@ -98,13 +107,15 @@ public static void setUp() throws Exception { conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY, 3); conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY_INTERVAL_MILLIS, 100); conf.setInt(ReadOnlyZKClient.KEEPALIVE_MILLIS, 3000); - RO_ZK = new ReadOnlyZKClient(conf); + // RO_ZK = new ReadOnlyZKClient(conf); + RO_ZK = new ReadOnlyZKClient(conf, RETRY_TIMER); // only connect when necessary assertNull(RO_ZK.zookeeper); } @AfterClass public static void tearDown() throws IOException { + RETRY_TIMER.stop(); RO_ZK.close(); UTIL.shutdownMiniZKCluster(); UTIL.cleanupTestDir(); @@ -204,4 +215,18 @@ public void testNotCloseZkWhenPending() throws Exception { waitForIdleConnectionClosed(); verify(mockedZK, times(1)).close(); } + + @Test + public void testReadWithTimeout() throws Exception { + assertArrayEquals(DATA, RO_ZK.get(PATH, 60000).get()); + assertEquals(CHILDREN, RO_ZK.exists(PATH, 60000).get().getNumChildren()); + List children = RO_ZK.list(PATH, 60000).get(); + assertEquals(CHILDREN, children.size()); + Collections.sort(children); + for (int i = 0; i < CHILDREN; i++) { + assertEquals("c" + i, children.get(i)); + } + assertNotNull(RO_ZK.zookeeper); + waitForIdleConnectionClosed(); + } } From 84a97292b848d55e3fee8cae27c9bf692b55e528 Mon Sep 17 00:00:00 2001 From: divneet-kaur Date: Thu, 18 Jul 2024 11:25:26 -0700 Subject: [PATCH 2/3] removed unused lines --- .../org/apache/hadoop/hbase/client/ZKConnectionRegistry.java | 1 - .../org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java | 1 - 2 files changed, 2 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java index a6afef38dc8f..e356b095e248 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java @@ -84,7 +84,6 @@ class ZKConnectionRegistry implements ConnectionRegistry { this.conf = conf; this.zkRegistryAsyncTimeout = conf.getInt(ZK_REGISTRY_ASYNC_GET_TIMEOUT, DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT); - // this.zk = new ReadOnlyZKClient(conf); if (NEEDS_LOG_WARN) { synchronized (WARN_LOCK) { if (NEEDS_LOG_WARN) { diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java index 692b20e586bc..bd835df4fc5d 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java @@ -107,7 +107,6 @@ public static void setUp() throws Exception { conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY, 3); conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY_INTERVAL_MILLIS, 100); conf.setInt(ReadOnlyZKClient.KEEPALIVE_MILLIS, 3000); - // RO_ZK = new ReadOnlyZKClient(conf); RO_ZK = new ReadOnlyZKClient(conf, RETRY_TIMER); // only connect when necessary assertNull(RO_ZK.zookeeper); From 6e930a8d2126c7323a8b6e6fda7e167d21265c49 Mon Sep 17 00:00:00 2001 From: divneet-kaur Date: Thu, 18 Jul 2024 17:15:44 -0700 Subject: [PATCH 3/3] ran spotless apply --- .../org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java index dc08cfc5eaa7..a02ad994c7d5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java @@ -80,7 +80,7 @@ public final class ReadOnlyZKClient implements Closeable { private final int keepAliveTimeMs; private HashedWheelTimer retryTimer; - + private final ZKClientConfig zkClientConfig; private static abstract class Task implements Delayed {