From 14be37c38442698288c22f5e4051faf9aa3a1e44 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Thu, 3 Feb 2022 18:14:06 -0800 Subject: [PATCH 1/4] HBASE-26730 Extend hbase shell 'status' command to support an option 'tasks' Expose monitored tasks state in ClusterStatus API via new option in ServerLoad. Add shell support for interrogating monitored tasks state in ServerLoad. --- .../apache/hadoop/hbase/ClusterMetrics.java | 10 ++ .../hadoop/hbase/ClusterMetricsBuilder.java | 29 +++- .../apache/hadoop/hbase/ServerMetrics.java | 7 + .../hadoop/hbase/ServerMetricsBuilder.java | 35 ++++- .../org/apache/hadoop/hbase/ServerTask.java | 59 ++++++++ .../hadoop/hbase/ServerTaskBuilder.java | 126 ++++++++++++++++++ .../hbase/shaded/protobuf/ProtobufUtil.java | 22 +++ .../main/protobuf/server/ClusterStatus.proto | 22 +++ .../apache/hadoop/hbase/master/HMaster.java | 63 +++++++-- .../master/balancer/ClusterStatusChore.java | 2 +- .../hbase/regionserver/HRegionServer.java | 11 ++ .../hbase/TestClientClusterMetrics.java | 60 ++++++++- .../master/TestRegionsRecoveryChore.java | 11 ++ hbase-shell/src/main/ruby/hbase/admin.rb | 49 ++++++- .../src/main/ruby/shell/commands/status.rb | 5 +- 15 files changed, 481 insertions(+), 30 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/ServerTask.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/ServerTaskBuilder.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterMetrics.java index 497ab938856b..29679e6fb6f4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterMetrics.java @@ -161,6 +161,12 @@ default double getAverageLoad() { */ Map getTableRegionStatesCount(); + /** + * Provide the list of master tasks + */ + @Nullable + List getMasterTasks(); + /** * Kinds of ClusterMetrics */ @@ -213,5 +219,9 @@ enum Option { * metrics about table to no of regions status count */ TABLE_TO_REGIONS_COUNT, + /** + * metrics about monitored tasks + */ + TASKS, } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterMetricsBuilder.java index 493fe71b8b0f..011f93f9fe90 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterMetricsBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ClusterMetricsBuilder.java @@ -83,6 +83,10 @@ public static ClusterStatusProtos.ClusterStatus toClusterStatus(ClusterMetrics m if (metrics.getMasterName() != null) { builder.setMaster(ProtobufUtil.toServerName((metrics.getMasterName()))); } + if (metrics.getMasterTasks() != null) { + builder.addAllMasterTasks(metrics.getMasterTasks().stream() + .map(t -> ProtobufUtil.toServerTask(t)).collect(Collectors.toList())); + } if (metrics.getBalancerOn() != null) { builder.setBalancerOn(metrics.getBalancerOn()); } @@ -122,7 +126,9 @@ public static ClusterMetrics toClusterMetrics( proto.getTableRegionStatesCountList().stream() .collect(Collectors.toMap( e -> ProtobufUtil.toTableName(e.getTableName()), - e -> ProtobufUtil.toTableRegionStatesCount(e.getRegionStatesCount())))); + e -> ProtobufUtil.toTableRegionStatesCount(e.getRegionStatesCount())))) + .setMasterTasks(proto.getMasterTasksList().stream() + .map(t -> ProtobufUtil.getServerTask(t)).collect(Collectors.toList())); if (proto.hasClusterId()) { builder.setClusterId(ClusterId.convert(proto.getClusterId()).toString()); } @@ -164,6 +170,7 @@ public static ClusterMetrics.Option toOption(ClusterStatusProtos.Option option) case SERVERS_NAME: return ClusterMetrics.Option.SERVERS_NAME; case MASTER_INFO_PORT: return ClusterMetrics.Option.MASTER_INFO_PORT; case TABLE_TO_REGIONS_COUNT: return ClusterMetrics.Option.TABLE_TO_REGIONS_COUNT; + case TASKS: return ClusterMetrics.Option.TASKS; // should not reach here default: throw new IllegalArgumentException("Invalid option: " + option); } @@ -188,6 +195,7 @@ public static ClusterStatusProtos.Option toOption(ClusterMetrics.Option option) case SERVERS_NAME: return Option.SERVERS_NAME; case MASTER_INFO_PORT: return ClusterStatusProtos.Option.MASTER_INFO_PORT; case TABLE_TO_REGIONS_COUNT: return ClusterStatusProtos.Option.TABLE_TO_REGIONS_COUNT; + case TASKS: return ClusterStatusProtos.Option.TASKS; // should not reach here default: throw new IllegalArgumentException("Invalid option: " + option); } @@ -231,6 +239,8 @@ public static ClusterMetricsBuilder newBuilder() { private int masterInfoPort; private List serversName = Collections.emptyList(); private Map tableRegionStatesCount = Collections.emptyMap(); + @Nullable + private List masterTasks; private ClusterMetricsBuilder() { } @@ -280,6 +290,10 @@ public ClusterMetricsBuilder setServerNames(List serversName) { this.serversName = serversName; return this; } + public ClusterMetricsBuilder setMasterTasks(List masterTasks) { + this.masterTasks = masterTasks; + return this; + } public ClusterMetricsBuilder setTableRegionStatesCount( Map tableRegionStatesCount) { @@ -300,7 +314,8 @@ public ClusterMetrics build() { balancerOn, masterInfoPort, serversName, - tableRegionStatesCount + tableRegionStatesCount, + masterTasks ); } private static class ClusterMetricsImpl implements ClusterMetrics { @@ -320,6 +335,7 @@ private static class ClusterMetricsImpl implements ClusterMetrics { private final int masterInfoPort; private final List serversName; private final Map tableRegionStatesCount; + private final List masterTasks; ClusterMetricsImpl(String hbaseVersion, List deadServerNames, Map liveServerMetrics, @@ -331,7 +347,8 @@ private static class ClusterMetricsImpl implements ClusterMetrics { Boolean balancerOn, int masterInfoPort, List serversName, - Map tableRegionStatesCount) { + Map tableRegionStatesCount, + List masterTasks) { this.hbaseVersion = hbaseVersion; this.deadServerNames = Preconditions.checkNotNull(deadServerNames); this.liveServerMetrics = Preconditions.checkNotNull(liveServerMetrics); @@ -344,6 +361,7 @@ private static class ClusterMetricsImpl implements ClusterMetrics { this.masterInfoPort = masterInfoPort; this.serversName = serversName; this.tableRegionStatesCount = Preconditions.checkNotNull(tableRegionStatesCount); + this.masterTasks = masterTasks; } @Override @@ -406,6 +424,11 @@ public Map getTableRegionStatesCount() { return Collections.unmodifiableMap(tableRegionStatesCount); } + @Override + public List getMasterTasks() { + return masterTasks; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(1024); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java index 9b2dc409acdb..ddd4b2ec03f5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java @@ -124,4 +124,11 @@ default String getVersion() { */ long getLastReportTimestamp(); + /** + * Called directly from clients such as the hbase shell + * @return the active monitored tasks + */ + @Nullable + List getTasks(); + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java index 05d118ff459b..dd2e836487f8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java @@ -87,6 +87,8 @@ public static ServerMetrics toServerMetrics(ServerName serverName, int versionNu .setReplicationLoadSink(serverLoadPB.hasReplLoadSink() ? ProtobufUtil.toReplicationLoadSink(serverLoadPB.getReplLoadSink()) : null) + .setTasks(serverLoadPB.getTasksList().stream() + .map(ProtobufUtil::getServerTask).collect(Collectors.toList())) .setReportTimestamp(serverLoadPB.getReportEndTime()) .setLastReportTimestamp(serverLoadPB.getReportStartTime()).setVersionNumber(versionNumber) .setVersion(version).build(); @@ -105,19 +107,24 @@ public static ClusterStatusProtos.ServerLoad toServerLoad(ServerMetrics metrics) .setInfoServerPort(metrics.getInfoServerPort()) .setMaxHeapMB((int) metrics.getMaxHeapSize().get(Size.Unit.MEGABYTE)) .setUsedHeapMB((int) metrics.getUsedHeapSize().get(Size.Unit.MEGABYTE)) - .addAllCoprocessors(toCoprocessor(metrics.getCoprocessorNames())).addAllRegionLoads( + .addAllCoprocessors(toCoprocessor(metrics.getCoprocessorNames())) + .addAllRegionLoads( metrics.getRegionMetrics().values().stream().map(RegionMetricsBuilder::toRegionLoad) - .collect(Collectors.toList())).addAllUserLoads( + .collect(Collectors.toList())) + .addAllUserLoads( metrics.getUserMetrics().values().stream().map(UserMetricsBuilder::toUserMetrics) - .collect(Collectors.toList())).addAllReplLoadSource( + .collect(Collectors.toList())) + .addAllReplLoadSource( metrics.getReplicationLoadSourceList().stream() .map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList())) + .addAllTasks( + metrics.getTasks().stream().map(ProtobufUtil::toServerTask) + .collect(Collectors.toList())) .setReportStartTime(metrics.getLastReportTimestamp()) .setReportEndTime(metrics.getReportTimestamp()); if (metrics.getReplicationLoadSink() != null) { builder.setReplLoadSink(ProtobufUtil.toReplicationLoadSink(metrics.getReplicationLoadSink())); } - return builder.build(); } @@ -143,6 +150,8 @@ public static ServerMetricsBuilder newBuilder(ServerName sn) { private final Set coprocessorNames = new TreeSet<>(); private long reportTimestamp = EnvironmentEdgeManager.currentTime(); private long lastReportTimestamp = 0; + private final List tasks = new ArrayList<>(); + private ServerMetricsBuilder(ServerName serverName) { this.serverName = serverName; } @@ -228,6 +237,11 @@ public ServerMetricsBuilder setLastReportTimestamp(long value) { return this; } + public ServerMetricsBuilder setTasks(List tasks) { + this.tasks.addAll(tasks); + return this; + } + public ServerMetrics build() { return new ServerMetricsImpl( serverName, @@ -246,7 +260,8 @@ public ServerMetrics build() { coprocessorNames, reportTimestamp, lastReportTimestamp, - userMetrics); + userMetrics, + tasks); } private static class ServerMetricsImpl implements ServerMetrics { @@ -268,13 +283,15 @@ private static class ServerMetricsImpl implements ServerMetrics { private final long reportTimestamp; private final long lastReportTimestamp; private final Map userMetrics; + private final List tasks; ServerMetricsImpl(ServerName serverName, int versionNumber, String version, long requestCountPerSecond, long requestCount, long readRequestsCount, long writeRequestsCount, Size usedHeapSize, Size maxHeapSize, int infoServerPort, List sources, ReplicationLoadSink sink, Map regionStatus, Set coprocessorNames, - long reportTimestamp, long lastReportTimestamp, Map userMetrics) { + long reportTimestamp, long lastReportTimestamp, Map userMetrics, + List tasks) { this.serverName = Preconditions.checkNotNull(serverName); this.versionNumber = versionNumber; this.version = version; @@ -292,6 +309,7 @@ private static class ServerMetricsImpl implements ServerMetrics { this.coprocessorNames =Preconditions.checkNotNull(coprocessorNames); this.reportTimestamp = reportTimestamp; this.lastReportTimestamp = lastReportTimestamp; + this.tasks = tasks; } @Override @@ -388,6 +406,11 @@ public long getLastReportTimestamp() { return lastReportTimestamp; } + @Override + public List getTasks() { + return tasks; + } + @Override public String toString() { int storeCount = 0; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerTask.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerTask.java new file mode 100644 index 000000000000..eb322543e052 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerTask.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.yetus.audience.InterfaceAudience; + +/** Information about active monitored server tasks */ +@InterfaceAudience.Public +public interface ServerTask { + + /** Task state */ + enum State { + RUNNING, + WAITING, + COMPLETE, + ABORTED; + } + + /** + * @return the task's description, typically a name + */ + String getDescription(); + + /** + * @return the task's current status + */ + String getStatus(); + + /** + * @return the task's current state + */ + State getState(); + + /** + * @return the time when the task started, or 0 if it has not started yet + */ + long getStartTime(); + + /** + * @return the time when the task completed, or 0 if it has not completed yet + */ + long getCompletionTime(); + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerTaskBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerTaskBuilder.java new file mode 100644 index 000000000000..ed025316a11d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerTaskBuilder.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.yetus.audience.InterfaceAudience; + +/** Builder for information about active monitored server tasks */ +@InterfaceAudience.Private +public final class ServerTaskBuilder { + + public static ServerTaskBuilder newBuilder() { + return new ServerTaskBuilder(); + } + + private String description = ""; + private String status = ""; + private ServerTask.State state = ServerTask.State.RUNNING; + private long startTime; + private long completionTime; + + private ServerTaskBuilder() { } + + private static class ServerTaskImpl implements ServerTask { + + private final String description; + private final String status; + private final ServerTask.State state; + private final long startTime; + private final long completionTime; + + private ServerTaskImpl(final String description, final String status, + final ServerTask.State state, final long startTime, final long completionTime) { + this.description = description; + this.status = status; + this.state = state; + this.startTime = startTime; + this.completionTime = completionTime; + } + + @Override + public String getDescription() { + return description; + } + + @Override + public String getStatus() { + return status; + } + + @Override + public State getState() { + return state; + } + + @Override + public long getStartTime() { + return startTime; + } + + @Override + public long getCompletionTime() { + return completionTime; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(512); + sb.append(getDescription()); + sb.append(": status="); + sb.append(getStatus()); + sb.append(", state="); + sb.append(getState()); + sb.append(", startTime="); + sb.append(getStartTime()); + sb.append(", completionTime="); + sb.append(getCompletionTime()); + return sb.toString(); + } + + } + + public ServerTaskBuilder setDescription(final String description) { + this.description = description; + return this; + } + + public ServerTaskBuilder setStatus(final String status) { + this.status = status; + return this; + } + + public ServerTaskBuilder setState(final ServerTask.State state) { + this.state = state; + return this; + } + + public ServerTaskBuilder setStartTime(final long startTime) { + this.startTime = startTime; + return this; + } + + public ServerTaskBuilder setCompletionTime(final long completionTime) { + this.completionTime = completionTime; + return this; + } + + public ServerTask build() { + return new ServerTaskImpl(description, status, state, startTime, completionTime); + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 9d853d0e4bac..4ba0631c4e9d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -67,6 +67,8 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ServerTask; +import org.apache.hadoop.hbase.ServerTaskBuilder; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.BalanceResponse; @@ -3905,4 +3907,24 @@ public static BalanceResponse toBalanceResponse(MasterProtos.BalanceResponse res .build(); } + public static ServerTask getServerTask(ClusterStatusProtos.ServerTask task) { + return ServerTaskBuilder.newBuilder() + .setDescription(task.getDescription()) + .setStatus(task.getStatus()) + .setState(ServerTask.State.valueOf(task.getState().name())) + .setStartTime(task.getStartTime()) + .setCompletionTime(task.getCompletionTime()) + .build(); + } + + public static ClusterStatusProtos.ServerTask toServerTask(ServerTask task) { + return ClusterStatusProtos.ServerTask.newBuilder() + .setDescription(task.getDescription()) + .setStatus(task.getStatus()) + .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name())) + .setStartTime(task.getStartTime()) + .setCompletionTime(task.getCompletionTime()) + .build(); + } + } diff --git a/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto b/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto index fdfa2bcdce63..537b96b620dd 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto @@ -229,6 +229,21 @@ message ReplicationLoadSource { optional uint64 oPsShipped = 12; } +message ServerTask { + required string description = 1; + required string status = 2; + required State state = 3; + optional uint64 startTime = 4; + optional uint64 completionTime = 5; + + enum State { + RUNNING = 0; + WAITING = 1; + COMPLETE = 2; + ABORTED = 3; + } +} + message ServerLoad { /** Number of requests since last report. */ optional uint64 number_of_requests = 1; @@ -295,6 +310,11 @@ message ServerLoad { * The metrics for write requests on this region server */ optional uint64 write_requests_count = 14; + + /** + * The active monitored tasks + */ + repeated ServerTask tasks = 15; } message LiveServerInfo { @@ -328,6 +348,7 @@ message ClusterStatus { optional int32 master_info_port = 10 [default = -1]; repeated ServerName servers_name = 11; repeated TableRegionStatesCount table_region_states_count = 12; + repeated ServerTask master_tasks = 13; } enum Option { @@ -343,4 +364,5 @@ enum Option { MASTER_INFO_PORT = 9; SERVERS_NAME = 10; TABLE_TO_REGIONS_COUNT = 11; + TASKS = 12; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 8dd49ff4d04f..6174c3771d04 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -80,6 +80,8 @@ import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; import org.apache.hadoop.hbase.ServerMetrics; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ServerTask; +import org.apache.hadoop.hbase.ServerTaskBuilder; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; @@ -1006,7 +1008,7 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc // initialize load balancer this.balancer.setMasterServices(this); this.balancer.initialize(); - this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor()); + this.balancer.updateClusterMetrics(getClusterMetricsInternal()); // start up all service threads. status.setStatus("Initializing master service threads"); @@ -1094,7 +1096,7 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc } // set cluster status again after user regions are assigned - this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor()); + this.balancer.updateClusterMetrics(getClusterMetricsInternal()); // Start balancer and meta catalog janitor after meta and regions have been assigned. status.setStatus("Starting balancer and catalog janitor"); @@ -1916,7 +1918,7 @@ public BalanceResponse balance(BalanceRequest request) throws IOException { } //Give the balancer the current cluster state. - this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor()); + this.balancer.updateClusterMetrics(getClusterMetricsInternal()); List plans = this.balancer.balanceCluster(assignments); @@ -2724,11 +2726,11 @@ public void checkTableModifiable(final TableName tableName) } } - public ClusterMetrics getClusterMetricsWithoutCoprocessor() throws InterruptedIOException { - return getClusterMetricsWithoutCoprocessor(EnumSet.allOf(Option.class)); + public ClusterMetrics getClusterMetricsInternal() throws InterruptedIOException { + return getClusterMetricsInternal(EnumSet.allOf(Option.class)); } - public ClusterMetrics getClusterMetricsWithoutCoprocessor(EnumSet