From d201c95f1e12e875a3ec342da9c727852c7c1f42 Mon Sep 17 00:00:00 2001 From: divneet-kaur Date: Thu, 18 Apr 2024 14:59:01 -0700 Subject: [PATCH 01/14] HBASE-28428 : ConnectionRegistry APIs should have timeout --- .../hadoop/hbase/client/AsyncConnectionImpl.java | 15 ++++++++++----- .../hbase/client/AsyncMetaRegionLocator.java | 5 +++-- .../hadoop/hbase/client/AsyncRegionLocator.java | 6 +++--- .../hadoop/hbase/client/ConnectionFactory.java | 4 +++- .../hadoop/hbase/client/ConnectionUtils.java | 5 +++-- .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 9 ++++++--- .../TestAsyncMetaRegionLocatorFailFast.java | 2 +- .../java/org/apache/hadoop/hbase/HConstants.java | 5 +++++ .../org/apache/hadoop/hbase/util/FutureUtils.java | 12 ++++++++++++ .../hbase/client/ClusterConnectionFactory.java | 8 ++++++-- .../hbase/client/TestAsyncMetaRegionLocator.java | 3 ++- 11 files changed, 54 insertions(+), 20 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 3f0e3e0b370e..9767827fc307 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -39,6 +39,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -46,6 +47,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AuthUtil; import org.apache.hadoop.hbase.ChoreService; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -326,15 +328,17 @@ CompletableFuture getMasterStub() { } } - }); + }, conf.getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT, + HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT)); return future; - }, stub -> true, "master stub"); + }, stub -> true, "master stub", conf); } String getClusterId() { try { - return registry.getClusterId().get(); - } catch (InterruptedException | ExecutionException e) { + return registry.getClusterId().get(conf.getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT, + HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT), TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { LOG.error("Error fetching cluster ID: ", e); } return null; @@ -447,7 +451,8 @@ public CompletableFuture getHbck() { } else { future.complete(getHbckInternal(sn)); } - }); + }, conf.getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT, + HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT)); return future; }, "AsyncConnection.getHbck"); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java index 161160e63599..febce851e406 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java @@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; @@ -56,9 +57,9 @@ class AsyncMetaRegionLocator { * replicas. If we do not check the location for the given replica, we will always return the * cached region locations and cause an infinite loop. */ - CompletableFuture getRegionLocations(int replicaId, boolean reload) { + CompletableFuture getRegionLocations(int replicaId, boolean reload, Configuration conf) { return ConnectionUtils.getOrFetch(metaRegionLocations, metaRelocateFuture, reload, - registry::getMetaRegionLocations, locs -> isGood(locs, replicaId), "meta region location"); + registry::getMetaRegionLocations, locs -> isGood(locs, replicaId), "meta region location", conf); } private HRegionLocation getCacheLocation(HRegionLocation loc) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java index da58dd8e1e53..7101511e4576 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java @@ -141,8 +141,8 @@ CompletableFuture getRegionLocations(TableName tableName, byte[ .setName("AsyncRegionLocator.getRegionLocations").setTableName(tableName); return tracedLocationFuture(() -> { CompletableFuture future = isMeta(tableName) - ? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) - : nonMetaRegionLocator.getRegionLocations(tableName, row, + ? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload, + conn.getConfiguration()) : nonMetaRegionLocator.getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload); return withTimeout(future, timeoutNs, () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) @@ -160,7 +160,7 @@ CompletableFuture getRegionLocation(TableName tableName, byte[] // Change it later if the meta table can have more than one regions. CompletableFuture future = new CompletableFuture<>(); CompletableFuture locsFuture = isMeta(tableName) - ? metaRegionLocator.getRegionLocations(replicaId, reload) + ? metaRegionLocator.getRegionLocations(replicaId, reload, conn.getConfiguration()) : nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload); addListener(locsFuture, (locs, error) -> { if (error != null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java index 716fb4863fe8..af6ea99dd6bb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AuthUtil; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.trace.TraceUtil; @@ -360,7 +361,8 @@ public static CompletableFuture createAsyncConnection(Configura registry.close(); future.completeExceptionally(e); } - }); + }, conf.getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT, + HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT)); return future; }, "ConnectionFactory.createAsyncConnection"); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 84acc6e4d398..0406ddb93774 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -533,7 +533,7 @@ static int getPriority(TableName tableName) { static CompletableFuture getOrFetch(AtomicReference cacheRef, AtomicReference> futureRef, boolean reload, - Supplier> fetch, Predicate validator, String type) { + Supplier> fetch, Predicate validator, String type, Configuration conf) { for (;;) { if (!reload) { T value = cacheRef.get(); @@ -564,7 +564,8 @@ static CompletableFuture getOrFetch(AtomicReference cacheRef, cacheRef.set(value); futureRef.set(null); future.complete(value); - }); + }, conf.getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT, + HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT)); return future; } else { CompletableFuture future = futureRef.get(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 103a64e520a1..d7b343fcf5f4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -1246,7 +1246,8 @@ private CompletableFuture> getTableHRegionLocations(TableN } else { future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); } - }); + }, connection.getConfiguration().getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT, + HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT)); return future; } else { // For non-meta table, we fetch all locations by scanning hbase:meta table @@ -1276,7 +1277,8 @@ private CompletableFuture compact(TableName tableName, byte[] columnFamily future.complete(ret); } }); - }); + }, connection.getConfiguration().getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT, + HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT)); break; case NORMAL: addListener(getTableHRegionLocations(tableName), (locations, err) -> { @@ -3254,7 +3256,8 @@ GetRegionInfoResponse> adminCall(controller, stub, } } }); - }); + }, connection.getConfiguration().getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT, + HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT)); break; case NORMAL: addListener(getTableHRegionLocations(tableName), (locations, err) -> { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java index 6380f1f6fb0f..a54605d3b2c8 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java @@ -65,6 +65,6 @@ public static void setUp() throws IOException { @Test(expected = DoNotRetryIOException.class) public void test() throws IOException { - FutureUtils.get(LOCATOR.getRegionLocations(RegionInfo.DEFAULT_REPLICA_ID, false)); + FutureUtils.get(LOCATOR.getRegionLocations(RegionInfo.DEFAULT_REPLICA_ID, false, CONF)); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 9597ec23d81a..3492562b9218 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -208,6 +208,11 @@ public enum OperationStatusCode { public static final String ZK_CONNECTION_REGISTRY_CLASS = "org.apache.hadoop.hbase.client.ZKConnectionRegistry"; + public static final String CONNECTION_REGISTRY_API_TIMEOUT = + "hbase.connection.registry.api.timeout"; + + public static final int DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT = 120000; + /** Parameter name for the master type being backup (waits for primary to go inactive). */ public static final String MASTER_TYPE_BACKUP = "hbase.master.backup"; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java index 4f8a7320fb40..939c5c46c5cc 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java @@ -75,6 +75,18 @@ public static void addListener(CompletableFuture future, }); } + public static void addListener(CompletableFuture future, + BiConsumer action, long timeout) { + Throwable error = null; + T t = null; + try { + t = future.get(timeout, TimeUnit.MILLISECONDS); + } catch (Throwable throwable) { + error = throwable; + } + action.accept(t, unwrapCompletionException(error)); + } + /** * Almost the same with {@link #addListener(CompletableFuture, BiConsumer)} method above, the only * exception is that we will call diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java index 7225f92b7ff9..2149b422676e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java @@ -20,7 +20,9 @@ import java.io.IOException; import java.net.SocketAddress; import java.security.PrivilegedExceptionAction; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -39,8 +41,10 @@ private ClusterConnectionFactory() { } private static AsyncClusterConnection createAsyncClusterConnection(Configuration conf, - ConnectionRegistry registry, SocketAddress localAddress, User user) throws IOException { - String clusterId = FutureUtils.get(registry.getClusterId()); + ConnectionRegistry registry, SocketAddress localAddress, User user ) throws IOException { + String clusterId = FutureUtils.get(registry.getClusterId(), + conf.getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT, + HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT), TimeUnit.MILLISECONDS); Class clazz = conf.getClass(HBASE_SERVER_CLUSTER_CONNECTION_IMPL, AsyncClusterConnectionImpl.class, AsyncClusterConnection.class); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java index 90d2cb51e8cf..9cd9c4114c66 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java @@ -174,7 +174,8 @@ public void updateCachedLocationOnError(HRegionLocation loc, Throwable error) { @Override public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload) throws Exception { - return locator.getRegionLocations(replicaId, reload).get(); + final Configuration conf = HBaseConfiguration.create(); + return locator.getRegionLocations(replicaId, reload, conf).get(); } }); } catch (Exception e) { From 9891a83728666889091ba5396f726ecaca0005d9 Mon Sep 17 00:00:00 2001 From: divneet-kaur Date: Mon, 13 May 2024 10:43:19 -0700 Subject: [PATCH 02/14] updated timeout implementation --- .../hbase/client/AsyncConnectionImpl.java | 15 +-- .../hbase/client/AsyncMetaRegionLocator.java | 4 +- .../hbase/client/AsyncRegionLocator.java | 6 +- .../hbase/client/ConnectionFactory.java | 4 +- .../hadoop/hbase/client/ConnectionUtils.java | 5 +- .../hbase/client/RawAsyncHBaseAdmin.java | 9 +- .../hbase/client/ZKConnectionRegistry.java | 108 ++++++++++++++---- .../TestAsyncMetaRegionLocatorFailFast.java | 2 +- .../org/apache/hadoop/hbase/HConstants.java | 5 - .../apache/hadoop/hbase/util/FutureUtils.java | 12 -- .../client/ClusterConnectionFactory.java | 6 +- .../client/TestAsyncMetaRegionLocator.java | 2 +- 12 files changed, 106 insertions(+), 72 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 9767827fc307..3f0e3e0b370e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -39,7 +39,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -47,7 +46,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AuthUtil; import org.apache.hadoop.hbase.ChoreService; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -328,17 +326,15 @@ CompletableFuture getMasterStub() { } } - }, conf.getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT, - HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT)); + }); return future; - }, stub -> true, "master stub", conf); + }, stub -> true, "master stub"); } String getClusterId() { try { - return registry.getClusterId().get(conf.getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT, - HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT), TimeUnit.MILLISECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { + return registry.getClusterId().get(); + } catch (InterruptedException | ExecutionException e) { LOG.error("Error fetching cluster ID: ", e); } return null; @@ -451,8 +447,7 @@ public CompletableFuture getHbck() { } else { future.complete(getHbckInternal(sn)); } - }, conf.getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT, - HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT)); + }); return future; }, "AsyncConnection.getHbck"); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java index febce851e406..40cf6411c795 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java @@ -57,9 +57,9 @@ class AsyncMetaRegionLocator { * replicas. If we do not check the location for the given replica, we will always return the * cached region locations and cause an infinite loop. */ - CompletableFuture getRegionLocations(int replicaId, boolean reload, Configuration conf) { + CompletableFuture getRegionLocations(int replicaId, boolean reload) { return ConnectionUtils.getOrFetch(metaRegionLocations, metaRelocateFuture, reload, - registry::getMetaRegionLocations, locs -> isGood(locs, replicaId), "meta region location", conf); + registry::getMetaRegionLocations, locs -> isGood(locs, replicaId), "meta region location"); } private HRegionLocation getCacheLocation(HRegionLocation loc) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java index 7101511e4576..da58dd8e1e53 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java @@ -141,8 +141,8 @@ CompletableFuture getRegionLocations(TableName tableName, byte[ .setName("AsyncRegionLocator.getRegionLocations").setTableName(tableName); return tracedLocationFuture(() -> { CompletableFuture future = isMeta(tableName) - ? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload, - conn.getConfiguration()) : nonMetaRegionLocator.getRegionLocations(tableName, row, + ? metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) + : nonMetaRegionLocator.getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, type, reload); return withTimeout(future, timeoutNs, () -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) @@ -160,7 +160,7 @@ CompletableFuture getRegionLocation(TableName tableName, byte[] // Change it later if the meta table can have more than one regions. CompletableFuture future = new CompletableFuture<>(); CompletableFuture locsFuture = isMeta(tableName) - ? metaRegionLocator.getRegionLocations(replicaId, reload, conn.getConfiguration()) + ? metaRegionLocator.getRegionLocations(replicaId, reload) : nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload); addListener(locsFuture, (locs, error) -> { if (error != null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java index af6ea99dd6bb..716fb4863fe8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java @@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AuthUtil; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.trace.TraceUtil; @@ -361,8 +360,7 @@ public static CompletableFuture createAsyncConnection(Configura registry.close(); future.completeExceptionally(e); } - }, conf.getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT, - HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT)); + }); return future; }, "ConnectionFactory.createAsyncConnection"); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 0406ddb93774..84acc6e4d398 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -533,7 +533,7 @@ static int getPriority(TableName tableName) { static CompletableFuture getOrFetch(AtomicReference cacheRef, AtomicReference> futureRef, boolean reload, - Supplier> fetch, Predicate validator, String type, Configuration conf) { + Supplier> fetch, Predicate validator, String type) { for (;;) { if (!reload) { T value = cacheRef.get(); @@ -564,8 +564,7 @@ static CompletableFuture getOrFetch(AtomicReference cacheRef, cacheRef.set(value); futureRef.set(null); future.complete(value); - }, conf.getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT, - HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT)); + }); return future; } else { CompletableFuture future = futureRef.get(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index d7b343fcf5f4..103a64e520a1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -1246,8 +1246,7 @@ private CompletableFuture> getTableHRegionLocations(TableN } else { future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); } - }, connection.getConfiguration().getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT, - HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT)); + }); return future; } else { // For non-meta table, we fetch all locations by scanning hbase:meta table @@ -1277,8 +1276,7 @@ private CompletableFuture compact(TableName tableName, byte[] columnFamily future.complete(ret); } }); - }, connection.getConfiguration().getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT, - HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT)); + }); break; case NORMAL: addListener(getTableHRegionLocations(tableName), (locations, err) -> { @@ -3256,8 +3254,7 @@ GetRegionInfoResponse> adminCall(controller, stub, } } }); - }, connection.getConfiguration().getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT, - HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT)); + }); break; case NORMAL: addListener(getTableHRegionLocations(tableName), (locations, err) -> { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java index a46f4d74e382..77d1628c3f0f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java @@ -25,11 +25,13 @@ import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData; - import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.hbase.thirdparty.io.netty.util.Timeout; +import org.apache.hbase.thirdparty.io.netty.util.TimerTask; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterId; @@ -49,6 +51,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * Zookeeper based registry implementation. @@ -70,6 +73,11 @@ class ZKConnectionRegistry implements ConnectionRegistry { private final ZNodePaths znodePaths; + private static final long expectedTimeout = 120000; + private static final int maxAttempts = 5; + private static final long pauseNs = 100000; + + // User not used, but for rpc based registry we need it ZKConnectionRegistry(Configuration conf, User ignored) { this.znodePaths = new ZNodePaths(conf); @@ -85,23 +93,54 @@ class ZKConnectionRegistry implements ConnectionRegistry { } } + public ZNodePaths getZNodePaths() { + return znodePaths; + } + private interface Converter { T convert(byte[] data) throws Exception; } private CompletableFuture getAndConvert(String path, Converter converter) { CompletableFuture future = new CompletableFuture<>(); - addListener(zk.get(path), (data, error) -> { - if (error != null) { - future.completeExceptionally(error); - return; - } - try { - future.complete(converter.convert(data)); - } catch (Exception e) { - future.completeExceptionally(e); - } - }); + TimerTask pollingTask = new TimerTask() { + int tries = 0; + long startTime = EnvironmentEdgeManager.currentTime(); + long endTime = startTime + expectedTimeout; + long maxPauseTime = expectedTimeout / maxAttempts; + + @Override + public void run(Timeout timeout) throws Exception { + if (EnvironmentEdgeManager.currentTime() < endTime) { + addListener(zk.get(path), (data, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + if (data != null && data.length > 0) { + try { + future.complete(converter.convert(data)); + } catch (Exception e) { + future.completeExceptionally(e); + } + } else { + // retry again after pauseTime. + long pauseTime = + ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); + pauseTime = Math.min(pauseTime, maxPauseTime); + AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, + TimeUnit.MICROSECONDS); + } + }); + } else { + future.completeExceptionally(new IOException("Procedure wasn't completed in " + + "expectedTime:" + expectedTimeout + " ms")); + } + } + }; + // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. + AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); + return future; } @@ -217,16 +256,43 @@ private void getMetaRegionLocation(CompletableFuture future, public CompletableFuture getMetaRegionLocations() { return tracedFuture(() -> { CompletableFuture future = new CompletableFuture<>(); - addListener( - zk.list(znodePaths.baseZNode).thenApply(children -> children.stream() - .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())), - (metaReplicaZNodes, error) -> { - if (error != null) { - future.completeExceptionally(error); - return; + TimerTask pollingTask = new TimerTask() { + int tries = 0; + long startTime = EnvironmentEdgeManager.currentTime(); + long endTime = startTime + expectedTimeout; + long maxPauseTime = expectedTimeout / maxAttempts; + + @Override + public void run(Timeout timeout) throws Exception { + if (EnvironmentEdgeManager.currentTime() < endTime) { + addListener( + zk.list(znodePaths.baseZNode).thenApply(children -> children.stream() + .filter(c -> getZNodePaths().isMetaZNodePrefix(c)).collect(Collectors.toList())), + (metaReplicaZNodes, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + if (metaReplicaZNodes != null && !metaReplicaZNodes.isEmpty()) { + getMetaRegionLocation(future, metaReplicaZNodes); + } else { + // retry again after pauseTime. + long pauseTime = + ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); + pauseTime = Math.min(pauseTime, maxPauseTime); + AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, + TimeUnit.MICROSECONDS); + } + }); + } else { + future.completeExceptionally(new IOException("Procedure wasn't completed in " + + "expectedTime:" + expectedTimeout + " ms")); } - getMetaRegionLocation(future, metaReplicaZNodes); - }); + } + }; + // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. + AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); + return future; }, "ZKConnectionRegistry.getMetaRegionLocations"); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java index a54605d3b2c8..6380f1f6fb0f 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java @@ -65,6 +65,6 @@ public static void setUp() throws IOException { @Test(expected = DoNotRetryIOException.class) public void test() throws IOException { - FutureUtils.get(LOCATOR.getRegionLocations(RegionInfo.DEFAULT_REPLICA_ID, false, CONF)); + FutureUtils.get(LOCATOR.getRegionLocations(RegionInfo.DEFAULT_REPLICA_ID, false)); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 3492562b9218..9597ec23d81a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -208,11 +208,6 @@ public enum OperationStatusCode { public static final String ZK_CONNECTION_REGISTRY_CLASS = "org.apache.hadoop.hbase.client.ZKConnectionRegistry"; - public static final String CONNECTION_REGISTRY_API_TIMEOUT = - "hbase.connection.registry.api.timeout"; - - public static final int DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT = 120000; - /** Parameter name for the master type being backup (waits for primary to go inactive). */ public static final String MASTER_TYPE_BACKUP = "hbase.master.backup"; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java index 939c5c46c5cc..4f8a7320fb40 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java @@ -75,18 +75,6 @@ public static void addListener(CompletableFuture future, }); } - public static void addListener(CompletableFuture future, - BiConsumer action, long timeout) { - Throwable error = null; - T t = null; - try { - t = future.get(timeout, TimeUnit.MILLISECONDS); - } catch (Throwable throwable) { - error = throwable; - } - action.accept(t, unwrapCompletionException(error)); - } - /** * Almost the same with {@link #addListener(CompletableFuture, BiConsumer)} method above, the only * exception is that we will call diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java index 2149b422676e..d697a51fe4da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java @@ -20,9 +20,7 @@ import java.io.IOException; import java.net.SocketAddress; import java.security.PrivilegedExceptionAction; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -42,9 +40,7 @@ private ClusterConnectionFactory() { private static AsyncClusterConnection createAsyncClusterConnection(Configuration conf, ConnectionRegistry registry, SocketAddress localAddress, User user ) throws IOException { - String clusterId = FutureUtils.get(registry.getClusterId(), - conf.getInt(HConstants.CONNECTION_REGISTRY_API_TIMEOUT, - HConstants.DEFAULT_CONNECTION_REGISTRY_API_TIMEOUT), TimeUnit.MILLISECONDS); + String clusterId = FutureUtils.get(registry.getClusterId()); Class clazz = conf.getClass(HBASE_SERVER_CLUSTER_CONNECTION_IMPL, AsyncClusterConnectionImpl.class, AsyncClusterConnection.class); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java index 9cd9c4114c66..14dea8355fff 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java @@ -175,7 +175,7 @@ public void updateCachedLocationOnError(HRegionLocation loc, Throwable error) { public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload) throws Exception { final Configuration conf = HBaseConfiguration.create(); - return locator.getRegionLocations(replicaId, reload, conf).get(); + return locator.getRegionLocations(replicaId, reload).get(); } }); } catch (Exception e) { From 8ad5b5352ae7acbb7a46230642e46dc3a8b14dc4 Mon Sep 17 00:00:00 2001 From: divneet-kaur Date: Tue, 4 Jun 2024 17:43:22 -0500 Subject: [PATCH 03/14] updated implementation --- .../hbase/client/AsyncMetaRegionLocator.java | 1 - .../hbase/client/ZKConnectionRegistry.java | 43 ++++++++++++------- .../hbase/zookeeper/ReadOnlyZKClient.java | 36 ++++++++++++++++ .../client/ClusterConnectionFactory.java | 2 +- .../client/TestAsyncMetaRegionLocator.java | 1 - 5 files changed, 65 insertions(+), 18 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java index 40cf6411c795..161160e63599 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java @@ -25,7 +25,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java index 77d1628c3f0f..3613b65d2ed4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.apache.hbase.thirdparty.io.netty.util.Timeout; import org.apache.hbase.thirdparty.io.netty.util.TimerTask; @@ -72,22 +73,28 @@ class ZKConnectionRegistry implements ConnectionRegistry { private final ReadOnlyZKClient zk; private final ZNodePaths znodePaths; + private final Configuration conf; - private static final long expectedTimeout = 120000; - private static final int maxAttempts = 5; - private static final long pauseNs = 100000; + public static final String EXPECTED_TIMEOUT = "expected.timeout"; + public static final int DEFAULT_EXPECTED_TIMEOUT = 200000; + public static final String MAX_ATTEMPTS = "max.attempts"; + public static final int DEFAULT_MAX_ATTEMPTS = 5; + public static final String PAUSE_NS = "pause.ns"; + public static final long DEFAULT_PAUSE_NS = 100000; // User not used, but for rpc based registry we need it ZKConnectionRegistry(Configuration conf, User ignored) { this.znodePaths = new ZNodePaths(conf); this.zk = new ReadOnlyZKClient(conf); + this.conf = conf; if (NEEDS_LOG_WARN) { synchronized (WARN_LOCK) { if (NEEDS_LOG_WARN) { LOG.warn( "ZKConnectionRegistry is deprecated. See https://hbase.apache.org/book.html#client.rpcconnectionregistry"); NEEDS_LOG_WARN = false; + } } } @@ -106,13 +113,16 @@ private CompletableFuture getAndConvert(String path, Converter convert TimerTask pollingTask = new TimerTask() { int tries = 0; long startTime = EnvironmentEdgeManager.currentTime(); - long endTime = startTime + expectedTimeout; - long maxPauseTime = expectedTimeout / maxAttempts; + long endTime = startTime + conf.getInt(EXPECTED_TIMEOUT, DEFAULT_EXPECTED_TIMEOUT); + long maxPauseTime = + conf.getInt(EXPECTED_TIMEOUT, DEFAULT_EXPECTED_TIMEOUT) / + conf.getInt(MAX_ATTEMPTS, DEFAULT_MAX_ATTEMPTS);; + @Override public void run(Timeout timeout) throws Exception { if (EnvironmentEdgeManager.currentTime() < endTime) { - addListener(zk.get(path), (data, error) -> { + addListener(zk.getWithTimeout(path, endTime), (data, error) -> { if (error != null) { future.completeExceptionally(error); return; @@ -126,15 +136,16 @@ public void run(Timeout timeout) throws Exception { } else { // retry again after pauseTime. long pauseTime = - ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); + ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS + .toMillis(conf.getLong(PAUSE_NS, DEFAULT_PAUSE_NS)), ++tries); pauseTime = Math.min(pauseTime, maxPauseTime); AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MICROSECONDS); } }); } else { - future.completeExceptionally(new IOException("Procedure wasn't completed in " - + "expectedTime:" + expectedTimeout + " ms")); + future.completeExceptionally(new TimeoutException("Procedure wasn't completed in " + + "expectedTime:" + conf.getInt(EXPECTED_TIMEOUT, DEFAULT_EXPECTED_TIMEOUT) + " ms")); } } }; @@ -259,14 +270,15 @@ public CompletableFuture getMetaRegionLocations() { TimerTask pollingTask = new TimerTask() { int tries = 0; long startTime = EnvironmentEdgeManager.currentTime(); - long endTime = startTime + expectedTimeout; - long maxPauseTime = expectedTimeout / maxAttempts; + long endTime = startTime + conf.getInt(EXPECTED_TIMEOUT, DEFAULT_EXPECTED_TIMEOUT); + long maxPauseTime = conf.getInt(EXPECTED_TIMEOUT, DEFAULT_EXPECTED_TIMEOUT) / + conf.getInt(MAX_ATTEMPTS, DEFAULT_MAX_ATTEMPTS); @Override public void run(Timeout timeout) throws Exception { if (EnvironmentEdgeManager.currentTime() < endTime) { addListener( - zk.list(znodePaths.baseZNode).thenApply(children -> children.stream() + zk.listWithTimeout(znodePaths.baseZNode, endTime).thenApply(children -> children.stream() .filter(c -> getZNodePaths().isMetaZNodePrefix(c)).collect(Collectors.toList())), (metaReplicaZNodes, error) -> { if (error != null) { @@ -278,15 +290,16 @@ public void run(Timeout timeout) throws Exception { } else { // retry again after pauseTime. long pauseTime = - ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); + ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS + .toMillis(conf.getLong(PAUSE_NS, DEFAULT_PAUSE_NS)), ++tries); pauseTime = Math.min(pauseTime, maxPauseTime); AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, TimeUnit.MICROSECONDS); } }); } else { - future.completeExceptionally(new IOException("Procedure wasn't completed in " - + "expectedTime:" + expectedTimeout + " ms")); + future.completeExceptionally(new TimeoutException("Procedure wasn't completed in " + + "expectedTime:" + conf.getInt(EXPECTED_TIMEOUT, DEFAULT_EXPECTED_TIMEOUT) + " ms")); } } }; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java index 979094fda80b..8e59bf3e168b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java @@ -253,6 +253,18 @@ public void closed(IOException e) { } } + public CompletableFuture getWithTimeout(String path, long timeout) { + CompletableFuture future = get(path); + while(timeout > 0){ + if(future.isCancelled() || future.isDone() || future.isCompletedExceptionally()){ + return future; + } + --timeout; + } + future.completeExceptionally(new KeeperException.OperationTimeoutException()); + return future; + } + public CompletableFuture get(String path) { if (closed.get()) { return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); @@ -269,6 +281,18 @@ protected void doExec(ZooKeeper zk) { return future; } + public CompletableFuture existsWithTimeout(String path, int timeout) { + CompletableFuture future = exists(path); + while(timeout > 0){ + if(future.isCancelled() || future.isDone() || future.isCompletedExceptionally()){ + return future; + } + --timeout; + } + future.completeExceptionally(new KeeperException.OperationTimeoutException()); + return future; + } + public CompletableFuture exists(String path) { if (closed.get()) { return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); @@ -284,6 +308,18 @@ protected void doExec(ZooKeeper zk) { return future; } + public CompletableFuture> listWithTimeout(String path, long timeout) { + CompletableFuture> future = list(path); + while(timeout > 0){ + if(future.isCancelled() || future.isDone() || future.isCompletedExceptionally()){ + return future; + } + --timeout; + } + future.completeExceptionally(new KeeperException.OperationTimeoutException()); + return future; + } + public CompletableFuture> list(String path) { if (closed.get()) { return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed")); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java index d697a51fe4da..7225f92b7ff9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java @@ -39,7 +39,7 @@ private ClusterConnectionFactory() { } private static AsyncClusterConnection createAsyncClusterConnection(Configuration conf, - ConnectionRegistry registry, SocketAddress localAddress, User user ) throws IOException { + ConnectionRegistry registry, SocketAddress localAddress, User user) throws IOException { String clusterId = FutureUtils.get(registry.getClusterId()); Class clazz = conf.getClass(HBASE_SERVER_CLUSTER_CONNECTION_IMPL, AsyncClusterConnectionImpl.class, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java index 14dea8355fff..90d2cb51e8cf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java @@ -174,7 +174,6 @@ public void updateCachedLocationOnError(HRegionLocation loc, Throwable error) { @Override public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload) throws Exception { - final Configuration conf = HBaseConfiguration.create(); return locator.getRegionLocations(replicaId, reload).get(); } }); From ab554988a394ebd601502a8a0429a54c392e730b Mon Sep 17 00:00:00 2001 From: divneet-kaur Date: Thu, 6 Jun 2024 09:15:05 -0700 Subject: [PATCH 04/14] added timer task implementation to readonlyzkclient --- .../hbase/zookeeper/ReadOnlyZKClient.java | 84 ++++++++++++++----- 1 file changed, 63 insertions(+), 21 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java index 8e59bf3e168b..1f0987436d2a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java @@ -32,6 +32,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.client.AsyncConnectionImpl; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; +import org.apache.hbase.thirdparty.io.netty.util.Timeout; +import org.apache.hbase.thirdparty.io.netty.util.TimerTask; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.yetus.audience.InterfaceAudience; @@ -42,6 +48,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * A very simple read only zookeeper implementation without watcher support. */ @@ -75,6 +82,11 @@ public final class ReadOnlyZKClient implements Closeable { private final int keepAliveTimeMs; + public static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( + new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true) + .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), + 10, TimeUnit.MILLISECONDS); + private static abstract class Task implements Delayed { protected long time = System.nanoTime(); @@ -253,15 +265,25 @@ public void closed(IOException e) { } } - public CompletableFuture getWithTimeout(String path, long timeout) { + public CompletableFuture getWithTimeout(String path, long endTime) { CompletableFuture future = get(path); - while(timeout > 0){ - if(future.isCancelled() || future.isDone() || future.isCompletedExceptionally()){ - return future; + TimerTask timerTask = new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + if (EnvironmentEdgeManager.currentTime() > endTime) { + if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { + future.completeExceptionally( + new DoNotRetryIOException("Zookeeper get could not be completed by " + endTime)); + } + } else { + if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { + RETRY_TIMER.newTimeout(this, 10, TimeUnit.MILLISECONDS); + } + } } - --timeout; - } - future.completeExceptionally(new KeeperException.OperationTimeoutException()); + }; + + RETRY_TIMER.newTimeout(timerTask, 1, TimeUnit.MILLISECONDS); return future; } @@ -281,15 +303,25 @@ protected void doExec(ZooKeeper zk) { return future; } - public CompletableFuture existsWithTimeout(String path, int timeout) { + public CompletableFuture existsWithTimeout(String path, int endTime) { CompletableFuture future = exists(path); - while(timeout > 0){ - if(future.isCancelled() || future.isDone() || future.isCompletedExceptionally()){ - return future; + TimerTask timerTask = new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + if (EnvironmentEdgeManager.currentTime() > endTime) { + if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { + future.completeExceptionally( + new DoNotRetryIOException("Zookeeper get could not be completed by " + endTime)); + } + } else { + if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { + RETRY_TIMER.newTimeout(this, 10, TimeUnit.MILLISECONDS); + } + } } - --timeout; - } - future.completeExceptionally(new KeeperException.OperationTimeoutException()); + }; + + RETRY_TIMER.newTimeout(timerTask, 1, TimeUnit.MILLISECONDS); return future; } @@ -308,15 +340,25 @@ protected void doExec(ZooKeeper zk) { return future; } - public CompletableFuture> listWithTimeout(String path, long timeout) { + public CompletableFuture> listWithTimeout(String path, long endTime) { CompletableFuture> future = list(path); - while(timeout > 0){ - if(future.isCancelled() || future.isDone() || future.isCompletedExceptionally()){ - return future; + TimerTask timerTask = new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + if (EnvironmentEdgeManager.currentTime() > endTime) { + if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { + future.completeExceptionally( + new DoNotRetryIOException("Zookeeper get could not be completed by " + endTime)); + } + } else { + if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { + RETRY_TIMER.newTimeout(this, 10, TimeUnit.MILLISECONDS); + } + } } - --timeout; - } - future.completeExceptionally(new KeeperException.OperationTimeoutException()); + }; + + RETRY_TIMER.newTimeout(timerTask, 1, TimeUnit.MILLISECONDS); return future; } From 7af1c8064e3b35c97a26bc608d26e171c184c90c Mon Sep 17 00:00:00 2001 From: divneet-kaur Date: Tue, 11 Jun 2024 11:41:40 -0700 Subject: [PATCH 05/14] updated timeout time --- .../hadoop/hbase/zookeeper/ReadOnlyZKClient.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java index e17054e73559..979ea2ed4d09 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java @@ -84,9 +84,9 @@ public final class ReadOnlyZKClient implements Closeable { private final int keepAliveTimeMs; public static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( - new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true) + new ThreadFactoryBuilder().setNameFormat("Read-Only-ZKClient-Retry-Timer-pool-%d").setDaemon(true) .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), - 10, TimeUnit.MILLISECONDS); + 1, TimeUnit.MILLISECONDS); private final ZKClientConfig zkClientConfig; @@ -283,7 +283,7 @@ public void run(Timeout timeout) throws Exception { } } else { if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { - RETRY_TIMER.newTimeout(this, 10, TimeUnit.MILLISECONDS); + RETRY_TIMER.newTimeout(this, 1, TimeUnit.MILLISECONDS); } } } @@ -321,7 +321,7 @@ public void run(Timeout timeout) throws Exception { } } else { if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { - RETRY_TIMER.newTimeout(this, 10, TimeUnit.MILLISECONDS); + RETRY_TIMER.newTimeout(this, 1, TimeUnit.MILLISECONDS); } } } @@ -358,7 +358,7 @@ public void run(Timeout timeout) throws Exception { } } else { if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { - RETRY_TIMER.newTimeout(this, 10, TimeUnit.MILLISECONDS); + RETRY_TIMER.newTimeout(this, 1, TimeUnit.MILLISECONDS); } } } From 21fc4358bf8c28a877d10c4b82156d703258facb Mon Sep 17 00:00:00 2001 From: divneet-kaur Date: Tue, 18 Jun 2024 15:45:46 -0700 Subject: [PATCH 06/14] updated implementation --- .../hbase/client/AsyncConnectionImpl.java | 2 +- .../hbase/client/ZKConnectionRegistry.java | 111 ++++-------------- .../hbase/zookeeper/ReadOnlyZKClient.java | 42 ++----- 3 files changed, 36 insertions(+), 119 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 3f0e3e0b370e..807d192229d7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -79,7 +79,7 @@ public class AsyncConnectionImpl implements AsyncConnection { private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class); - static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( + public static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true) .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 10, TimeUnit.MILLISECONDS); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java index 3613b65d2ed4..c5422380dd52 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java @@ -75,12 +75,8 @@ class ZKConnectionRegistry implements ConnectionRegistry { private final ZNodePaths znodePaths; private final Configuration conf; - public static final String EXPECTED_TIMEOUT = "expected.timeout"; - public static final int DEFAULT_EXPECTED_TIMEOUT = 200000; - public static final String MAX_ATTEMPTS = "max.attempts"; - public static final int DEFAULT_MAX_ATTEMPTS = 5; - public static final String PAUSE_NS = "pause.ns"; - public static final long DEFAULT_PAUSE_NS = 100000; + public static final String ZK_REGISTRY_ASYNC_GET_TIMEOUT = "zookeeper.registry.async.get.timeout"; + public static final int DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT = 60000; // 1 min // User not used, but for rpc based registry we need it @@ -110,48 +106,20 @@ private interface Converter { private CompletableFuture getAndConvert(String path, Converter converter) { CompletableFuture future = new CompletableFuture<>(); - TimerTask pollingTask = new TimerTask() { - int tries = 0; - long startTime = EnvironmentEdgeManager.currentTime(); - long endTime = startTime + conf.getInt(EXPECTED_TIMEOUT, DEFAULT_EXPECTED_TIMEOUT); - long maxPauseTime = - conf.getInt(EXPECTED_TIMEOUT, DEFAULT_EXPECTED_TIMEOUT) / - conf.getInt(MAX_ATTEMPTS, DEFAULT_MAX_ATTEMPTS);; - - - @Override - public void run(Timeout timeout) throws Exception { - if (EnvironmentEdgeManager.currentTime() < endTime) { - addListener(zk.getWithTimeout(path, endTime), (data, error) -> { - if (error != null) { - future.completeExceptionally(error); - return; - } - if (data != null && data.length > 0) { - try { - future.complete(converter.convert(data)); - } catch (Exception e) { - future.completeExceptionally(e); - } - } else { - // retry again after pauseTime. - long pauseTime = - ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS - .toMillis(conf.getLong(PAUSE_NS, DEFAULT_PAUSE_NS)), ++tries); - pauseTime = Math.min(pauseTime, maxPauseTime); - AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, - TimeUnit.MICROSECONDS); - } - }); - } else { - future.completeExceptionally(new TimeoutException("Procedure wasn't completed in " - + "expectedTime:" + conf.getInt(EXPECTED_TIMEOUT, DEFAULT_EXPECTED_TIMEOUT) + " ms")); - } + addListener(zk.getWithTimeout(path, + conf.getInt(ZK_REGISTRY_ASYNC_GET_TIMEOUT, DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT), + AsyncConnectionImpl.RETRY_TIMER), + (data, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; } - }; - // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. - AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); - + try { + future.complete(converter.convert(data)); + } catch (Exception e) { + future.completeExceptionally(e); + } + }); return future; } @@ -267,45 +235,18 @@ private void getMetaRegionLocation(CompletableFuture future, public CompletableFuture getMetaRegionLocations() { return tracedFuture(() -> { CompletableFuture future = new CompletableFuture<>(); - TimerTask pollingTask = new TimerTask() { - int tries = 0; - long startTime = EnvironmentEdgeManager.currentTime(); - long endTime = startTime + conf.getInt(EXPECTED_TIMEOUT, DEFAULT_EXPECTED_TIMEOUT); - long maxPauseTime = conf.getInt(EXPECTED_TIMEOUT, DEFAULT_EXPECTED_TIMEOUT) / - conf.getInt(MAX_ATTEMPTS, DEFAULT_MAX_ATTEMPTS); - - @Override - public void run(Timeout timeout) throws Exception { - if (EnvironmentEdgeManager.currentTime() < endTime) { - addListener( - zk.listWithTimeout(znodePaths.baseZNode, endTime).thenApply(children -> children.stream() - .filter(c -> getZNodePaths().isMetaZNodePrefix(c)).collect(Collectors.toList())), - (metaReplicaZNodes, error) -> { - if (error != null) { - future.completeExceptionally(error); - return; - } - if (metaReplicaZNodes != null && !metaReplicaZNodes.isEmpty()) { - getMetaRegionLocation(future, metaReplicaZNodes); - } else { - // retry again after pauseTime. - long pauseTime = - ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS - .toMillis(conf.getLong(PAUSE_NS, DEFAULT_PAUSE_NS)), ++tries); - pauseTime = Math.min(pauseTime, maxPauseTime); - AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, - TimeUnit.MICROSECONDS); - } - }); - } else { - future.completeExceptionally(new TimeoutException("Procedure wasn't completed in " - + "expectedTime:" + conf.getInt(EXPECTED_TIMEOUT, DEFAULT_EXPECTED_TIMEOUT) + " ms")); + addListener( + zk.listWithTimeout(znodePaths.baseZNode, + conf.getInt(ZK_REGISTRY_ASYNC_GET_TIMEOUT, DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT), + AsyncConnectionImpl.RETRY_TIMER).thenApply(children -> children.stream() + .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())), + (metaReplicaZNodes, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; } - } - }; - // Queue the polling task into RETRY_TIMER to poll procedure state asynchronously. - AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS); - + getMetaRegionLocation(future, metaReplicaZNodes); + }); return future; }, "ZKConnectionRegistry.getMetaRegionLocations"); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java index 979ea2ed4d09..1d1749e70bc0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java @@ -83,11 +83,6 @@ public final class ReadOnlyZKClient implements Closeable { private final int keepAliveTimeMs; - public static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( - new ThreadFactoryBuilder().setNameFormat("Read-Only-ZKClient-Retry-Timer-pool-%d").setDaemon(true) - .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), - 1, TimeUnit.MILLISECONDS); - private final ZKClientConfig zkClientConfig; @@ -271,25 +266,19 @@ public void closed(IOException e) { } } - public CompletableFuture getWithTimeout(String path, long endTime) { + public CompletableFuture getWithTimeout(String path, long endTime, HashedWheelTimer retryTimer) { CompletableFuture future = get(path); TimerTask timerTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { - if (EnvironmentEdgeManager.currentTime() > endTime) { - if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { - future.completeExceptionally( - new DoNotRetryIOException("Zookeeper get could not be completed by " + endTime)); - } - } else { - if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { - RETRY_TIMER.newTimeout(this, 1, TimeUnit.MILLISECONDS); - } + if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { + future.completeExceptionally( + new DoNotRetryIOException("Zookeeper get could not be completed by " + endTime)); } } }; - RETRY_TIMER.newTimeout(timerTask, 1, TimeUnit.MILLISECONDS); + retryTimer.newTimeout(timerTask, endTime + 1, TimeUnit.MILLISECONDS); return future; } @@ -309,25 +298,18 @@ protected void doExec(ZooKeeper zk) { return future; } - public CompletableFuture existsWithTimeout(String path, int endTime) { + public CompletableFuture existsWithTimeout(String path, long endTime, HashedWheelTimer retryTimer) { CompletableFuture future = exists(path); TimerTask timerTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { - if (EnvironmentEdgeManager.currentTime() > endTime) { if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { future.completeExceptionally( new DoNotRetryIOException("Zookeeper get could not be completed by " + endTime)); } - } else { - if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { - RETRY_TIMER.newTimeout(this, 1, TimeUnit.MILLISECONDS); - } - } } }; - - RETRY_TIMER.newTimeout(timerTask, 1, TimeUnit.MILLISECONDS); + retryTimer.newTimeout(timerTask, endTime + 1, TimeUnit.MILLISECONDS); return future; } @@ -346,25 +328,19 @@ protected void doExec(ZooKeeper zk) { return future; } - public CompletableFuture> listWithTimeout(String path, long endTime) { + public CompletableFuture> listWithTimeout(String path, long endTime, HashedWheelTimer retryTimer) { CompletableFuture> future = list(path); TimerTask timerTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { - if (EnvironmentEdgeManager.currentTime() > endTime) { if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { future.completeExceptionally( new DoNotRetryIOException("Zookeeper get could not be completed by " + endTime)); } - } else { - if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { - RETRY_TIMER.newTimeout(this, 1, TimeUnit.MILLISECONDS); - } - } } }; - RETRY_TIMER.newTimeout(timerTask, 1, TimeUnit.MILLISECONDS); + retryTimer.newTimeout(timerTask, endTime + 1, TimeUnit.MILLISECONDS); return future; } From 3001cba120dd97b77cf1e0e56093d4a23f016e27 Mon Sep 17 00:00:00 2001 From: divneet-kaur Date: Thu, 20 Jun 2024 18:59:23 -0700 Subject: [PATCH 07/14] fixed minor comments --- .../hbase/zookeeper/ReadOnlyZKClient.java | 51 ++++++------------- 1 file changed, 15 insertions(+), 36 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java index 1d1749e70bc0..7ef830f44c68 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java @@ -32,12 +32,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.client.AsyncConnectionImpl; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hbase.thirdparty.io.netty.util.Timeout; import org.apache.hbase.thirdparty.io.netty.util.TimerTask; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.yetus.audience.InterfaceAudience; @@ -266,19 +263,18 @@ public void closed(IOException e) { } } - public CompletableFuture getWithTimeout(String path, long endTime, HashedWheelTimer retryTimer) { - CompletableFuture future = get(path); - TimerTask timerTask = new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { - future.completeExceptionally( - new DoNotRetryIOException("Zookeeper get could not be completed by " + endTime)); - } + private static TimerTask getTimerTask(final long timeoutMs, final CompletableFuture future, final String api) { + return timeout -> { + if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { + future.completeExceptionally(new DoNotRetryIOException( "Zookeeper " + api + " could not be completed in " + timeoutMs + " ms")); } }; + } - retryTimer.newTimeout(timerTask, endTime + 1, TimeUnit.MILLISECONDS); + public CompletableFuture getWithTimeout(final String path, final long timeoutMs, final HashedWheelTimer retryTimer) { + CompletableFuture future = get(path); + TimerTask timerTask = getTimerTask(timeoutMs, future, "GET"); + retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS); return future; } @@ -298,18 +294,10 @@ protected void doExec(ZooKeeper zk) { return future; } - public CompletableFuture existsWithTimeout(String path, long endTime, HashedWheelTimer retryTimer) { + public CompletableFuture existsWithTimeout(String path, long timeoutMs, HashedWheelTimer retryTimer) { CompletableFuture future = exists(path); - TimerTask timerTask = new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { - future.completeExceptionally( - new DoNotRetryIOException("Zookeeper get could not be completed by " + endTime)); - } - } - }; - retryTimer.newTimeout(timerTask, endTime + 1, TimeUnit.MILLISECONDS); + TimerTask timerTask = getTimerTask(timeoutMs, future, "EXISTS"); + retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS); return future; } @@ -328,19 +316,10 @@ protected void doExec(ZooKeeper zk) { return future; } - public CompletableFuture> listWithTimeout(String path, long endTime, HashedWheelTimer retryTimer) { + public CompletableFuture> listWithTimeout(String path, long timeoutMs, HashedWheelTimer retryTimer) { CompletableFuture> future = list(path); - TimerTask timerTask = new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { - future.completeExceptionally( - new DoNotRetryIOException("Zookeeper get could not be completed by " + endTime)); - } - } - }; - - retryTimer.newTimeout(timerTask, endTime + 1, TimeUnit.MILLISECONDS); + TimerTask timerTask = getTimerTask(timeoutMs, future, "LIST"); + retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS); return future; } From 44c2254cdba0412cc962ddb737bec43f48fe67b2 Mon Sep 17 00:00:00 2001 From: divneet-kaur Date: Fri, 28 Jun 2024 02:15:23 -0500 Subject: [PATCH 08/14] made timeout class variables --- .../hbase/client/AsyncConnectionImpl.java | 2 +- .../hbase/client/ZKConnectionRegistry.java | 52 +++++++------------ .../hbase/zookeeper/ReadOnlyZKClient.java | 25 +++++---- .../hbase/zookeeper/TestReadOnlyZKClient.java | 10 +++- 4 files changed, 43 insertions(+), 46 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 807d192229d7..3f0e3e0b370e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -79,7 +79,7 @@ public class AsyncConnectionImpl implements AsyncConnection { private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class); - public static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( + static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true) .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 10, TimeUnit.MILLISECONDS); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java index c5422380dd52..8fd96f5f282a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java @@ -28,11 +28,7 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import org.apache.hbase.thirdparty.io.netty.util.Timeout; -import org.apache.hbase.thirdparty.io.netty.util.TimerTask; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterId; @@ -52,7 +48,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * Zookeeper based registry implementation. @@ -74,52 +69,45 @@ class ZKConnectionRegistry implements ConnectionRegistry { private final ZNodePaths znodePaths; private final Configuration conf; - + private final int zkRegistryAsyncTimeout; public static final String ZK_REGISTRY_ASYNC_GET_TIMEOUT = "zookeeper.registry.async.get.timeout"; public static final int DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT = 60000; // 1 min - - // User not used, but for rpc based registry we need it + ZKConnectionRegistry(Configuration conf, User ignored) { this.znodePaths = new ZNodePaths(conf); - this.zk = new ReadOnlyZKClient(conf); + this.zk = new ReadOnlyZKClient(conf, AsyncConnectionImpl.RETRY_TIMER); this.conf = conf; + this.zkRegistryAsyncTimeout = + conf.getInt(ZK_REGISTRY_ASYNC_GET_TIMEOUT, DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT); if (NEEDS_LOG_WARN) { synchronized (WARN_LOCK) { if (NEEDS_LOG_WARN) { LOG.warn( "ZKConnectionRegistry is deprecated. See https://hbase.apache.org/book.html#client.rpcconnectionregistry"); NEEDS_LOG_WARN = false; - } } } } - public ZNodePaths getZNodePaths() { - return znodePaths; - } - private interface Converter { T convert(byte[] data) throws Exception; } private CompletableFuture getAndConvert(String path, Converter converter) { CompletableFuture future = new CompletableFuture<>(); - addListener(zk.getWithTimeout(path, - conf.getInt(ZK_REGISTRY_ASYNC_GET_TIMEOUT, DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT), - AsyncConnectionImpl.RETRY_TIMER), - (data, error) -> { - if (error != null) { - future.completeExceptionally(error); - return; - } - try { - future.complete(converter.convert(data)); - } catch (Exception e) { - future.completeExceptionally(e); - } - }); + addListener(zk.getWithTimeout(path, this.zkRegistryAsyncTimeout), (data, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + try { + future.complete(converter.convert(data)); + } catch (Exception e) { + future.completeExceptionally(e); + } + }); return future; } @@ -235,11 +223,9 @@ private void getMetaRegionLocation(CompletableFuture future, public CompletableFuture getMetaRegionLocations() { return tracedFuture(() -> { CompletableFuture future = new CompletableFuture<>(); - addListener( - zk.listWithTimeout(znodePaths.baseZNode, - conf.getInt(ZK_REGISTRY_ASYNC_GET_TIMEOUT, DEFAULT_ZK_REGISTRY_ASYNC_GET_TIMEOUT), - AsyncConnectionImpl.RETRY_TIMER).thenApply(children -> children.stream() - .filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())), + addListener(zk.listWithTimeout(znodePaths.baseZNode, this.zkRegistryAsyncTimeout) + .thenApply(children -> children.stream().filter(c -> this.znodePaths.isMetaZNodePrefix(c)) + .collect(Collectors.toList())), (metaReplicaZNodes, error) -> { if (error != null) { future.completeExceptionally(error); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java index 7ef830f44c68..1e03a68f2ca9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java @@ -32,11 +32,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; -import org.apache.hbase.thirdparty.io.netty.util.Timeout; -import org.apache.hbase.thirdparty.io.netty.util.TimerTask; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; +import org.apache.hbase.thirdparty.io.netty.util.TimerTask; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@ -80,8 +79,9 @@ public final class ReadOnlyZKClient implements Closeable { private final int keepAliveTimeMs; - private final ZKClientConfig zkClientConfig; + private HashedWheelTimer retryTimer; + private final ZKClientConfig zkClientConfig; private static abstract class Task implements Delayed { @@ -131,7 +131,7 @@ private String getId() { return String.format("0x%08x", System.identityHashCode(this)); } - public ReadOnlyZKClient(Configuration conf) { + public ReadOnlyZKClient(Configuration conf, HashedWheelTimer retryTimer) { // We might use a different ZK for client access String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf); if (clientZkQuorumServers != null) { @@ -145,6 +145,7 @@ public ReadOnlyZKClient(Configuration conf) { conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS); this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS); this.zkClientConfig = ZKConfig.getZKClientConfig(conf); + this.retryTimer = retryTimer; LOG.debug( "Connect {} to {} with session timeout={}ms, retries={}, " + "retry interval={}ms, keepAlive={}ms, zk client config={}", @@ -263,15 +264,17 @@ public void closed(IOException e) { } } - private static TimerTask getTimerTask(final long timeoutMs, final CompletableFuture future, final String api) { + private static TimerTask getTimerTask(final long timeoutMs, final CompletableFuture future, + final String api) { return timeout -> { - if (!future.isCancelled() && !future.isDone() && !future.isCompletedExceptionally()) { - future.completeExceptionally(new DoNotRetryIOException( "Zookeeper " + api + " could not be completed in " + timeoutMs + " ms")); + if (!future.isDone()) { + future.completeExceptionally(new DoNotRetryIOException( + "Zookeeper " + api + " could not be completed in " + timeoutMs + " ms")); } }; } - public CompletableFuture getWithTimeout(final String path, final long timeoutMs, final HashedWheelTimer retryTimer) { + public CompletableFuture getWithTimeout(final String path, final long timeoutMs) { CompletableFuture future = get(path); TimerTask timerTask = getTimerTask(timeoutMs, future, "GET"); retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS); @@ -294,7 +297,7 @@ protected void doExec(ZooKeeper zk) { return future; } - public CompletableFuture existsWithTimeout(String path, long timeoutMs, HashedWheelTimer retryTimer) { + public CompletableFuture existsWithTimeout(String path, long timeoutMs) { CompletableFuture future = exists(path); TimerTask timerTask = getTimerTask(timeoutMs, future, "EXISTS"); retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS); @@ -316,7 +319,7 @@ protected void doExec(ZooKeeper zk) { return future; } - public CompletableFuture> listWithTimeout(String path, long timeoutMs, HashedWheelTimer retryTimer) { + public CompletableFuture> listWithTimeout(String path, long timeoutMs) { CompletableFuture> future = list(path); TimerTask timerTask = getTimerTask(timeoutMs, future, "LIST"); retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS); diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java index 2f08f6276db4..3757bd846e38 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java @@ -43,6 +43,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseZKTestingUtil; @@ -51,6 +52,9 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ZKTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -79,6 +83,10 @@ public class TestReadOnlyZKClient { private static int CHILDREN = 5; private static ReadOnlyZKClient RO_ZK; + static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( + new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true) + .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), + 10, TimeUnit.MILLISECONDS); @BeforeClass public static void setUp() throws Exception { @@ -98,7 +106,7 @@ public static void setUp() throws Exception { conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY, 3); conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY_INTERVAL_MILLIS, 100); conf.setInt(ReadOnlyZKClient.KEEPALIVE_MILLIS, 3000); - RO_ZK = new ReadOnlyZKClient(conf); + RO_ZK = new ReadOnlyZKClient(conf, RETRY_TIMER); // only connect when necessary assertNull(RO_ZK.zookeeper); } From a1027f5c93b73dba5e08102f7a1191ddf71d2c44 Mon Sep 17 00:00:00 2001 From: divneet-kaur Date: Fri, 28 Jun 2024 02:15:23 -0500 Subject: [PATCH 09/14] made timeout class variables --- .../java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java index 1e03a68f2ca9..07308ab07493 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java @@ -45,7 +45,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * A very simple read only zookeeper implementation without watcher support. */ From 3df45567ffd5bbf71db750dec1b705ac00762abc Mon Sep 17 00:00:00 2001 From: divneet-kaur Date: Mon, 15 Jul 2024 10:19:18 -0500 Subject: [PATCH 10/14] ran spotless apply --- .../org/apache/hadoop/hbase/client/ZKConnectionRegistry.java | 1 + .../org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java | 5 +++-- .../apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java | 5 +++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java index 8fd96f5f282a..f67e25d3ff99 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java @@ -25,6 +25,7 @@ import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData; + import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java index 07308ab07493..1348c10f8121 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java @@ -34,8 +34,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; -import org.apache.hbase.thirdparty.io.netty.util.TimerTask; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@ -45,6 +43,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; +import org.apache.hbase.thirdparty.io.netty.util.TimerTask; + /** * A very simple read only zookeeper implementation without watcher support. */ diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java index 3757bd846e38..21d658c36b50 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java @@ -53,8 +53,6 @@ import org.apache.hadoop.hbase.testclassification.ZKTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -67,6 +65,9 @@ import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; + @Category({ ZKTests.class, MediumTests.class }) public class TestReadOnlyZKClient { From 0e96e4c770d8340935f6c8dfdce5511e10a21c6b Mon Sep 17 00:00:00 2001 From: divneet-kaur Date: Mon, 15 Jul 2024 15:45:34 -0500 Subject: [PATCH 11/14] added UT for methods with timeout --- .../hbase/zookeeper/TestReadOnlyZKClient.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java index 21d658c36b50..575fa55f3f57 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java @@ -213,4 +213,18 @@ public void testNotCloseZkWhenPending() throws Exception { waitForIdleConnectionClosed(); verify(mockedZK, times(1)).close(); } + + @Test + public void testReadWithTimeout() throws Exception { + assertArrayEquals(DATA, RO_ZK.getWithTimeout(PATH, 60000).get()); + assertEquals(CHILDREN, RO_ZK.existsWithTimeout(PATH, 60000).get().getNumChildren()); + List children = RO_ZK.listWithTimeout(PATH, 60000).get(); + assertEquals(CHILDREN, children.size()); + Collections.sort(children); + for (int i = 0; i < CHILDREN; i++) { + assertEquals("c" + i, children.get(i)); + } + assertNotNull(RO_ZK.zookeeper); + waitForIdleConnectionClosed(); + } } From 5f1752b2ea26302fb7b02bc53fddcbcbf57db863 Mon Sep 17 00:00:00 2001 From: divneet-kaur Date: Wed, 17 Jul 2024 10:17:15 -0700 Subject: [PATCH 12/14] closed retry timer variable --- .../apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java index 575fa55f3f57..d98f4ebe2793 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java @@ -84,7 +84,7 @@ public class TestReadOnlyZKClient { private static int CHILDREN = 5; private static ReadOnlyZKClient RO_ZK; - static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( + private static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true) .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 10, TimeUnit.MILLISECONDS); @@ -114,6 +114,7 @@ public static void setUp() throws Exception { @AfterClass public static void tearDown() throws IOException { + RETRY_TIMER.stop(); RO_ZK.close(); UTIL.shutdownMiniZKCluster(); UTIL.cleanupTestDir(); From df2880928b5e1e8787ece0717b6b9ed77eec4711 Mon Sep 17 00:00:00 2001 From: divneet-kaur Date: Wed, 17 Jul 2024 11:48:38 -0700 Subject: [PATCH 13/14] overloaded get,exists and list method --- .../apache/hadoop/hbase/client/ZKConnectionRegistry.java | 4 ++-- .../org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java | 6 +++--- .../apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java index f67e25d3ff99..bf27794b8de2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java @@ -98,7 +98,7 @@ private interface Converter { private CompletableFuture getAndConvert(String path, Converter converter) { CompletableFuture future = new CompletableFuture<>(); - addListener(zk.getWithTimeout(path, this.zkRegistryAsyncTimeout), (data, error) -> { + addListener(zk.get(path, this.zkRegistryAsyncTimeout), (data, error) -> { if (error != null) { future.completeExceptionally(error); return; @@ -224,7 +224,7 @@ private void getMetaRegionLocation(CompletableFuture future, public CompletableFuture getMetaRegionLocations() { return tracedFuture(() -> { CompletableFuture future = new CompletableFuture<>(); - addListener(zk.listWithTimeout(znodePaths.baseZNode, this.zkRegistryAsyncTimeout) + addListener(zk.list(znodePaths.baseZNode, this.zkRegistryAsyncTimeout) .thenApply(children -> children.stream().filter(c -> this.znodePaths.isMetaZNodePrefix(c)) .collect(Collectors.toList())), (metaReplicaZNodes, error) -> { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java index 1348c10f8121..6c26f089d742 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java @@ -274,7 +274,7 @@ private static TimerTask getTimerTask(final long timeoutMs, final CompletableFut }; } - public CompletableFuture getWithTimeout(final String path, final long timeoutMs) { + public CompletableFuture get(final String path, final long timeoutMs) { CompletableFuture future = get(path); TimerTask timerTask = getTimerTask(timeoutMs, future, "GET"); retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS); @@ -297,7 +297,7 @@ protected void doExec(ZooKeeper zk) { return future; } - public CompletableFuture existsWithTimeout(String path, long timeoutMs) { + public CompletableFuture exists(String path, long timeoutMs) { CompletableFuture future = exists(path); TimerTask timerTask = getTimerTask(timeoutMs, future, "EXISTS"); retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS); @@ -319,7 +319,7 @@ protected void doExec(ZooKeeper zk) { return future; } - public CompletableFuture> listWithTimeout(String path, long timeoutMs) { + public CompletableFuture> list(String path, long timeoutMs) { CompletableFuture> future = list(path); TimerTask timerTask = getTimerTask(timeoutMs, future, "LIST"); retryTimer.newTimeout(timerTask, timeoutMs + 1, TimeUnit.MILLISECONDS); diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java index d98f4ebe2793..23a8c339cd71 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java @@ -217,9 +217,9 @@ public void testNotCloseZkWhenPending() throws Exception { @Test public void testReadWithTimeout() throws Exception { - assertArrayEquals(DATA, RO_ZK.getWithTimeout(PATH, 60000).get()); - assertEquals(CHILDREN, RO_ZK.existsWithTimeout(PATH, 60000).get().getNumChildren()); - List children = RO_ZK.listWithTimeout(PATH, 60000).get(); + assertArrayEquals(DATA, RO_ZK.get(PATH, 60000).get()); + assertEquals(CHILDREN, RO_ZK.exists(PATH, 60000).get().getNumChildren()); + List children = RO_ZK.list(PATH, 60000).get(); assertEquals(CHILDREN, children.size()); Collections.sort(children); for (int i = 0; i < CHILDREN; i++) { From afdffaee316540dcf7b147de9bb799784ff961bc Mon Sep 17 00:00:00 2001 From: divneet-kaur Date: Wed, 17 Jul 2024 12:29:45 -0700 Subject: [PATCH 14/14] ran spotless:apply --- .../apache/hadoop/hbase/client/ZKConnectionRegistry.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java index bf27794b8de2..8c4bdf4d51c6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java @@ -224,9 +224,9 @@ private void getMetaRegionLocation(CompletableFuture future, public CompletableFuture getMetaRegionLocations() { return tracedFuture(() -> { CompletableFuture future = new CompletableFuture<>(); - addListener(zk.list(znodePaths.baseZNode, this.zkRegistryAsyncTimeout) - .thenApply(children -> children.stream().filter(c -> this.znodePaths.isMetaZNodePrefix(c)) - .collect(Collectors.toList())), + addListener( + zk.list(znodePaths.baseZNode, this.zkRegistryAsyncTimeout).thenApply(children -> children + .stream().filter(c -> this.znodePaths.isMetaZNodePrefix(c)).collect(Collectors.toList())), (metaReplicaZNodes, error) -> { if (error != null) { future.completeExceptionally(error);