From 275a38e1533eafa1d4bd1d50c13bcecd9a397ea8 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Sun, 19 Jul 2020 21:58:12 -0700 Subject: [PATCH] HBASE-24765: Dynamic master discovery This patch adds the ability to discover newly added masters dynamically on the master registry side. The trigger for the re-fetch is either periodic (5 mins) or any registry RPC failure. Master server information is cached in masters to avoid repeated ZK lookups. Updates the client side connection metrics to maintain a counter per RPC type so that clients have visibility into counts grouped by RPC method name. I didn't add the method to ZK registry interface since there is a design discussion going on in splittable meta doc. We can add it later if needed. Signed-off-by: Nick Dimiduk Signed-off-by: Viraj Jasani Signed-off-by: Duo Zhang --- .../hbase/client/MasterAddressRefresher.java | 126 ++++++++ .../hadoop/hbase/client/MasterRegistry.java | 111 +++++-- .../hbase/client/MetricsConnection.java | 11 +- .../client/TestMasterRegistryHedgedReads.java | 12 +- .../hbase/client/TestMetricsConnection.java | 6 + .../main/protobuf/server/master/Master.proto | 22 +- .../hbase/master/ActiveMasterManager.java | 40 ++- .../apache/hadoop/hbase/master/HMaster.java | 55 +--- .../hbase/master/MasterRpcServices.java | 19 ++ .../client/TestMasterAddressRefresher.java | 113 +++++++ .../hbase/client/TestMasterRegistry.java | 52 ++++ .../hbase/master/AlwaysStandByHMaster.java | 8 +- .../hbase/master/TestActiveMasterManager.java | 278 ++++++++++-------- .../TestMasterAddressTracker.java | 40 ++- .../hbase/zookeeper/MasterAddressTracker.java | 58 ++++ 15 files changed, 741 insertions(+), 210 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterAddressRefresher.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java new file mode 100644 index 000000000000..3cbb9f74e3ec --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.Closeable; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService; + +/** + * Thread safe utility that keeps master end points used by {@link MasterRegistry} up to date. This + * uses the RPC {@link ClientMetaService#getMasters} to fetch the latest list of registered masters. + * By default the refresh happens periodically (configured via + * {@link #PERIODIC_REFRESH_INTERVAL_SECS}). The refresh can also be triggered on demand via + * {@link #refreshNow()}. To prevent a flood of on-demand refreshes we expect that any attempts two + * should be spaced at least {@link #MIN_SECS_BETWEEN_REFRESHES} seconds apart. + */ +@InterfaceAudience.Private +public class MasterAddressRefresher implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(MasterAddressRefresher.class); + public static final String PERIODIC_REFRESH_INTERVAL_SECS = + "hbase.client.master_registry.refresh_interval_secs"; + private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300; + public static final String MIN_SECS_BETWEEN_REFRESHES = + "hbase.client.master_registry.min_secs_between_refreshes"; + private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60; + + private final ExecutorService pool; + private final MasterRegistry registry; + private final long periodicRefreshMs; + private final long timeBetweenRefreshesMs; + private final Object refreshMasters = new Object(); + + @Override + public void close() { + pool.shutdownNow(); + } + + /** + * Thread that refreshes the master end points until it is interrupted via {@link #close()}. + * Multiple callers attempting to refresh at the same time synchronize on {@link #refreshMasters}. + */ + private class RefreshThread implements Runnable { + @Override + public void run() { + long lastRpcTs = 0; + while (!Thread.interrupted()) { + try { + // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't + // have duplicate refreshes because once the thread is past the wait(), notify()s are + // ignored until the thread is back to the waiting state. + synchronized (refreshMasters) { + refreshMasters.wait(periodicRefreshMs); + } + long currentTs = EnvironmentEdgeManager.currentTime(); + if (lastRpcTs != 0 && currentTs - lastRpcTs <= timeBetweenRefreshesMs) { + continue; + } + lastRpcTs = currentTs; + LOG.debug("Attempting to refresh master address end points."); + Set newMasters = new HashSet<>(registry.getMasters().get()); + registry.populateMasterStubs(newMasters); + LOG.debug("Finished refreshing master end points. {}", newMasters); + } catch (InterruptedException e) { + LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e); + break; + } catch (ExecutionException | IOException e) { + LOG.debug("Error populating latest list of masters.", e); + } + } + LOG.info("Master end point refresher loop exited."); + } + } + + MasterAddressRefresher(Configuration conf, MasterRegistry registry) { + pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + .setNameFormat("master-registry-refresh-end-points").setDaemon(true).build()); + periodicRefreshMs = TimeUnit.SECONDS.toMillis(conf.getLong(PERIODIC_REFRESH_INTERVAL_SECS, + PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT)); + timeBetweenRefreshesMs = TimeUnit.SECONDS.toMillis(conf.getLong(MIN_SECS_BETWEEN_REFRESHES, + MIN_SECS_BETWEEN_REFRESHES_DEFAULT)); + Preconditions.checkArgument(periodicRefreshMs > 0); + Preconditions.checkArgument(timeBetweenRefreshesMs < periodicRefreshMs); + this.registry = registry; + pool.submit(new RefreshThread()); + } + + /** + * Notifies the refresher thread to refresh the configuration. This does not guarantee a refresh. + * See class comment for details. + */ + void refreshNow() { + synchronized (refreshMasters) { + refreshMasters.notify(); + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java index 4d0a591a5b41..2a7ae16df47a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java @@ -33,11 +33,13 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcClient; @@ -57,10 +59,11 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse; @@ -89,11 +92,14 @@ public class MasterRegistry implements ConnectionRegistry { private final int hedgedReadFanOut; // Configured list of masters to probe the meta information from. - private final ImmutableMap masterAddr2Stub; + private volatile ImmutableMap masterAddr2Stub; // RPC client used to talk to the masters. private final RpcClient rpcClient; private final RpcControllerFactory rpcControllerFactory; + private final int rpcTimeoutMs; + + protected final MasterAddressRefresher masterAddressRefresher; /** * Parses the list of master addresses from the provided configuration. Supported format is comma @@ -115,20 +121,27 @@ private static Set parseMasterAddrs(Configuration conf) throws Unkno MasterRegistry(Configuration conf) throws IOException { this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT)); - int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, + rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch // this through the master registry... // This is a problem as we will use the cluster id to determine the authentication method rpcClient = RpcClientFactory.createClient(conf, null); rpcControllerFactory = RpcControllerFactory.instantiate(conf); - Set masterAddrs = parseMasterAddrs(conf); + // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters + // by fetching the end points from this list. + populateMasterStubs(parseMasterAddrs(conf)); + masterAddressRefresher = new MasterAddressRefresher(conf, this); + } + + void populateMasterStubs(Set masters) throws IOException { + Preconditions.checkNotNull(masters); ImmutableMap.Builder builder = - ImmutableMap.builderWithExpectedSize(masterAddrs.size()); + ImmutableMap.builderWithExpectedSize(masters.size()); User user = User.getCurrent(); - for (ServerName masterAddr : masterAddrs) { + for (ServerName masterAddr : masters) { builder.put(masterAddr, - ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs))); + ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs))); } masterAddr2Stub = builder.build(); } @@ -169,7 +182,13 @@ private CompletableFuture call(ClientMetaService.Interfac CompletableFuture future = new CompletableFuture<>(); callable.call(controller, stub, resp -> { if (controller.failed()) { - future.completeExceptionally(controller.getFailed()); + IOException failureReason = controller.getFailed(); + future.completeExceptionally(failureReason); + if (ClientExceptionsUtil.isConnectionException(failureReason)) { + // RPC has failed, trigger a refresh of master end points. We can have some spurious + // refreshes, but that is okay since the RPC is not expensive and not in a hot path. + masterAddressRefresher.refreshNow(); + } } else { future.complete(resp); } @@ -188,8 +207,9 @@ private IOException badResponse(String debug) { * been tried and all of them are failed, we will fail the future. */ private void groupCall(CompletableFuture future, - List masterStubs, int startIndexInclusive, Callable callable, - Predicate isValidResp, String debug, ConcurrentLinkedQueue errors) { + Set masterServers, List masterStubs, + int startIndexInclusive, Callable callable, Predicate isValidResp, String debug, + ConcurrentLinkedQueue errors) { int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size()); AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive); for (int i = startIndexInclusive; i < endIndexExclusive; i++) { @@ -210,10 +230,10 @@ private void groupCall(CompletableFuture future, RetriesExhaustedException ex = new RetriesExhaustedException("masters", masterStubs.size(), new ArrayList<>(errors)); future.completeExceptionally( - new MasterRegistryFetchException(masterAddr2Stub.keySet(), ex)); + new MasterRegistryFetchException(masterServers, ex)); } else { - groupCall(future, masterStubs, endIndexExclusive, callable, isValidResp, debug, - errors); + groupCall(future, masterServers, masterStubs, endIndexExclusive, callable, + isValidResp, debug, errors); } } } else { @@ -226,17 +246,20 @@ private void groupCall(CompletableFuture future, private CompletableFuture call(Callable callable, Predicate isValidResp, String debug) { - List masterStubs = new ArrayList<>(masterAddr2Stub.values()); + ImmutableMap masterAddr2StubRef = masterAddr2Stub; + Set masterServers = masterAddr2StubRef.keySet(); + List masterStubs = new ArrayList<>(masterAddr2StubRef.values()); Collections.shuffle(masterStubs, ThreadLocalRandom.current()); CompletableFuture future = new CompletableFuture<>(); - groupCall(future, masterStubs, 0, callable, isValidResp, debug, new ConcurrentLinkedQueue<>()); + groupCall(future, masterServers, masterStubs, 0, callable, isValidResp, debug, + new ConcurrentLinkedQueue<>()); return future; } /** * Simple helper to transform the result of getMetaRegionLocations() rpc. */ - private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) { + private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) { List regionLocations = new ArrayList<>(); resp.getMetaLocationsList() .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location))); @@ -247,7 +270,7 @@ private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsRespo public CompletableFuture getMetaRegionLocations() { return this. call((c, s, d) -> s.getMetaRegionLocations(c, GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0, - "getMetaLocationsCount").thenApply(this::transformMetaRegionLocations); + "getMetaLocationsCount").thenApply(MasterRegistry::transformMetaRegionLocations); } @Override @@ -259,17 +282,54 @@ public CompletableFuture getClusterId() { .thenApply(GetClusterIdResponse::getClusterId); } - private ServerName transformServerName(GetActiveMasterResponse resp) { - return ProtobufUtil.toServerName(resp.getServerName()); + private static boolean hasActiveMaster(GetMastersResponse resp) { + List activeMasters = + resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect( + Collectors.toList()); + return activeMasters.size() == 1; + } + + private static ServerName filterActiveMaster(GetMastersResponse resp) throws IOException { + List activeMasters = + resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect( + Collectors.toList()); + if (activeMasters.size() != 1) { + throw new IOException(String.format("Incorrect number of active masters encountered." + + " Expected: 1 found: %d. Content: %s", activeMasters.size(), activeMasters)); + } + return ProtobufUtil.toServerName(activeMasters.get(0).getServerName()); } @Override public CompletableFuture getActiveMaster() { + CompletableFuture future = new CompletableFuture<>(); + addListener(call((c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d), + MasterRegistry::hasActiveMaster, "getMasters()"), (resp, ex) -> { + if (ex != null) { + future.completeExceptionally(ex); + } + ServerName result = null; + try { + result = filterActiveMaster((GetMastersResponse)resp); + } catch (IOException e) { + future.completeExceptionally(e); + } + future.complete(result); + }); + return future; + } + + private static List transformServerNames(GetMastersResponse resp) { + return resp.getMasterServersList().stream().map(s -> ProtobufUtil.toServerName( + s.getServerName())).collect(Collectors.toList()); + } + + CompletableFuture> getMasters() { + System.out.println("getMasters()"); return this - . call( - (c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d), - GetActiveMasterResponse::hasServerName, "getActiveMaster()") - .thenApply(this::transformServerName); + . call((c, s, d) -> s.getMasters( + c, GetMastersRequest.getDefaultInstance(), d), r -> r.getMasterServersCount() != 0, + "getMasters()").thenApply(MasterRegistry::transformServerNames); } @VisibleForTesting @@ -279,6 +339,9 @@ Set getParsedMasterServers() { @Override public void close() { + if (masterAddressRefresher != null) { + masterAddressRefresher.close(); + } if (rpcClient != null) { rpcClient.close(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java index 5d4634492b1f..e9f4c61f5a20 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -58,6 +58,7 @@ public class MetricsConnection implements StatisticTrackable { /** Set this key to {@code true} to enable metrics collection of client requests. */ public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable"; + private static final String CNT_BASE = "rpcCount_"; private static final String DRTN_BASE = "rpcCallDurationMs_"; private static final String REQ_BASE = "rpcCallRequestSizeBytes_"; private static final String RESP_BASE = "rpcCallResponseSizeBytes_"; @@ -303,6 +304,8 @@ private static interface NewMetric { LOAD_FACTOR, CONCURRENCY_LEVEL); private final ConcurrentMap cacheDroppingExceptions = new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL); + @VisibleForTesting protected final ConcurrentMap rpcCounters = + new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL); MetricsConnection(String scope, Supplier batchPool, Supplier metaPool) { @@ -434,8 +437,7 @@ private T getMetric(String key, ConcurrentMap map, NewMetric f } /** Update call stats for non-critical-path methods */ - private void updateRpcGeneric(MethodDescriptor method, CallStats stats) { - final String methodName = method.getService().getName() + "_" + method.getName(); + private void updateRpcGeneric(String methodName, CallStats stats) { getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory) .update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS); getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory) @@ -450,6 +452,9 @@ public void updateRpc(MethodDescriptor method, Message param, CallStats stats) { if (callsPerServer > 0) { concurrentCallsPerServerHist.update(callsPerServer); } + // Update the counter that tracks RPCs by type. + final String methodName = method.getService().getName() + "_" + method.getName(); + getMetric(CNT_BASE + methodName, rpcCounters, counterFactory).inc(); // this implementation is tied directly to protobuf implementation details. would be better // if we could dispatch based on something static, ie, request Message type. if (method.getService() == ClientService.getDescriptor()) { @@ -511,7 +516,7 @@ public void updateRpc(MethodDescriptor method, Message param, CallStats stats) { } } // Fallback to dynamic registry lookup for DDL methods. - updateRpcGeneric(method, stats); + updateRpcGeneric(methodName, stats); } public void incrCacheDroppingExceptions(Object exception) { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java index 8bbdce64887f..0af01984218d 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.HBaseCommonTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.security.User; @@ -116,11 +115,20 @@ public boolean hasCellBlockSupport() { } } + /** + * A dummy RpcChannel implementation that intercepts the GetClusterId() RPC calls and injects + * errors. All other RPCs are ignored. + */ public static final class RpcChannelImpl implements RpcChannel { @Override public void callMethod(MethodDescriptor method, RpcController controller, Message request, Message responsePrototype, RpcCallback done) { + if (!method.getName().equals("GetClusterId")) { + // On RPC failures, MasterRegistry internally runs getMasters() RPC to keep the master list + // fresh. We do not want to intercept those RPCs here and double count. + return; + } // simulate the asynchronous behavior otherwise all logic will perform in the same thread... EXECUTOR.execute(() -> { int index = CALLED.getAndIncrement(); @@ -129,7 +137,7 @@ public void callMethod(MethodDescriptor method, RpcController controller, Messag } else if (GOOD_RESP_INDEXS.contains(index)) { done.run(RESP); } else { - ((HBaseRpcController) controller).setFailed("inject error"); + controller.setFailed("inject error"); done.run(null); } }); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java index 3f4afad8d435..d48806def23d 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import com.codahale.metrics.RatioGauge; import com.codahale.metrics.RatioGauge.Ratio; @@ -117,6 +118,11 @@ public void testStaticMetrics() throws IOException { .build(), MetricsConnection.newCallStats()); } + for (String method: new String[]{"Get", "Scan", "Mutate"}) { + final String metricKey = "rpcCount_" + ClientService.getDescriptor().getName() + "_" + method; + final long metricVal = METRICS.rpcCounters.get(metricKey).getCount(); + assertTrue("metric: " + metricKey + " val: " + metricVal, metricVal >= loop); + } for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] { METRICS.getTracker, METRICS.scanTracker, METRICS.multiTracker, METRICS.appendTracker, METRICS.deleteTracker, METRICS.incrementTracker, METRICS.putTracker diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto index b70ddef034a3..58ab01d980e6 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto @@ -1264,6 +1264,17 @@ message GetActiveMasterResponse { optional ServerName server_name = 1; } +/** Request and response to get the current list of all registers master servers */ +message GetMastersRequest { +} +message GetMastersResponseEntry { + required ServerName server_name = 1; + required bool is_active = 2; +} +message GetMastersResponse { + repeated GetMastersResponseEntry master_servers = 1; +} + /** Request and response to get the current list of meta region locations */ message GetMetaRegionLocationsRequest { } @@ -1273,7 +1284,8 @@ message GetMetaRegionLocationsResponse { } /** - * Implements all the RPCs needed by clients to look up cluster meta information needed for connection establishment. + * Implements all the RPCs needed by clients to look up cluster meta information needed for + * connection establishment. */ service ClientMetaService { /** @@ -1282,10 +1294,16 @@ service ClientMetaService { rpc GetClusterId(GetClusterIdRequest) returns(GetClusterIdResponse); /** - * Get active master server name for this cluster. + * Get active master server name for this cluster. Retained for out of sync client and master + * rolling upgrades. Newer clients switched to GetMasters RPC request. */ rpc GetActiveMaster(GetActiveMasterRequest) returns(GetActiveMasterResponse); + /** + * Get registered list of master servers in this cluster. + */ + rpc GetMasters(GetMastersRequest) returns(GetMastersResponse); + /** * Get current meta replicas' region locations. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java index 606741b0ef29..34e7f75c487a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.master; import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hbase.Server; @@ -34,12 +36,14 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; /** - * Handles everything on master-side related to master election. + * Handles everything on master-side related to master election. Keeps track of + * currently active master and registered backup masters. * - *

