From 86328cab92d8effe1f188fb0baecdc806308d73e Mon Sep 17 00:00:00 2001 From: Jing Yu Date: Thu, 15 Jun 2023 15:56:32 -0700 Subject: [PATCH 1/5] HBASE-27902 New async admin api to invoke coproc on multiple servers --- .../hadoop/hbase/client/AsyncAdmin.java | 22 +++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 6 + .../hbase/client/RawAsyncHBaseAdmin.java | 27 +++ .../hbase/client/TestInterfaceAlign.java | 1 + ...CoprocessorOnAllRegionServersEndpoint.java | 163 ++++++++++++++++++ 5 files changed, 219 insertions(+) create mode 100644 hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 913350b1d172..4c5c99c7033d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -1471,6 +1471,28 @@ CompletableFuture coprocessorService(Function stubMaker CompletableFuture coprocessorService(Function stubMaker, ServiceCaller callable, ServerName serverName); + /** + * Execute the given coprocessor call on all region servers. + *

+ * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a + * one line lambda expression, like: + * + *

+   * channel -> xxxService.newStub(channel)
+   * 
+ * + * @param stubMaker a delegation to the actual {@code newStub} call. + * @param callable a delegation to the actual protobuf rpc call. See the comment of + * {@link ServiceCaller} for more details. + * @param the type of the asynchronous stub + * @param the type of the return value + * @return Map of each region server to its result of the protobuf rpc call, wrapped by a + * {@link CompletableFuture}. + * @see ServiceCaller + */ + CompletableFuture> coprocessorServiceOnAllRegionServers( + Function stubMaker, ServiceCaller callable); + /** * List all the dead region servers. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index ce604d90b2ec..ea67909a13aa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -785,6 +785,12 @@ public CompletableFuture coprocessorService(Function st return wrap(rawAdmin.coprocessorService(stubMaker, callable, serverName)); } + @Override + public CompletableFuture> coprocessorServiceOnAllRegionServers( + Function stubMaker, ServiceCaller callable) { + return wrap(rawAdmin.coprocessorServiceOnAllRegionServers(stubMaker, callable)); + } + @Override public CompletableFuture> listDeadServers() { return wrap(rawAdmin.listDeadServers()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index d413905c04af..a154bcd65123 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -95,6 +95,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ForeignExceptionUtil; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -3439,6 +3440,32 @@ public CompletableFuture coprocessorService(Function st return future; } + @Override + public CompletableFuture> coprocessorServiceOnAllRegionServers( + Function stubMaker, ServiceCaller callable) { + CompletableFuture> future = new CompletableFuture<>(); + FutureUtils.addListener(getRegionServers(), (regionServers, e1) -> { + if (e1 != null) { + future.completeExceptionally(e1); + return; + } + Map resultMap = new ConcurrentHashMap<>(); + for (ServerName rs : regionServers) { + FutureUtils.addListener(coprocessorService(stubMaker, callable, rs), (r, e2) -> { + if (e2 != null) { + resultMap.put(rs, e2); + } else { + resultMap.put(rs, r); + } + if (resultMap.size() == regionServers.size()) { + future.complete(Collections.unmodifiableMap(resultMap)); + } + }); + } + }); + return future; + } + @Override public CompletableFuture> clearDeadServers(List servers) { return this.> newMasterCaller() diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestInterfaceAlign.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestInterfaceAlign.java index ebdb7adc5887..722ee6a3061a 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestInterfaceAlign.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestInterfaceAlign.java @@ -56,6 +56,7 @@ public void testAdminWithAsyncAdmin() { adminMethodNames.remove("getConfiguration"); adminMethodNames.removeAll(getMethodNames(Abortable.class)); adminMethodNames.removeAll(getMethodNames(Closeable.class)); + asyncAdminMethodNames.remove("coprocessorServiceOnAllRegionServers"); adminMethodNames.forEach(method -> { boolean contains = asyncAdminMethodNames.contains(method); diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java new file mode 100644 index 000000000000..30656b8e9129 --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.FileNotFoundException; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.client.TestAsyncAdminBase; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; + +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService; + +@RunWith(Parameterized.class) +@Category({ ClientTests.class, MediumTests.class }) +public class TestAsyncCoprocessorOnAllRegionServersEndpoint extends TestAsyncAdminBase { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncCoprocessorOnAllRegionServersEndpoint.class); + + private static final String THROW_CLASS_NAME = "java.io.FileNotFoundException"; + private static final String DUMMY_VALUE = "val"; + private static final int NUM_SLAVES = 5; + private static final int NUM_SUCCESS_REGION_SERVERS = 3; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, + ProtobufCoprocessorService.class.getName()); + TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, + DummyRegionServerEndpoint.class.getName()); + TEST_UTIL.startMiniCluster(NUM_SLAVES); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testRegionServersCoprocessorService() + throws ExecutionException, InterruptedException { + DummyRequest request = DummyRequest.getDefaultInstance(); + Map resultMap = + admin. coprocessorServiceOnAllRegionServers( + DummyService::newStub, (s, c, done) -> s.dummyCall(c, request, done)).get(); + + resultMap.forEach((k, v) -> { + assertTrue(v instanceof DummyResponse); + DummyResponse resp = (DummyResponse) v; + assertEquals(DUMMY_VALUE, resp.getValue()); + }); + } + + @Test + public void testRegionServerCoprocessorsServiceAllFail() + throws ExecutionException, InterruptedException { + DummyRequest request = DummyRequest.getDefaultInstance(); + Map resultMap = + admin. coprocessorServiceOnAllRegionServers( + DummyService::newStub, (s, c, done) -> s.dummyThrow(c, request, done)).get(); + + resultMap.forEach((k, v) -> { + assertTrue(v instanceof RetriesExhaustedException); + Throwable e = (Throwable) v; + assertTrue(e.getMessage().contains(THROW_CLASS_NAME)); + }); + } + + @Test + public void testRegionServerCoprocessorsServicePartialFail() + throws ExecutionException, InterruptedException { + DummyRequest request = DummyRequest.getDefaultInstance(); + AtomicInteger callCount = new AtomicInteger(); + Map resultMap = admin. coprocessorServiceOnAllRegionServers(DummyService::newStub, (s, c, done) -> { + callCount.addAndGet(1); + if (callCount.get() <= NUM_SUCCESS_REGION_SERVERS) { + s.dummyCall(c, request, done); + } else { + s.dummyThrow(c, request, done); + } + }).get(); + + AtomicInteger successCallCount = new AtomicInteger(); + resultMap.forEach((k, v) -> { + if (v instanceof DummyResponse) { + successCallCount.addAndGet(1); + DummyResponse resp = (DummyResponse) v; + assertEquals(DUMMY_VALUE, resp.getValue()); + } else { + assertTrue(v instanceof RetriesExhaustedException); + Throwable e = (Throwable) v; + assertTrue(e.getMessage().contains(THROW_CLASS_NAME)); + } + }); + assertEquals(NUM_SUCCESS_REGION_SERVERS, successCallCount.get()); + } + + public static class DummyRegionServerEndpoint extends DummyService + implements RegionServerCoprocessor { + @Override + public Iterable getServices() { + return Collections.singleton(this); + } + + @Override + public void dummyCall(RpcController controller, DummyRequest request, + RpcCallback callback) { + callback.run(DummyResponse.newBuilder().setValue(DUMMY_VALUE).build()); + } + + @Override + public void dummyThrow(RpcController controller, DummyRequest request, + RpcCallback done) { + CoprocessorRpcUtils.setControllerException(controller, + new FileNotFoundException("/file.txt")); + } + } +} From 7683f91139f24ba167b8ac410dffce26ff8c29f8 Mon Sep 17 00:00:00 2001 From: Jing Yu Date: Thu, 15 Jun 2023 16:52:20 -0700 Subject: [PATCH 2/5] fix spotless check error --- ...CoprocessorOnAllRegionServersEndpoint.java | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java index 30656b8e9129..acfcf0bfa95c 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java @@ -20,6 +20,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; import java.io.FileNotFoundException; import java.util.Collections; import java.util.Map; @@ -31,6 +34,9 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.TestAsyncAdminBase; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -42,14 +48,6 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; - -import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest; -import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse; -import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService; - @RunWith(Parameterized.class) @Category({ ClientTests.class, MediumTests.class }) public class TestAsyncCoprocessorOnAllRegionServersEndpoint extends TestAsyncAdminBase { @@ -117,13 +115,13 @@ public void testRegionServerCoprocessorsServicePartialFail() AtomicInteger callCount = new AtomicInteger(); Map resultMap = admin. coprocessorServiceOnAllRegionServers(DummyService::newStub, (s, c, done) -> { - callCount.addAndGet(1); - if (callCount.get() <= NUM_SUCCESS_REGION_SERVERS) { - s.dummyCall(c, request, done); - } else { - s.dummyThrow(c, request, done); - } - }).get(); + callCount.addAndGet(1); + if (callCount.get() <= NUM_SUCCESS_REGION_SERVERS) { + s.dummyCall(c, request, done); + } else { + s.dummyThrow(c, request, done); + } + }).get(); AtomicInteger successCallCount = new AtomicInteger(); resultMap.forEach((k, v) -> { From 7b55785bc1bd3b1b936e6b96fff6624c51e2bc62 Mon Sep 17 00:00:00 2001 From: Jing Yu Date: Fri, 16 Jun 2023 14:49:28 -0700 Subject: [PATCH 3/5] make coprocessorServiceOnAllRegionServers be a default method in AsyncAdmin interface --- .../hadoop/hbase/client/AsyncAdmin.java | 28 +++++++++++++++++-- .../hadoop/hbase/client/AsyncHBaseAdmin.java | 6 ---- .../hbase/client/RawAsyncHBaseAdmin.java | 27 ------------------ 3 files changed, 26 insertions(+), 35 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 4c5c99c7033d..4a98a32bfbf9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -22,6 +22,7 @@ import com.google.protobuf.RpcChannel; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -29,6 +30,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -1490,8 +1492,30 @@ CompletableFuture coprocessorService(Function stubMaker * {@link CompletableFuture}. * @see ServiceCaller */ - CompletableFuture> coprocessorServiceOnAllRegionServers( - Function stubMaker, ServiceCaller callable); + default CompletableFuture> coprocessorServiceOnAllRegionServers( + Function stubMaker, ServiceCaller callable) { + CompletableFuture> future = new CompletableFuture<>(); + addListener(getRegionServers(), (regionServers, e1) -> { + if (e1 != null) { + future.completeExceptionally(e1); + return; + } + Map resultMap = new ConcurrentHashMap<>(); + for (ServerName rs : regionServers) { + addListener(coprocessorService(stubMaker, callable, rs), (r, e2) -> { + if (e2 != null) { + resultMap.put(rs, e2); + } else { + resultMap.put(rs, r); + } + if (resultMap.size() == regionServers.size()) { + future.complete(Collections.unmodifiableMap(resultMap)); + } + }); + } + }); + return future; + } /** * List all the dead region servers. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index ea67909a13aa..ce604d90b2ec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -785,12 +785,6 @@ public CompletableFuture coprocessorService(Function st return wrap(rawAdmin.coprocessorService(stubMaker, callable, serverName)); } - @Override - public CompletableFuture> coprocessorServiceOnAllRegionServers( - Function stubMaker, ServiceCaller callable) { - return wrap(rawAdmin.coprocessorServiceOnAllRegionServers(stubMaker, callable)); - } - @Override public CompletableFuture> listDeadServers() { return wrap(rawAdmin.listDeadServers()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index a154bcd65123..d413905c04af 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -95,7 +95,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ForeignExceptionUtil; -import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -3440,32 +3439,6 @@ public CompletableFuture coprocessorService(Function st return future; } - @Override - public CompletableFuture> coprocessorServiceOnAllRegionServers( - Function stubMaker, ServiceCaller callable) { - CompletableFuture> future = new CompletableFuture<>(); - FutureUtils.addListener(getRegionServers(), (regionServers, e1) -> { - if (e1 != null) { - future.completeExceptionally(e1); - return; - } - Map resultMap = new ConcurrentHashMap<>(); - for (ServerName rs : regionServers) { - FutureUtils.addListener(coprocessorService(stubMaker, callable, rs), (r, e2) -> { - if (e2 != null) { - resultMap.put(rs, e2); - } else { - resultMap.put(rs, r); - } - if (resultMap.size() == regionServers.size()) { - future.complete(Collections.unmodifiableMap(resultMap)); - } - }); - } - }); - return future; - } - @Override public CompletableFuture> clearDeadServers(List servers) { return this.> newMasterCaller() From 9735a8fafb699398129b317321a501fc3c5e8ec5 Mon Sep 17 00:00:00 2001 From: Jing Yu Date: Mon, 19 Jun 2023 16:40:23 -0700 Subject: [PATCH 4/5] introduce a helper class AsyncAdminClientUtils to put coprocessorServiceOnAllRegionServers method --- .../hadoop/hbase/client/AsyncAdmin.java | 46 ---------- .../hbase/client/AsyncAdminClientUtils.java | 85 +++++++++++++++++++ ...CoprocessorOnAllRegionServersEndpoint.java | 38 +++++---- 3 files changed, 107 insertions(+), 62 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminClientUtils.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 4a98a32bfbf9..913350b1d172 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -22,7 +22,6 @@ import com.google.protobuf.RpcChannel; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -30,7 +29,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -1473,50 +1471,6 @@ CompletableFuture coprocessorService(Function stubMaker CompletableFuture coprocessorService(Function stubMaker, ServiceCaller callable, ServerName serverName); - /** - * Execute the given coprocessor call on all region servers. - *

- * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a - * one line lambda expression, like: - * - *

-   * channel -> xxxService.newStub(channel)
-   * 
- * - * @param stubMaker a delegation to the actual {@code newStub} call. - * @param callable a delegation to the actual protobuf rpc call. See the comment of - * {@link ServiceCaller} for more details. - * @param the type of the asynchronous stub - * @param the type of the return value - * @return Map of each region server to its result of the protobuf rpc call, wrapped by a - * {@link CompletableFuture}. - * @see ServiceCaller - */ - default CompletableFuture> coprocessorServiceOnAllRegionServers( - Function stubMaker, ServiceCaller callable) { - CompletableFuture> future = new CompletableFuture<>(); - addListener(getRegionServers(), (regionServers, e1) -> { - if (e1 != null) { - future.completeExceptionally(e1); - return; - } - Map resultMap = new ConcurrentHashMap<>(); - for (ServerName rs : regionServers) { - addListener(coprocessorService(stubMaker, callable, rs), (r, e2) -> { - if (e2 != null) { - resultMap.put(rs, e2); - } else { - resultMap.put(rs, r); - } - if (resultMap.size() == regionServers.size()) { - future.complete(Collections.unmodifiableMap(resultMap)); - } - }); - } - }); - return future; - } - /** * List all the dead region servers. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminClientUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminClientUtils.java new file mode 100644 index 000000000000..82269aabdb9d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminClientUtils.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.protobuf.RpcChannel; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Additional Asynchronous Admin capabilities for clients. + */ +@InterfaceAudience.Public +public final class AsyncAdminClientUtils { + + private AsyncAdminClientUtils() { + } + + /** + * Execute the given coprocessor call on all region servers. + *

+ * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a + * one line lambda expression, like: + * + *

+   * channel -> xxxService.newStub(channel)
+   * 
+ * + * @param asyncAdmin the asynchronous administrative API for HBase. + * @param stubMaker a delegation to the actual {@code newStub} call. + * @param callable a delegation to the actual protobuf rpc call. See the comment of + * {@link ServiceCaller} for more details. + * @param the type of the asynchronous stub + * @param the type of the return value + * @return Map of each region server to its result of the protobuf rpc call, wrapped by a + * {@link CompletableFuture}. + * @see ServiceCaller + */ + public static CompletableFuture> + coprocessorServiceOnAllRegionServers(AsyncAdmin asyncAdmin, Function stubMaker, + ServiceCaller callable) { + CompletableFuture> future = new CompletableFuture<>(); + FutureUtils.addListener(asyncAdmin.getRegionServers(), (regionServers, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + Map resultMap = new ConcurrentHashMap<>(); + for (ServerName regionServer : regionServers) { + FutureUtils.addListener(asyncAdmin.coprocessorService(stubMaker, callable, regionServer), + (server, err) -> { + if (err != null) { + resultMap.put(regionServer, err); + } else { + resultMap.put(regionServer, server); + } + if (resultMap.size() == regionServers.size()) { + future.complete(Collections.unmodifiableMap(resultMap)); + } + }); + } + }); + return future; + } +} diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java index acfcf0bfa95c..018c67588029 100644 --- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java @@ -31,8 +31,10 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.AsyncAdminClientUtils; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.client.ServiceCaller; import org.apache.hadoop.hbase.client.TestAsyncAdminBase; import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest; import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse; @@ -82,10 +84,11 @@ public static void tearDownAfterClass() throws Exception { public void testRegionServersCoprocessorService() throws ExecutionException, InterruptedException { DummyRequest request = DummyRequest.getDefaultInstance(); - Map resultMap = - admin. coprocessorServiceOnAllRegionServers( - DummyService::newStub, (s, c, done) -> s.dummyCall(c, request, done)).get(); - + Map resultMap = AsyncAdminClientUtils.coprocessorServiceOnAllRegionServers(admin, + DummyService::newStub, (ServiceCaller) (stub, controller, + rpcCallback) -> stub.dummyCall(controller, request, rpcCallback)) + .get(); resultMap.forEach((k, v) -> { assertTrue(v instanceof DummyResponse); DummyResponse resp = (DummyResponse) v; @@ -97,9 +100,11 @@ admin. coprocessorServiceOnAllRegionServers( public void testRegionServerCoprocessorsServiceAllFail() throws ExecutionException, InterruptedException { DummyRequest request = DummyRequest.getDefaultInstance(); - Map resultMap = - admin. coprocessorServiceOnAllRegionServers( - DummyService::newStub, (s, c, done) -> s.dummyThrow(c, request, done)).get(); + Map resultMap = AsyncAdminClientUtils.coprocessorServiceOnAllRegionServers(admin, + DummyService::newStub, (ServiceCaller) (stub, controller, + rpcCallback) -> stub.dummyThrow(controller, request, rpcCallback)) + .get(); resultMap.forEach((k, v) -> { assertTrue(v instanceof RetriesExhaustedException); @@ -113,15 +118,16 @@ public void testRegionServerCoprocessorsServicePartialFail() throws ExecutionException, InterruptedException { DummyRequest request = DummyRequest.getDefaultInstance(); AtomicInteger callCount = new AtomicInteger(); - Map resultMap = admin. coprocessorServiceOnAllRegionServers(DummyService::newStub, (s, c, done) -> { - callCount.addAndGet(1); - if (callCount.get() <= NUM_SUCCESS_REGION_SERVERS) { - s.dummyCall(c, request, done); - } else { - s.dummyThrow(c, request, done); - } - }).get(); + Map resultMap = + AsyncAdminClientUtils.coprocessorServiceOnAllRegionServers(admin, DummyService::newStub, + (ServiceCaller) (stub, controller, rpcCallback) -> { + callCount.addAndGet(1); + if (callCount.get() <= NUM_SUCCESS_REGION_SERVERS) { + stub.dummyCall(controller, request, rpcCallback); + } else { + stub.dummyThrow(controller, request, rpcCallback); + } + }).get(); AtomicInteger successCallCount = new AtomicInteger(); resultMap.forEach((k, v) -> { From 8cb16a204f3175341768ee2068f64b801a2c11a7 Mon Sep 17 00:00:00 2001 From: Jing Yu Date: Tue, 20 Jun 2023 11:42:01 -0700 Subject: [PATCH 5/5] remove change in TestInterfaceAlign --- .../java/org/apache/hadoop/hbase/client/TestInterfaceAlign.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestInterfaceAlign.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestInterfaceAlign.java index 722ee6a3061a..ebdb7adc5887 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestInterfaceAlign.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestInterfaceAlign.java @@ -56,7 +56,6 @@ public void testAdminWithAsyncAdmin() { adminMethodNames.remove("getConfiguration"); adminMethodNames.removeAll(getMethodNames(Abortable.class)); adminMethodNames.removeAll(getMethodNames(Closeable.class)); - asyncAdminMethodNames.remove("coprocessorServiceOnAllRegionServers"); adminMethodNames.forEach(method -> { boolean contains = asyncAdminMethodNames.contains(method);