From c56cdb7fccd6d8e3ba4c2da2b13717703ed2e485 Mon Sep 17 00:00:00 2001 From: huiruan Date: Sun, 20 Mar 2022 13:37:23 +0800 Subject: [PATCH 1/5] HBASE-26867 Introduce a FlushProcedure --- .../hbase/client/RawAsyncHBaseAdmin.java | 87 ++++++--- .../shaded/protobuf/RequestConverter.java | 11 ++ .../main/protobuf/server/master/Master.proto | 14 ++ .../server/master/MasterProcedure.proto | 20 ++ .../hadoop/hbase/executor/EventType.java | 9 +- .../hadoop/hbase/executor/ExecutorType.java | 3 +- .../apache/hadoop/hbase/master/HMaster.java | 30 +++ .../hbase/master/MasterRpcServices.java | 15 ++ .../hadoop/hbase/master/MasterServices.java | 13 ++ .../AbstractRegionRemoteProcedure.java | 184 ++++++++++++++++++ .../procedure/FlushRegionProcedure.java | 110 +++++++++++ .../master/procedure/FlushTableProcedure.java | 171 ++++++++++++++++ .../procedure/TableProcedureInterface.java | 2 +- .../hbase/master/procedure/TableQueue.java | 1 + .../MasterFlushTableProcedureManager.java | 4 + .../regionserver/FlushRegionCallable.java | 67 +++++++ .../hbase/regionserver/HRegionServer.java | 4 + .../hbase/master/MockNoopMasterServices.java | 6 + .../procedure/TestFlushTableProcedure.java | 88 +++++++++ .../TestFlushTableProcedureBasicFlush.java | 84 ++++++++ ...edureWithDoNotSupportFlushTableMaster.java | 49 +++++ 21 files changed, 946 insertions(+), 26 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractRegionRemoteProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBasicFlush.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 572eb0960ea1..07ee1f4b660d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.ClusterMetrics.Option; import org.apache.hadoop.hbase.ClusterMetricsBuilder; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NamespaceDescriptor; @@ -178,6 +179,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; @@ -927,33 +930,59 @@ public CompletableFuture flush(TableName tableName) { @Override public CompletableFuture flush(TableName tableName, byte[] columnFamily) { + // This is for keeping compatibility with the old implementation. + // If the server version is lower than the client version, it's possible that the + // flushTable method is not present on the server side; if so, we need to fall back + // to the old implementation. + FlushTableRequest request = RequestConverter + .buildFlushTableRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()); + CompletableFuture procFuture = + this.procedureCall(tableName, request, + (s, c, req, done) -> s.flushTable(c, req, done), (resp) -> resp.getProcId(), + new FlushTableProcedureBiConsumer(tableName)); + // here we use a new CompletableFuture because the + // procFuture is not fully controlled by ourselves. CompletableFuture future = new CompletableFuture<>(); - addListener(tableExists(tableName), (exists, err) -> { - if (err != null) { - future.completeExceptionally(err); - } else if (!exists) { - future.completeExceptionally(new TableNotFoundException(tableName)); - } else { - addListener(isTableEnabled(tableName), (tableEnabled, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else if (!tableEnabled) { - future.completeExceptionally(new TableNotEnabledException(tableName)); - } else { - Map props = new HashMap<>(); - if (columnFamily != null) { - props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily)); - } - addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), - props), (ret, err3) -> { - if (err3 != null) { - future.completeExceptionally(err3); + addListener(procFuture, (ret, error) -> { + if (error != null) { + if (error instanceof DoNotRetryIOException) { + // usually this is caused by the flushTable method not being present on the server, + // e.g. because the master is running an older HBase version without it. + // If that happens, we need to fall back to the old flush implementation. + LOG.info("Unrecoverable error on the master side. 
Fallback to FlushTableProcedure V1", error); + addListener(tableExists(tableName), (exists, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else if (!exists) { + future.completeExceptionally(new TableNotFoundException(tableName)); + } else { + addListener(isTableEnabled(tableName), (tableEnabled, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else if (!tableEnabled) { + future.completeExceptionally(new TableNotEnabledException(tableName)); } else { - future.complete(ret); + Map props = new HashMap<>(); + if (columnFamily != null) { + props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily)); + } + addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, + tableName.getNameAsString(), props), (ret2, err3) -> { + if (err3 != null) { + future.completeExceptionally(err3); + } else { + future.complete(ret2); + } + }); } }); - } - }); + } + }); + } else { + future.completeExceptionally(error); + } + } else { + future.complete(ret); } }); return future; @@ -2628,6 +2657,18 @@ String getOperationType() { } } + private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer { + + FlushTableProcedureBiConsumer(TableName tableName) { + super(tableName); + } + + @Override + String getOperationType() { + return "FLUSH"; + } + } + private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 3008956d7517..9b473dfe9f46 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -116,6 +116,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; @@ -813,6 +814,16 @@ public static GetOnlineRegionRequest buildGetOnlineRegionRequest() { return GetOnlineRegionRequest.newBuilder().build(); } + public static FlushTableRequest buildFlushTableRequest(final TableName tableName, + final byte[] columnFamily, final long nonceGroup, final long nonce) { + FlushTableRequest.Builder builder = FlushTableRequest.newBuilder(); + builder.setTableName(ProtobufUtil.toProtoTableName(tableName)); + if (columnFamily != null) { + builder.setColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); + } + return builder.setNonceGroup(nonceGroup).setNonce(nonce).build(); + } + /** * Create a protocol buffer FlushRegionRequest for a given region name * @param regionName the name of the region to get info diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto index 94a434755cff..88e872750cd1 100644 --- 
a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto @@ -200,6 +200,17 @@ message ModifyTableResponse { optional uint64 proc_id = 1; } +message FlushTableRequest { + required TableName table_name = 1; + optional bytes column_family = 2; + optional uint64 nonce_group = 3 [default = 0]; + optional uint64 nonce = 4 [default = 0]; +} + +message FlushTableResponse { + optional uint64 proc_id = 1; +} + /* Namespace-level protobufs */ message CreateNamespaceRequest { @@ -1197,6 +1208,9 @@ service MasterService { rpc ModifyColumnStoreFileTracker(ModifyColumnStoreFileTrackerRequest) returns(ModifyColumnStoreFileTrackerResponse); + + rpc FlushTable(FlushTableRequest) + returns(FlushTableResponse); } // HBCK Service definitions. diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index 35125a5a94e6..3ecd9f6cda70 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -191,6 +191,26 @@ message RestoreParentToChildRegionsPair { required string child2_region_name = 3; } +enum FlushTableState { + FLUSH_TABLE_PREPARE = 1; + FLUSH_TABLE_FLUSH_REGIONS = 2; +} + +message FlushTableProcedureStateData { + required TableName table_name = 1; + optional bytes column_family = 2; +} + +message FlushRegionProcedureStateData { + required RegionInfo region = 1; + optional bytes column_family = 2; +} + +message FlushRegionParameter { + required RegionInfo region = 1; + optional bytes column_family = 2; +} + enum SnapshotState { SNAPSHOT_PREPARE = 1; SNAPSHOT_PRE_OPERATION = 2; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index 0b608be369a3..53985e2987ba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -315,7 +315,14 @@ public enum EventType { * * RS_VERIFY_SNAPSHOT */ - RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS); + RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS), + + /** + * RS flush regions.
+ * + * RS_FLUSH_REGIONS + */ + RS_FLUSH_REGIONS(89, ExecutorType.RS_FLUSH_OPERATIONS); private final int code; private final ExecutorType executor; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index cbecb3e8619f..dd88315c86b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -54,7 +54,8 @@ public enum ExecutorType { RS_SWITCH_RPC_THROTTLE(33), RS_IN_MEMORY_COMPACTION(34), RS_CLAIM_REPLICATION_QUEUE(35), - RS_SNAPSHOT_OPERATIONS(36); + RS_SNAPSHOT_OPERATIONS(36), + RS_FLUSH_OPERATIONS(37); ExecutorType(int value) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 3d9e149ac395..a623019d31db 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -142,6 +142,7 @@ import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure; import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure; import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; +import org.apache.hadoop.hbase.master.procedure.FlushTableProcedure; import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; @@ -2619,6 +2620,35 @@ protected String getDescription() { }); } + @Override + public long flushTable(TableName tableName, + byte[] columnFamily, long nonceGroup, long nonce) throws IOException { + checkInitialized(); + + if (!getConfiguration().getBoolean( + MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED, + MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT)) { + throw new DoNotRetryIOException("FlushProcedure is DISABLED"); + } + + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + getMaster().getMasterCoprocessorHost().preTableFlush(tableName); + LOG.info(getClientIdAuditPrefix() + " flush " + tableName); + submitProcedure(new FlushTableProcedure(procedureExecutor.getEnvironment(), + tableName, columnFamily)); + getMaster().getMasterCoprocessorHost().postTableFlush(tableName); + } + + @Override + protected String getDescription() { + return "FlushTableProcedure"; + } + }); + } + private long modifyTable(final TableName tableName, final TableDescriptorGetter newDescriptorGetter, final long nonceGroup, final long nonce, final boolean shouldCheckDescriptor) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index abfc45bcf1a1..f4c63c4612bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -228,6 +228,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaRequest; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; @@ -3541,4 +3543,17 @@ public ReplicateWALEntryResponse replicateToReplica(RpcController controller, ReplicateWALEntryRequest request) throws ServiceException { throw new ServiceException(new DoNotRetryIOException("Unsupported method on master")); } + + @Override + public FlushTableResponse flushTable(RpcController controller, + FlushTableRequest req) throws ServiceException { + TableName tableName = ProtobufUtil.toTableName(req.getTableName()); + byte[] columnFamily = req.hasColumnFamily() ? req.getColumnFamily().toByteArray() : null; + try { + long procId = server.flushTable(tableName, columnFamily, req.getNonceGroup(), req.getNonce()); + return FlushTableResponse.newBuilder().setProcId(procId).build(); + } catch (IOException ioe) { + throw new ServiceException(ioe); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 85d9a11a80ff..a0e5f498218e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -253,6 +253,19 @@ long disableTable( final long nonceGroup, final long nonce) throws IOException; + /** + * Flush an existing table + * @param tableName The table name + * @param columnFamily The column family + * @param nonceGroup the nonce group + * @param nonce the nonce + * @return the flush procedure id + */ + long flushTable( + final TableName tableName, + final byte[] columnFamily, + final long nonceGroup, + final long nonce) throws IOException; /** * Add a new column to an existing table diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractRegionRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractRegionRemoteProcedure.java new file mode 100644 index 000000000000..770475ed7266 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractRegionRemoteProcedure.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.master.assignment.RegionStateNode; +import org.apache.hadoop.hbase.master.assignment.RegionStates; +import org.apache.hadoop.hbase.master.assignment.ServerState; +import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +/** + * The base class for the remote procedures for normal operations, like flush or snapshot. + * Normal operations do not change the region state. This is the difference between + * {@link org.apache.hadoop.hbase.master.assignment.RegionRemoteProcedureBase} and + * {@link org.apache.hadoop.hbase.master.procedure.AbstractRegionRemoteProcedure}. + * It requires the region to be in OPEN state. If the region is in transition, + * the procedure will suspend itself and retry later. + */ +@InterfaceAudience.Private +public abstract class AbstractRegionRemoteProcedure extends Procedure + implements TableProcedureInterface, RemoteProcedure { + private static final Logger LOG = LoggerFactory.getLogger(AbstractRegionRemoteProcedure.class); + + protected RegionInfo region; + + private ProcedureEvent event; + private boolean dispatched; + private boolean succ; + private RetryCounter retryCounter; + + public AbstractRegionRemoteProcedure() { + } + + public AbstractRegionRemoteProcedure(RegionInfo region) { + this.region = region; + } + + @Override + protected Procedure[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (dispatched) { + if (succ) { + return null; + } + dispatched = false; + } + + RegionStates regionStates = env.getAssignmentManager().getRegionStates(); + RegionStateNode regionNode = regionStates.getRegionStateNode(region); + regionNode.lock(); + try { + if (regionNode.isInTransition()) { + setTimeoutForSuspend(env, String.format("region %s has a TRSP attached %s", + region.getRegionNameAsString(), regionNode.getProcedure())); + throw new ProcedureSuspendedException(); + } + if (!regionNode.isInState(RegionState.State.OPEN)) { + setTimeoutForSuspend(env, String.format("region state of %s is %s", + region.getRegionNameAsString(), regionNode.getState())); + throw new ProcedureSuspendedException(); + } + ServerName targetServer = regionNode.getRegionLocation(); + if (targetServer == null) { + setTimeoutForSuspend(env, String.format("target server of region %s is null", + region.getRegionNameAsString())); + throw new ProcedureSuspendedException(); + } + ServerState serverState = regionStates.getServerNode(targetServer).getState(); + if (serverState != ServerState.ONLINE) { + 
setTimeoutForSuspend(env, String.format("target server of region %s %s is in state %s", + region.getRegionNameAsString(), targetServer, serverState)); + throw new ProcedureSuspendedException(); + } + try { + env.getRemoteDispatcher().addOperationToNode(targetServer, this); + dispatched = true; + event = new ProcedureEvent<>(this); + event.suspendIfNotReady(this); + throw new ProcedureSuspendedException(); + } catch (FailedRemoteDispatchException e) { + setTimeoutForSuspend(env, "Failed to send request to " + targetServer); + throw new ProcedureSuspendedException(); + } + } finally { + regionNode.unlock(); + } + } + + private void setTimeoutForSuspend(MasterProcedureEnv env, String reason) { + if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); + } + long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); + LOG.warn("{} cannot run currently because {}, wait {} ms to retry", this, reason, backoff); + setTimeout(Math.toIntExact(backoff)); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + } + + @Override + protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + env.getProcedureScheduler().addFront(this); + return false; + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, IOException e) { + complete(env, e); + } + + @Override + public void remoteOperationCompleted(MasterProcedureEnv env) { + complete(env, null); + } + + @Override + public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { + complete(env, error); + } + + private void complete(MasterProcedureEnv env, Throwable error) { + if (isFinished()) { + LOG.info("This procedure {} is already finished, skip the rest of the processing", this.getProcId()); + return; + } + if (event == null) { + LOG.warn("procedure event for {} is null, maybe the procedure was created during recovery", + getProcId()); + return; + } + if (error == null) { + succ = true; + } + event.wake(env.getProcedureScheduler()); + event = null; + } + + @Override + public TableName getTableName() { + return region.getTable(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java new file mode 100644 index 000000000000..6556095bf6fb --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.Optional; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.master.assignment.RegionStateNode; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; +import org.apache.hadoop.hbase.regionserver.FlushRegionCallable; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionParameter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionProcedureStateData; + +@InterfaceAudience.Private +public class FlushRegionProcedure extends AbstractRegionRemoteProcedure { + private static final Logger LOG = LoggerFactory.getLogger(FlushRegionProcedure.class); + + private byte[] columnFamily; + + public FlushRegionProcedure() { + } + + public FlushRegionProcedure(RegionInfo region) { + this(region, null); + } + + public FlushRegionProcedure(RegionInfo region, byte[] columnFamily) { + super(region); + this.columnFamily = columnFamily; + } + + @Override + protected Procedure[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + RegionStateNode regionNode = + env.getAssignmentManager().getRegionStates().getRegionStateNode(region); + if (!regionNode.isInState(State.OPEN) || regionNode.isInTransition()) { + LOG.info("Region {} is not OPEN or is in transition. 
Skip {} ...", region, this); + return null; + } + return super.execute(env); + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + FlushRegionProcedureStateData.Builder builder = FlushRegionProcedureStateData.newBuilder(); + builder.setRegion(ProtobufUtil.toRegionInfo(region)); + if (columnFamily != null) { + builder.setColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); + } + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + FlushRegionProcedureStateData data = + serializer.deserialize(FlushRegionProcedureStateData.class); + this.region = ProtobufUtil.toRegionInfo(data.getRegion()); + if (data.hasColumnFamily()) { + this.columnFamily = data.getColumnFamily().toByteArray(); + } + } + + @Override + public Optional remoteCallBuild(MasterProcedureEnv env, ServerName serverName) { + FlushRegionParameter.Builder builder = FlushRegionParameter.newBuilder(); + builder.setRegion(ProtobufUtil.toRegionInfo(region)); + if (columnFamily != null) { + builder.setColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); + } + return Optional.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(), + FlushRegionCallable.class, builder.build().toByteArray())); + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.FLUSH; + } + + @Override + protected boolean waitInitialized(MasterProcedureEnv env) { + return env.waitInitialized(this); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java new file mode 100644 index 000000000000..6d5cb3e3f1c6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushTableState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushTableProcedureStateData; + +@InterfaceAudience.Private +public class FlushTableProcedure + extends AbstractStateMachineTableProcedure { + private static final Logger LOG = LoggerFactory.getLogger(FlushTableProcedure.class); + + private TableName tableName; + + private byte[] columnFamily; + + public FlushTableProcedure() { + super(); + } + + public FlushTableProcedure(MasterProcedureEnv env, TableName tableName) { + this(env, tableName, null); + } + + public FlushTableProcedure(MasterProcedureEnv env, TableName tableName, byte[] columnFamily) { + super(env); + this.tableName = tableName; + this.columnFamily = columnFamily; + } + + @Override + protected LockState acquireLock(MasterProcedureEnv env) { + // Here we don't acquire the table lock because the flush operation and other operations + // (like split or merge) are not mutually exclusive. A region will flush its memstore when + // being closed, so it is safe even if we don't hold the lock. 
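+ // Note that FlushRegionProcedure also re-checks the region state on its own and skips + // regions that are not OPEN or are in transition, so racing with assignment is tolerated. 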
+ return LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(MasterProcedureEnv env) { + // nothing to do since we don't acquire lock + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, FlushTableState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + LOG.info("{} execute state={}", this, state); + + try { + switch (state) { + case FLUSH_TABLE_PREPARE: + preflightChecks(env, true); + setNextState(FlushTableState.FLUSH_TABLE_FLUSH_REGIONS); + return Flow.HAS_MORE_STATE; + case FLUSH_TABLE_FLUSH_REGIONS: + addChildProcedure(createFlushRegionProcedures(env)); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } catch (IOException e) { + setFailure("master-flush-table", e); + } + return Flow.HAS_MORE_STATE; + } + + @Override + protected void rollbackState(MasterProcedureEnv env, FlushTableState flushTableState) + throws IOException, InterruptedException { + } + + @Override + protected FlushTableState getState(int stateId) { + return FlushTableState.forNumber(stateId); + } + + @Override + protected int getStateId(FlushTableState state) { + return state.getNumber(); + } + + @Override + protected FlushTableState getInitialState() { + return FlushTableState.FLUSH_TABLE_PREPARE; + } + + @Override + public TableName getTableName() { + return tableName; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.FLUSH; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + FlushTableProcedureStateData.Builder builder = FlushTableProcedureStateData.newBuilder(); + builder.setTableName(ProtobufUtil.toProtoTableName(tableName)); + if (columnFamily != null) { + builder.setColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); + } + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + FlushTableProcedureStateData data = serializer.deserialize(FlushTableProcedureStateData.class); + this.tableName = ProtobufUtil.toTableName(data.getTableName()); + if (data.hasColumnFamily()) { + this.columnFamily = data.getColumnFamily().toByteArray(); + } + } + + private FlushRegionProcedure[] createFlushRegionProcedures(MasterProcedureEnv env) { + return env.getAssignmentManager().getTableRegions(getTableName(), true) + .stream().filter(r -> RegionReplicaUtil.isDefaultReplica(r)) + .map(r -> new FlushRegionProcedure(r, columnFamily)) + .toArray(FlushRegionProcedure[]::new); + } + + @Override + public void toStringClassDetails(StringBuilder builder) { + builder.append(getClass().getName()) + .append(", id=").append(getProcId()) + .append(", table=").append(tableName); + if (columnFamily != null) { + builder.append(", columnFamily=").append(Bytes.toString(columnFamily)); + } + } + + @Override + protected void afterReplay(MasterProcedureEnv env) { + if (!env.getMasterConfiguration().getBoolean( + MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED, + MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT)) { + setFailure("master-flush-table", new IOException("FlushProcedure is DISABLED")); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java index d7d8d380b1f0..a73dcf614213 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java @@ -34,7 +34,7 @@ public interface TableProcedureInterface { public static final TableName DUMMY_NAMESPACE_TABLE_NAME = TableName.NAMESPACE_TABLE_NAME; public enum TableOperationType { - CREATE, DELETE, DISABLE, EDIT, ENABLE, READ, SNAPSHOT, REGION_SNAPSHOT, + CREATE, DELETE, DISABLE, EDIT, ENABLE, READ, SNAPSHOT, REGION_SNAPSHOT, FLUSH, REGION_EDIT, REGION_SPLIT, REGION_MERGE, REGION_ASSIGN, REGION_UNASSIGN, REGION_GC, MERGED_REGIONS_GC/* region operations */ } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java index 3a53a1fc5da7..6bd11a68032d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java @@ -56,6 +56,7 @@ private static boolean requireTableExclusiveLock(TableProcedureInterface proc) { // we allow concurrent edit on the ns family in meta table return !proc.getTableName().equals(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME); case READ: + case FLUSH: case SNAPSHOT: return false; // region operations are using the shared-lock on the table diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java index 167fea75ca90..a2b7738bfb7c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java @@ -58,6 +58,10 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager { public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; + public static final String FLUSH_PROCEDURE_ENABLED = "hbase.flush.procedure.enabled"; + + public static final boolean FLUSH_PROCEDURE_ENABLED_DEFAULT = true; + private static final String FLUSH_TIMEOUT_MILLIS_KEY = "hbase.flush.master.timeoutMillis"; private static final int FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000; private static final String FLUSH_WAKE_MILLIS_KEY = "hbase.flush.master.wakeMillis"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java new file mode 100644 index 000000000000..41616e64c6cc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java @@ -0,0 +1,67 @@ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.client.IsolationLevel; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionParameter; +import 
org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.Collections; + +@InterfaceAudience.Private +public class FlushRegionCallable extends BaseRSProcedureCallable { + + private static final Logger LOG = LoggerFactory.getLogger(FlushRegionCallable.class); + + private RegionInfo regionInfo; + + private byte[] columnFamily; + + @Override + protected void doCall() throws Exception { + HRegion region = rs.getRegion(regionInfo.getEncodedName()); + if (region == null) { + throw new NotServingRegionException("region=" + regionInfo.getRegionNameAsString()); + } + LOG.debug("Starting region operation on {}", region); + region.startRegionOperation(); + try { + long readPt = region.getReadPoint(IsolationLevel.READ_COMMITTED); + HRegion.FlushResult res; + if (columnFamily == null) { + res = region.flush(true); + } else { + res = region.flushcache(Collections.singletonList(columnFamily), + false, FlushLifeCycleTracker.DUMMY); + } + if (res.getResult() == HRegion.FlushResult.Result.CANNOT_FLUSH) { + region.waitForFlushes(); + if (region.getMaxFlushedSeqId() < readPt) { + throw new IOException("Unable to complete flush " + regionInfo); + } + } + } finally { + LOG.debug("Closing region operation on {}", region); + region.closeRegionOperation(); + } + } + + @Override + protected void initParameter(byte[] parameter) throws Exception { + FlushRegionParameter param = FlushRegionParameter.parseFrom(parameter); + this.regionInfo = ProtobufUtil.toRegionInfo(param.getRegion()); + if (param.hasColumnFamily()) { + this.columnFamily = param.getColumnFamily().toByteArray(); + } + } + + @Override + public EventType getEventType() { + return EventType.RS_FLUSH_REGIONS; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index ea9acca7d352..b6fcbf1caeba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1846,6 +1846,10 @@ private void startServices() throws IOException { conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3); executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( ExecutorType.RS_SNAPSHOT_OPERATIONS).setCorePoolSize(rsSnapshotOperationThreads)); + final int rsFlushOperationThreads = + conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3); + executorService.startExecutorService(executorService.new ExecutorConfig().setExecutorType( + ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads)); Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", uncaughtExceptionHandler); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index d53cf81fa835..354feab94c93 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -532,4 +532,10 @@ public long modifyColumnStoreFileTracker(TableName tableName, byte[] family, Str long nonceGroup, long nonce) throws IOException { return -1; } + + @Override + public long flushTable(TableName tableName, byte[] columnFamily, long nonceGroup, long 
nonce) + throws IOException { + return 0; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java new file mode 100644 index 000000000000..4200e2d8c0c6 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.RegionSplitter; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; +import java.util.List; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestFlushTableProcedure { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestFlushTableProcedure.class); + + protected static HBaseTestingUtil TEST_UTIL; + protected TableName TABLE_NAME; + protected byte[] FAMILY; + + @Before + public void setup() throws Exception { + TEST_UTIL = new HBaseTestingUtil(); + addConfiguration(TEST_UTIL.getConfiguration()); + TEST_UTIL.startMiniCluster(3); + TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestFlushTable")); + FAMILY = Bytes.toBytes("cf"); + final byte[][] splitKeys = new RegionSplitter.HexStringSplit().split(10); + Table table = TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys); + TEST_UTIL.loadTable(table, FAMILY, false); + } + + protected void addConfiguration(Configuration config) { + // delay dispatch so that we can do something, for example kill a target server + config.setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 10000); + config.setInt(RemoteProcedureDispatcher.DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, 128); + } + + protected void assertTableMemStoreNotEmpty() { + long totalSize = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME) + .stream().mapToLong(HRegion::getMemStoreDataSize).sum(); + Assert.assertTrue(totalSize > 0); + } + + protected void assertTableMemStoreEmpty() { + long totalSize = 
TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME) + .stream().mapToLong(HRegion::getMemStoreDataSize).sum(); + Assert.assertEquals(0, totalSize); + } + + @After + public void teardown() throws Exception { + if (TEST_UTIL.getHBaseCluster().getMaster() != null) { + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate( + TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(), false); + } + TEST_UTIL.shutdownMiniCluster(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBasicFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBasicFlush.java new file mode 100644 index 000000000000..69e4529ef4df --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBasicFlush.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestFlushTableProcedureBasicFlush extends TestFlushTableProcedure { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestFlushTableProcedureBasicFlush.class); + + @Test + public void testSimpleFlush() throws IOException { + assertTableMemStoreNotEmpty(); + TEST_UTIL.getAdmin().flush(TABLE_NAME); + assertTableMemStoreEmpty(); + } + + @Test + public void testMasterRestarts() throws IOException { + assertTableMemStoreNotEmpty(); + + HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); + ProcedureExecutor procExec = master.getMasterProcedureExecutor(); + MasterProcedureEnv env = procExec.getEnvironment(); + FlushTableProcedure proc = new FlushTableProcedure(env, TABLE_NAME); + long procId = procExec.submitProcedure(proc); + TEST_UTIL.waitFor(1000, () -> proc.getState().getNumber() > 1); + + TEST_UTIL.getHBaseCluster().killMaster(master.getServerName()); + TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 30000); + TEST_UTIL.getHBaseCluster().startMaster(); + TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); + + master = 
TEST_UTIL.getHBaseCluster().getMaster(); + procExec = master.getMasterProcedureExecutor(); + ProcedureTestingUtility.waitProcedure(procExec, procId); + assertTableMemStoreEmpty(); + } + + @Test + public void testSkipRIT() throws IOException { + HRegion region = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).get(0); + + TEST_UTIL.getHBaseCluster().getMaster() + .getAssignmentManager().getRegionStates() + .getRegionStateNode(region.getRegionInfo()) + .setState(RegionState.State.CLOSING, RegionState.State.OPEN); + + FlushRegionProcedure proc = new FlushRegionProcedure(region.getRegionInfo()); + TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().submitProcedure(proc); + + // wait for a time which is shorter than RSProcedureDispatcher delays + TEST_UTIL.waitFor(5000, () -> proc.isFinished()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java new file mode 100644 index 000000000000..325217e015cf --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java @@ -0,0 +1,49 @@ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestFlushTableProcedureWithDoNotSupportFlushTableMaster + extends TestFlushTableProcedure { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestFlushTableProcedureWithDoNotSupportFlushTableMaster.class); + + @Override + protected void addConfiguration(Configuration config) { + super.addConfiguration(config); + config.set(HConstants.MASTER_IMPL, DoNotSupportFlushTableMaster.class.getName()); + } + + @Test + public void testFlushFallback() throws IOException { + assertTableMemStoreNotEmpty(); + TEST_UTIL.getAdmin().flush(TABLE_NAME); + assertTableMemStoreEmpty(); + } + + public static final class DoNotSupportFlushTableMaster extends HMaster { + + public DoNotSupportFlushTableMaster(Configuration conf) throws IOException { + super(conf); + } + + @Override + public long flushTable(TableName tableName, byte[] columnFamily, + long nonceGroup, long nonce) throws IOException { + throw new DoNotRetryIOException("UnsupportedOperation: flushTable"); + } + } +} From 02c8bbe3a8c3a3b27e9508594a548b08d2383f6b Mon Sep 17 00:00:00 2001 From: huiruan Date: Sun, 20 Mar 2022 19:47:24 +0800 Subject: [PATCH 2/5] fix checkstyle problem --- .../hbase/client/RawAsyncHBaseAdmin.java | 12 ++--- .../procedure/FlushRegionProcedure.java | 4 +- .../master/procedure/FlushTableProcedure.java | 8 +-- .../regionserver/FlushRegionCallable.java | 51 ++++++++++++------- .../procedure/TestFlushTableProcedure.java | 2 +- ...edureWithDoNotSupportFlushTableMaster.java | 17 +++++++ 6 files changed, 64 insertions(+), 30 
deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 07ee1f4b660d..4bb95de9cbc4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -968,12 +968,12 @@ public CompletableFuture flush(TableName tableName, byte[] columnFamily) { } addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props), (ret2, err3) -> { - if (err3 != null) { - future.completeExceptionally(err3); - } else { - future.complete(ret2); - } - }); + if (err3 != null) { + future.completeExceptionally(err3); + } else { + future.complete(ret2); + } + }); } }); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java index 6556095bf6fb..b38ec65a4717 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java @@ -30,10 +30,10 @@ import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; import org.apache.hadoop.hbase.regionserver.FlushRegionCallable; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionParameter; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionProcedureStateData; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java index 6d5cb3e3f1c6..ebbdb984c8b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java @@ -17,21 +17,21 @@ */ package org.apache.hadoop.hbase.master.procedure; +import java.io.IOException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushTableState; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushTableProcedureStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushTableState; @InterfaceAudience.Private public class FlushTableProcedure diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java index 41616e64c6cc..e0b46666a5d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java @@ -1,17 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; +import java.util.Collections; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionParameter; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Collections; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionParameter; @InterfaceAudience.Private public class FlushRegionCallable extends BaseRSProcedureCallable { @@ -31,20 +48,20 @@ protected void doCall() throws Exception { LOG.debug("Starting region operation on {}", region); region.startRegionOperation(); try { - long readPt = region.getReadPoint(IsolationLevel.READ_COMMITTED); - HRegion.FlushResult res; - if (columnFamily == null) { - res = region.flush(true); - } else { - res = region.flushcache(Collections.singletonList(columnFamily), - false, FlushLifeCycleTracker.DUMMY); - } - if (res.getResult() == HRegion.FlushResult.Result.CANNOT_FLUSH) { - region.waitForFlushes(); - if (region.getMaxFlushedSeqId() < readPt) { - throw new IOException("Unable to complete flush " + regionInfo); - } + long readPt = region.getReadPoint(IsolationLevel.READ_COMMITTED); + HRegion.FlushResult res; + if (columnFamily == null) { + res = region.flush(true); + } else { + res = region.flushcache(Collections.singletonList(columnFamily), + false, FlushLifeCycleTracker.DUMMY); + } + if (res.getResult() == HRegion.FlushResult.Result.CANNOT_FLUSH) { + region.waitForFlushes(); + if (region.getMaxFlushedSeqId() < readPt) { + 
throw new IOException("Unable to complete flush " + regionInfo); } + } } finally { LOG.debug("Closing region operation on {}", region); region.closeRegionOperation(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java index 4200e2d8c0c6..bb8f1291a51e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java @@ -34,7 +34,7 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.experimental.categories.Category; -import java.util.List; + @Category({ MasterTests.class, MediumTests.class }) public class TestFlushTableProcedure { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java index 325217e015cf..43685bc5df94 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; From 4cff81979f0460d4a3796580549394939cc9be6e Mon Sep 17 00:00:00 2001 From: huiruan Date: Mon, 21 Mar 2022 22:59:03 +0800 Subject: [PATCH 3/5] fix UT --- .../throttle/TestFlushWithThroughputController.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java index b1de5fdfa396..d281da10509b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.util.List; @@ -123,7 +124,15 @@ private Pair generateAndFlushData(Table table) throws IOException table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); } long startTime = System.nanoTime(); - hbtu.getAdmin().flush(tableName); + hbtu.getHBaseCluster().getRegions(tableName).stream() + .findFirst().ifPresent(r -> { + try { + r.flush(true); + } catch (IOException e) { + LOG.error("Failed flush region {}", r, e); + fail("Failed flush region " + r.getRegionInfo().getRegionNameAsString()); + } + }); duration += System.nanoTime() - startTime; } HStore store = getStoreWithName(tableName); From 7e94bd19806bed0f47410381b59d4e3cb19190b1 Mon Sep 17 00:00:00 2001 From: huiruan Date: Sun, 10 Apr 2022 21:37:29 +0800 Subject: [PATCH 4/5] rename AbstractRegionRemoteProcedure to IdempotentRegionRemoteProcedureBase --- .../hbase/client/RawAsyncHBaseAdmin.java | 61 ++++++++++--------- .../procedure/FlushRegionProcedure.java | 2 +- ... IdempotentRegionRemoteProcedureBase.java} | 11 ++-- 3 files changed, 40 insertions(+), 34 deletions(-) rename hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/{AbstractRegionRemoteProcedure.java => IdempotentRegionRemoteProcedureBase.java} (94%) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 4bb95de9cbc4..f37547e1922c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -950,34 +950,7 @@ public CompletableFuture flush(TableName tableName, byte[] columnFamily) { // the hbase hadoop version does not match the running hadoop version. // if that happens, we need fall back to the old flush implementation. LOG.info("Unrecoverable error in master side. 
Fallback to FlushTableProcedure V1", error); - addListener(tableExists(tableName), (exists, err) -> { - if (err != null) { - future.completeExceptionally(err); - } else if (!exists) { - future.completeExceptionally(new TableNotFoundException(tableName)); - } else { - addListener(isTableEnabled(tableName), (tableEnabled, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else if (!tableEnabled) { - future.completeExceptionally(new TableNotEnabledException(tableName)); - } else { - Map props = new HashMap<>(); - if (columnFamily != null) { - props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily)); - } - addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, - tableName.getNameAsString(), props), (ret2, err3) -> { - if (err3 != null) { - future.completeExceptionally(err3); - } else { - future.complete(ret2); - } - }); - } - }); - } - }); + legacyFlush(future, tableName, columnFamily); } else { future.completeExceptionally(error); } @@ -988,6 +961,38 @@ public CompletableFuture flush(TableName tableName, byte[] columnFamily) { return future; } + private void legacyFlush(CompletableFuture future, + TableName tableName, byte[] columnFamily) { + addListener(tableExists(tableName), (exists, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else if (!exists) { + future.completeExceptionally(new TableNotFoundException(tableName)); + } else { + addListener(isTableEnabled(tableName), (tableEnabled, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else if (!tableEnabled) { + future.completeExceptionally(new TableNotEnabledException(tableName)); + } else { + Map props = new HashMap<>(); + if (columnFamily != null) { + props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily)); + } + addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, + tableName.getNameAsString(), props), (ret2, err3) -> { + if (err3 != null) { + future.completeExceptionally(err3); + } else { + future.complete(ret2); + } + }); + } + }); + } + }); + } + @Override public CompletableFuture flushRegion(byte[] regionName) { return flushRegionInternal(regionName, null, false).thenAccept(r -> { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java index b38ec65a4717..bea60b1e5fe5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java @@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionProcedureStateData; @InterfaceAudience.Private -public class FlushRegionProcedure extends AbstractRegionRemoteProcedure { +public class FlushRegionProcedure extends IdempotentRegionRemoteProcedureBase { private static final Logger LOG = LoggerFactory.getLogger(FlushRegionProcedure.class); private byte[] columnFamily; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractRegionRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/IdempotentRegionRemoteProcedureBase.java similarity index 94% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractRegionRemoteProcedure.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/IdempotentRegionRemoteProcedureBase.java index 
770475ed7266..3a98fbf8136f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractRegionRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/IdempotentRegionRemoteProcedureBase.java @@ -43,14 +43,15 @@ * The base class for the remote procedures for normal operations, like flush or snapshot. * Normal operations do not change the region state. This is the difference between * {@link org.apache.hadoop.hbase.master.assignment.RegionRemoteProcedureBase} and - * {@link org.apache.hadoop.hbase.master.procedure.AbstractRegionRemoteProcedure}. + * {@link IdempotentRegionRemoteProcedureBase}. * It requires that the state of the region must be OPEN. If region is in transition state, * the procedure will suspend and retry later. */ @InterfaceAudience.Private -public abstract class AbstractRegionRemoteProcedure extends Procedure +public abstract class IdempotentRegionRemoteProcedureBase extends Procedure implements TableProcedureInterface, RemoteProcedure { - private static final Logger LOG = LoggerFactory.getLogger(AbstractRegionRemoteProcedure.class); + private static final Logger LOG = LoggerFactory + .getLogger(IdempotentRegionRemoteProcedureBase.class); protected RegionInfo region; @@ -59,10 +60,10 @@ public abstract class AbstractRegionRemoteProcedure extends Procedure Date: Sat, 2 Jul 2022 22:54:01 +0800 Subject: [PATCH 5/5] fix spotless problems --- .../hbase/client/RawAsyncHBaseAdmin.java | 15 ++++----- .../shaded/protobuf/RequestConverter.java | 2 +- .../hadoop/hbase/executor/EventType.java | 1 - .../apache/hadoop/hbase/master/HMaster.java | 19 ++++++----- .../hbase/master/MasterRpcServices.java | 4 +-- .../hadoop/hbase/master/MasterServices.java | 11 +++---- .../procedure/FlushRegionProcedure.java | 6 ++-- .../master/procedure/FlushTableProcedure.java | 33 ++++++++++--------- .../IdempotentRegionRemoteProcedureBase.java | 26 +++++++-------- .../regionserver/FlushRegionCallable.java | 7 ++-- .../hbase/regionserver/HRegionServer.java | 5 ++- .../hbase/master/MockNoopMasterServices.java | 2 +- .../procedure/TestFlushTableProcedure.java | 11 +++---- .../TestFlushTableProcedureBasicFlush.java | 7 ++-- ...edureWithDoNotSupportFlushTableMaster.java | 8 ++--- .../TestFlushWithThroughputController.java | 17 +++++----- 16 files changed, 85 insertions(+), 89 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 8b2c778f4f6d..a1588e0b24b8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -932,12 +932,11 @@ public CompletableFuture flush(TableName tableName, byte[] columnFamily) { // If the server version is lower than the client version, it's possible that the // flushTable method is not present in the server side, if so, we need to fall back // to the old implementation. 
- FlushTableRequest request = RequestConverter - .buildFlushTableRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()); - CompletableFuture procFuture = - this.procedureCall(tableName, request, - (s, c, req, done) -> s.flushTable(c, req, done), (resp) -> resp.getProcId(), - new FlushTableProcedureBiConsumer(tableName)); + FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamily, + ng.getNonceGroup(), ng.newNonce()); + CompletableFuture procFuture = this. procedureCall( + tableName, request, (s, c, req, done) -> s.flushTable(c, req, done), + (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName)); // here we use another new CompletableFuture because the // procFuture is not fully controlled by ourselves. CompletableFuture future = new CompletableFuture<>(); @@ -959,8 +958,8 @@ public CompletableFuture flush(TableName tableName, byte[] columnFamily) { return future; } - private void legacyFlush(CompletableFuture future, - TableName tableName, byte[] columnFamily) { + private void legacyFlush(CompletableFuture future, TableName tableName, + byte[] columnFamily) { addListener(tableExists(tableName), (exists, err) -> { if (err != null) { future.completeExceptionally(err); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index fe8e0aaedb6b..b0a0e80dbfee 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -748,7 +748,7 @@ public static GetOnlineRegionRequest buildGetOnlineRegionRequest() { } public static FlushTableRequest buildFlushTableRequest(final TableName tableName, - final byte[] columnFamily, final long nonceGroup, final long nonce) { + final byte[] columnFamily, final long nonceGroup, final long nonce) { FlushTableRequest.Builder builder = FlushTableRequest.newBuilder(); builder.setTableName(ProtobufUtil.toProtoTableName(tableName)); if (columnFamily != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index 20d02a490954..07f8339a20db 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -295,7 +295,6 @@ public enum EventType { /** * RS flush regions.
- * * RS_FLUSH_OPERATIONS */ RS_FLUSH_REGIONS(89, ExecutorType.RS_FLUSH_OPERATIONS); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index ebb8084b0268..75290ae0f110 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -2693,24 +2693,25 @@ protected String getDescription() { } @Override - public long flushTable(TableName tableName, - byte[] columnFamily, long nonceGroup, long nonce) throws IOException { + public long flushTable(TableName tableName, byte[] columnFamily, long nonceGroup, long nonce) + throws IOException { checkInitialized(); - if (!getConfiguration().getBoolean( - MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED, - MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT)) { + if ( + !getConfiguration().getBoolean(MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED, + MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT) + ) { throw new DoNotRetryIOException("FlushProcedure is DISABLED"); } - return MasterProcedureUtil.submitProcedure( - new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + return MasterProcedureUtil + .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { @Override protected void run() throws IOException { getMaster().getMasterCoprocessorHost().preTableFlush(tableName); LOG.info(getClientIdAuditPrefix() + " flush " + tableName); - submitProcedure(new FlushTableProcedure(procedureExecutor.getEnvironment(), - tableName, columnFamily)); + submitProcedure( + new FlushTableProcedure(procedureExecutor.getEnvironment(), tableName, columnFamily)); getMaster().getMasterCoprocessorHost().postTableFlush(tableName); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index fef6cd01aba2..95c116e07512 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -3487,8 +3487,8 @@ public FlushMasterStoreResponse flushMasterStore(RpcController controller, } @Override - public FlushTableResponse flushTable(RpcController controller, - FlushTableRequest req) throws ServiceException { + public FlushTableResponse flushTable(RpcController controller, FlushTableRequest req) + throws ServiceException { TableName tableName = ProtobufUtil.toTableName(req.getTableName()); byte[] columnFamily = req.hasColumnFamily() ? 
req.getColumnFamily().toByteArray() : null; try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 48e3864e9565..0a430b8ff11a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -214,16 +214,13 @@ long disableTable(final TableName tableName, final long nonceGroup, final long n /** * Flush an existing table - * @param tableName The table name + * @param tableName The table name * @param columnFamily The column family - * @param nonceGroup the nonce group - * @param nonce the nonce + * @param nonceGroup the nonce group + * @param nonce the nonce * @return the flush procedure id */ - long flushTable( - final TableName tableName, - final byte[] columnFamily, - final long nonceGroup, + long flushTable(final TableName tableName, final byte[] columnFamily, final long nonceGroup, final long nonce) throws IOException; /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java index bea60b1e5fe5..b388b6a1173b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -32,7 +32,9 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionParameter; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionProcedureStateData; @@ -57,7 +59,7 @@ public FlushRegionProcedure(RegionInfo region, byte[] columnFamily) { @Override protected Procedure[] execute(MasterProcedureEnv env) - throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { RegionStateNode regionNode = env.getAssignmentManager().getRegionStates().getRegionStateNode(region); if (!regionNode.isInState(State.OPEN) || regionNode.isInTransition()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java index ebbdb984c8b8..3cf537ff4fc8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -28,14 +28,15 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; + import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushTableProcedureStateData; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushTableState; @InterfaceAudience.Private -public class FlushTableProcedure - extends AbstractStateMachineTableProcedure { +public class FlushTableProcedure extends AbstractStateMachineTableProcedure { private static final Logger LOG = LoggerFactory.getLogger(FlushTableProcedure.class); private TableName tableName; @@ -71,7 +72,7 @@ protected void releaseLock(MasterProcedureEnv env) { @Override protected Flow executeFromState(MasterProcedureEnv env, FlushTableState state) - throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { LOG.info("{} execute state={}", this, state); try { @@ -94,7 +95,7 @@ protected Flow executeFromState(MasterProcedureEnv env, FlushTableState state) @Override protected void rollbackState(MasterProcedureEnv env, FlushTableState flushTableState) - throws IOException, InterruptedException { + throws IOException, InterruptedException { } @Override @@ -144,17 +145,15 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws } private FlushRegionProcedure[] createFlushRegionProcedures(MasterProcedureEnv env) { - return env.getAssignmentManager().getTableRegions(getTableName(), true) - .stream().filter(r -> RegionReplicaUtil.isDefaultReplica(r)) - .map(r -> new FlushRegionProcedure(r, columnFamily)) - .toArray(FlushRegionProcedure[]::new); + return env.getAssignmentManager().getTableRegions(getTableName(), true).stream() + .filter(r -> RegionReplicaUtil.isDefaultReplica(r)) + .map(r -> new FlushRegionProcedure(r, columnFamily)).toArray(FlushRegionProcedure[]::new); } @Override public void toStringClassDetails(StringBuilder builder) { - builder.append(getClass().getName()) - .append(", id=").append(getProcId()) - .append(", table=").append(tableName); + builder.append(getClass().getName()).append(", id=").append(getProcId()).append(", table=") + .append(tableName); if (columnFamily != null) { builder.append(", columnFamily=").append(Bytes.toString(columnFamily)); } @@ -162,9 +161,11 @@ public void toStringClassDetails(StringBuilder builder) { @Override protected void afterReplay(MasterProcedureEnv env) { - if (!env.getMasterConfiguration().getBoolean( - MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED, - MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT)) { + if ( + !env.getMasterConfiguration().getBoolean( + MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED, + MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT) + ) { setFailure("master-flush-table", new IOException("FlushProcedure is DISABLED")); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/IdempotentRegionRemoteProcedureBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/IdempotentRegionRemoteProcedureBase.java index 3a98fbf8136f..86e3d4e545eb 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/IdempotentRegionRemoteProcedureBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/IdempotentRegionRemoteProcedureBase.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -37,21 +37,21 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; /** - * The base class for the remote procedures for normal operations, like flush or snapshot. - * Normal operations do not change the region state. This is the difference between - * {@link org.apache.hadoop.hbase.master.assignment.RegionRemoteProcedureBase} and - * {@link IdempotentRegionRemoteProcedureBase}. - * It requires that the state of the region must be OPEN. If region is in transition state, - * the procedure will suspend and retry later. + * The base class for the remote procedures for normal operations, like flush or snapshot. Normal + * operations do not change the region state. This is the difference between + * {@link org.apache.hadoop.hbase.master.assignment.RegionRemoteProcedureBase} and + * {@link IdempotentRegionRemoteProcedureBase}. It requires that the state of the region must be + * OPEN. If region is in transition state, the procedure will suspend and retry later. */ @InterfaceAudience.Private public abstract class IdempotentRegionRemoteProcedureBase extends Procedure - implements TableProcedureInterface, RemoteProcedure { - private static final Logger LOG = LoggerFactory - .getLogger(IdempotentRegionRemoteProcedureBase.class); + implements TableProcedureInterface, RemoteProcedure { + private static final Logger LOG = + LoggerFactory.getLogger(IdempotentRegionRemoteProcedureBase.class); protected RegionInfo region; @@ -69,7 +69,7 @@ public IdempotentRegionRemoteProcedureBase(RegionInfo region) { @Override protected Procedure[] execute(MasterProcedureEnv env) - throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { if (dispatched) { if (succ) { return null; @@ -93,8 +93,8 @@ protected Procedure[] execute(MasterProcedureEnv env) } ServerName targetServer = regionNode.getRegionLocation(); if (targetServer == null) { - setTimeoutForSuspend(env, String.format("target server of region %s is null", - region.getRegionNameAsString())); + setTimeoutForSuspend(env, + String.format("target server of region %s is null", region.getRegionNameAsString())); throw new ProcedureSuspendedException(); } ServerState serverState = regionStates.getServerNode(targetServer).getState(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java index e0b46666a5d2..02ad0d649b0c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -27,6 +27,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionParameter; @@ -53,8 +54,8 @@ protected void doCall() throws Exception { if (columnFamily == null) { res = region.flush(true); } else { - res = region.flushcache(Collections.singletonList(columnFamily), - false, FlushLifeCycleTracker.DUMMY); + res = region.flushcache(Collections.singletonList(columnFamily), false, + FlushLifeCycleTracker.DUMMY); } if (res.getResult() == HRegion.FlushResult.Result.CANNOT_FLUSH) { region.waitForFlushes(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index c3e579652045..b2b95983be86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1866,9 +1866,8 @@ executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OP .setCorePoolSize(rsSnapshotOperationThreads)); final int rsFlushOperationThreads = conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3); - executorService.startExecutorService( - executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS) - .setCorePoolSize(rsFlushOperationThreads)); + executorService.startExecutorService(executorService.new ExecutorConfig() + .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads)); Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", uncaughtExceptionHandler); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index bd08da7b33be..5fef27c729d3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -521,7 +521,7 @@ public long modifyColumnStoreFileTracker(TableName tableName, byte[] family, Str @Override public long flushTable(TableName tableName, byte[] columnFamily, long nonceGroup, long nonce) - throws IOException { + throws IOException { return 0; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java index bb8f1291a51e..f07426fdffae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -35,7 +35,6 @@ import org.junit.ClassRule; import org.junit.experimental.categories.Category; - @Category({ MasterTests.class, MediumTests.class }) public class TestFlushTableProcedure { @@ -66,14 +65,14 @@ protected void addConfiguration(Configuration config) { } protected void assertTableMemStoreNotEmpty() { - long totalSize = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME) - .stream().mapToLong(HRegion::getMemStoreDataSize).sum(); + long totalSize = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream() + .mapToLong(HRegion::getMemStoreDataSize).sum(); Assert.assertTrue(totalSize > 0); } protected void assertTableMemStoreEmpty() { - long totalSize = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME) - .stream().mapToLong(HRegion::getMemStoreDataSize).sum(); + long totalSize = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream() + .mapToLong(HRegion::getMemStoreDataSize).sum(); Assert.assertEquals(0, totalSize); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBasicFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBasicFlush.java index 69e4529ef4df..1537a2bb05c9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBasicFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBasicFlush.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -70,12 +70,11 @@ public void testMasterRestarts() throws IOException { public void testSkipRIT() throws IOException { HRegion region = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).get(0); - TEST_UTIL.getHBaseCluster().getMaster() - .getAssignmentManager().getRegionStates() + TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates() .getRegionStateNode(region.getRegionInfo()) .setState(RegionState.State.CLOSING, RegionState.State.OPEN); - FlushRegionProcedure proc = new FlushRegionProcedure(region.getRegionInfo()); + FlushRegionProcedure proc = new FlushRegionProcedure(region.getRegionInfo()); TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().submitProcedure(proc); // wait for a time which is shorter than RSProcedureDispatcher delays diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java index 43685bc5df94..70f4fbec51ae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -32,7 +32,7 @@ @Category({ MasterTests.class, MediumTests.class }) public class TestFlushTableProcedureWithDoNotSupportFlushTableMaster - extends TestFlushTableProcedure { + extends TestFlushTableProcedure { @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -58,8 +58,8 @@ public DoNotSupportFlushTableMaster(Configuration conf) throws IOException { } @Override - public long flushTable(TableName tableName, byte[] columnFamily, - long nonceGroup, long nonce) throws IOException { + public long flushTable(TableName tableName, byte[] columnFamily, long nonceGroup, long nonce) + throws IOException { throw new DoNotRetryIOException("UnsupportedOperation: flushTable"); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java index 24eb7438c199..c1d9e2788d47 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java @@ -125,15 +125,14 @@ private Pair generateAndFlushData(Table table) throws IOException table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); } long startTime = System.nanoTime(); - hbtu.getHBaseCluster().getRegions(tableName).stream() - .findFirst().ifPresent(r -> { - try { - r.flush(true); - } catch (IOException e) { - LOG.error("Failed flush region {}", r, e); - fail("Failed flush region " + r.getRegionInfo().getRegionNameAsString()); - } - }); + hbtu.getHBaseCluster().getRegions(tableName).stream().findFirst().ifPresent(r -> { + try { + r.flush(true); + } catch (IOException e) { + LOG.error("Failed flush region {}", r, e); + fail("Failed flush region " + r.getRegionInfo().getRegionNameAsString()); + } + }); duration += System.nanoTime() - startTime; } HStore store = getStoreWithName(tableName);
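
Taken together, the five patches keep the public flush API unchanged while swapping the machinery underneath it for a master-side FlushTableProcedure with per-region FlushRegionProcedure children. A hypothetical usage sketch against the synchronous Admin API ('conn', the table name and the column family name are illustrative, not from the patches):

    try (Admin admin = conn.getAdmin()) {
      TableName table = TableName.valueOf("test_table");
      admin.flush(table);                       // flush every column family
      admin.flush(table, Bytes.toBytes("cf1")); // flush one column family only
    }

Whether the new procedure or the legacy execProcedure path runs is decided per call, based on what the master supports.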
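
The same operation is available through AsyncAdmin, the interface RawAsyncHBaseAdmin implements; a sketch assuming an AsyncConnection named 'asyncConn':

    asyncConn.getAdmin().flush(TableName.valueOf("test_table"))
      .whenComplete((ignored, err) -> {
        if (err != null) {
          // TableNotFoundException / TableNotEnabledException surface here,
          // as does a procedure failure from the new code path.
          err.printStackTrace();
        }
      });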
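
Inside RawAsyncHBaseAdmin the fallback decision reduces to one listener on the procedure future: only a DoNotRetryIOException (an old master without the flushTable RPC, or the procedure disabled) routes to legacyFlush; every other error is surfaced as-is. A condensed sketch of that shape — the generic type parameters were dropped by whatever extracted this excerpt, so the ones below are reconstructions, and the success branch is inferred:

    CompletableFuture<Void> future = new CompletableFuture<>();
    addListener(procFuture, (ret, error) -> {
      if (error == null) {
        future.complete(ret);
      } else if (error instanceof DoNotRetryIOException) {
        // Old master without the flushTable RPC, or the procedure is
        // disabled: fall back to the execProcedure-based implementation.
        legacyFlush(future, tableName, columnFamily);
      } else {
        future.completeExceptionally(error);
      }
    });
    return future;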
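
Server side, the feature is gated by a single boolean configuration, read in HMaster.flushTable and re-checked in FlushTableProcedure.afterReplay, so a procedure recovered from the WAL after the flag is turned off fails rather than resumes. A test-style sketch of disabling it (the constant's literal key lives in MasterFlushTableProcedureManager and is not shown in this excerpt):

    Configuration conf = HBaseConfiguration.create();
    // With the flag off, HMaster.flushTable throws
    // DoNotRetryIOException("FlushProcedure is DISABLED"), which triggers the
    // client-side fallback shown above.
    conf.setBoolean(MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED, false);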
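
FlushRegionProcedure only dispatches to a region server while the region is OPEN and stationary; testSkipRIT relies on this by forcing a region into CLOSING and expecting the procedure to finish without ever reaching a server. A sketch of the check at the top of execute() — the condition is verbatim from the diff, the early return is inferred from the test:

    RegionStateNode regionNode =
      env.getAssignmentManager().getRegionStates().getRegionStateNode(region);
    if (!regionNode.isInState(State.OPEN) || regionNode.isInTransition()) {
      // Flushing is idempotent and best-effort per region, so a region that
      // is moving can simply be skipped instead of blocking the procedure.
      LOG.info("Region {} is not OPEN or is in transition, skip flush", region);
      return null;
    }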
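
On the region server, FlushRegionCallable.doCall() guards against racing with an already-running flush: it captures the region's read point before flushing, and accepts a CANNOT_FLUSH result only once the competing flush has covered everything visible at that read point. The core of the method, reassembled from the hunks above (the diff re-indents this block twice, which makes it hard to follow in place):

    long readPt = region.getReadPoint(IsolationLevel.READ_COMMITTED);
    HRegion.FlushResult res;
    if (columnFamily == null) {
      res = region.flush(true); // flush all stores of the region
    } else {
      res = region.flushcache(Collections.singletonList(columnFamily), false,
        FlushLifeCycleTracker.DUMMY);
    }
    if (res.getResult() == HRegion.FlushResult.Result.CANNOT_FLUSH) {
      // Another flush is in progress; wait for it, then verify it flushed at
      // least up to our read point, otherwise fail so the master can retry.
      region.waitForFlushes();
      if (region.getMaxFlushedSeqId() < readPt) {
        throw new IOException("Unable to complete flush " + regionInfo);
      }
    }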