Listens and responds to ZooKeeper notifications on the master znode, + *

Listens and responds to ZooKeeper notifications on the master znodes, * both nodeCreated and nodeDeleted. * *

Contains blocking methods which will hold up backup masters, waiting @@ -65,17 +69,22 @@ public class ActiveMasterManager extends ZKListener { // notifications) and lazily fetched on-demand. // ServerName is immutable, so we don't need heavy synchronization around it. volatile ServerName activeMasterServerName; + // Registered backup masters. List is kept up to date based on ZK change notifications to + // backup znode. + private volatile ImmutableList backupMasters; /** * @param watcher ZK watcher * @param sn ServerName * @param master In an instance of a Master. */ - ActiveMasterManager(ZKWatcher watcher, ServerName sn, Server master) { + ActiveMasterManager(ZKWatcher watcher, ServerName sn, Server master) + throws InterruptedIOException { super(watcher); watcher.registerListener(this); this.sn = sn; this.master = master; + updateBackupMasters(); } // will be set after jetty server is started @@ -89,8 +98,18 @@ public void nodeCreated(String path) { } @Override - public void nodeDeleted(String path) { + public void nodeChildrenChanged(String path) { + if (path.equals(watcher.getZNodePaths().backupMasterAddressesZNode)) { + try { + updateBackupMasters(); + } catch (InterruptedIOException ioe) { + LOG.error("Error updating backup masters", ioe); + } + } + } + @Override + public void nodeDeleted(String path) { // We need to keep track of the cluster's shutdown status while // we wait on the current master. We consider that, if the cluster // was already in a "shutdown" state when we started, that this master @@ -101,7 +120,6 @@ public void nodeDeleted(String path) { if(path.equals(watcher.getZNodePaths().clusterStateZNode) && !master.isStopped()) { clusterShutDown.set(true); } - handle(path); } @@ -111,6 +129,11 @@ void handle(final String path) { } } + private void updateBackupMasters() throws InterruptedIOException { + backupMasters = + ImmutableList.copyOf(MasterAddressTracker.getBackupMastersAndRenewWatch(watcher)); + } + /** * Fetches the active master's ServerName from zookeeper. */ @@ -318,4 +341,11 @@ public void stop() { e.getMessage())); } } + + /** + * @return list of registered backup masters. + */ + public List getBackupMasters() { + return backupMasters; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index afdec60e0560..a70dc482ca4e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -91,7 +91,6 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.favored.FavoredNodesManager; import org.apache.hadoop.hbase.http.InfoServer; @@ -223,7 +222,6 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Maps; @@ -235,8 +233,6 @@ import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector; import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder; import org.apache.hbase.thirdparty.org.eclipse.jetty.webapp.WebAppContext; - -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; @@ -610,8 +606,8 @@ public HMaster(final Configuration conf) throws IOException { * Protected to have custom implementations in tests override the default ActiveMaster * implementation. */ - protected ActiveMasterManager createActiveMasterManager( - ZKWatcher zk, ServerName sn, org.apache.hadoop.hbase.Server server) { + protected ActiveMasterManager createActiveMasterManager(ZKWatcher zk, ServerName sn, + org.apache.hadoop.hbase.Server server) throws InterruptedIOException { return new ActiveMasterManager(zk, sn, server); } @@ -2731,51 +2727,8 @@ public ClusterMetrics getClusterMetrics(EnumSet