diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 3ec2c741293e..a1588e0b24b8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.ClusterMetrics.Option; import org.apache.hadoop.hbase.ClusterMetricsBuilder; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NamespaceDescriptor; @@ -180,6 +181,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; @@ -925,7 +928,38 @@ public CompletableFuture flush(TableName tableName) { @Override public CompletableFuture flush(TableName tableName, byte[] columnFamily) { + // This is for keeping compatibility with old implementation. + // If the server version is lower than the client version, it's possible that the + // flushTable method is not present in the server side, if so, we need to fall back + // to the old implementation. + FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamily, + ng.getNonceGroup(), ng.newNonce()); + CompletableFuture procFuture = this. procedureCall( + tableName, request, (s, c, req, done) -> s.flushTable(c, req, done), + (resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName)); + // here we use another new CompletableFuture because the + // procFuture is not fully controlled by ourselves. CompletableFuture future = new CompletableFuture<>(); + addListener(procFuture, (ret, error) -> { + if (error != null) { + if (error instanceof DoNotRetryIOException) { + // usually this is caused by the method is not present on the server or + // the hbase hadoop version does not match the running hadoop version. + // if that happens, we need fall back to the old flush implementation. + LOG.info("Unrecoverable error in master side. Fallback to FlushTableProcedure V1", error); + legacyFlush(future, tableName, columnFamily); + } else { + future.completeExceptionally(error); + } + } else { + future.complete(ret); + } + }); + return future; + } + + private void legacyFlush(CompletableFuture future, TableName tableName, + byte[] columnFamily) { addListener(tableExists(tableName), (exists, err) -> { if (err != null) { future.completeExceptionally(err); @@ -955,7 +989,6 @@ public CompletableFuture flush(TableName tableName, byte[] columnFamily) { }); } }); - return future; } @Override @@ -2611,6 +2644,18 @@ String getOperationType() { } } + private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer { + + FlushTableProcedureBiConsumer(TableName tableName) { + super(tableName); + } + + @Override + String getOperationType() { + return "FLUSH"; + } + } + private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index addda9c59860..b0a0e80dbfee 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -116,6 +116,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; @@ -746,6 +747,16 @@ public static GetOnlineRegionRequest buildGetOnlineRegionRequest() { return GetOnlineRegionRequest.newBuilder().build(); } + public static FlushTableRequest buildFlushTableRequest(final TableName tableName, + final byte[] columnFamily, final long nonceGroup, final long nonce) { + FlushTableRequest.Builder builder = FlushTableRequest.newBuilder(); + builder.setTableName(ProtobufUtil.toProtoTableName(tableName)); + if (columnFamily != null) { + builder.setColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); + } + return builder.setNonceGroup(nonceGroup).setNonce(nonce).build(); + } + /** * Create a protocol buffer FlushRegionRequest for a given region name * @param regionName the name of the region to get info diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto index 257abe8f11ca..fb945834e11d 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto @@ -200,6 +200,17 @@ message ModifyTableResponse { optional uint64 proc_id = 1; } +message FlushTableRequest { + required TableName table_name = 1; + optional bytes column_family = 2; + optional uint64 nonce_group = 3 [default = 0]; + optional uint64 nonce = 4 [default = 0]; +} + +message FlushTableResponse { + optional uint64 proc_id = 1; +} + /* Namespace-level protobufs */ message CreateNamespaceRequest { @@ -1203,6 +1214,9 @@ service MasterService { rpc FlushMasterStore(FlushMasterStoreRequest) returns(FlushMasterStoreResponse); + + rpc FlushTable(FlushTableRequest) + returns(FlushTableResponse); } // HBCK Service definitions. diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index 35125a5a94e6..3ecd9f6cda70 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -191,6 +191,26 @@ message RestoreParentToChildRegionsPair { required string child2_region_name = 3; } +enum FlushTableState { + FLUSH_TABLE_PREPARE = 1; + FLUSH_TABLE_FLUSH_REGIONS = 2; +} + +message FlushTableProcedureStateData { + required TableName table_name = 1; + optional bytes column_family = 2; +} + +message FlushRegionProcedureStateData { + required RegionInfo region = 1; + optional bytes column_family = 2; +} + +message FlushRegionParameter { + required RegionInfo region = 1; + optional bytes column_family = 2; +} + enum SnapshotState { SNAPSHOT_PREPARE = 1; SNAPSHOT_PRE_OPERATION = 2; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index e79c9c2bc415..07f8339a20db 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -291,7 +291,13 @@ public enum EventType { * RS verify snapshot.
* RS_VERIFY_SNAPSHOT */ - RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS); + RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS), + + /** + * RS flush regions.
+ * RS_FLUSH_OPERATIONS + */ + RS_FLUSH_REGIONS(89, ExecutorType.RS_FLUSH_OPERATIONS); private final int code; private final ExecutorType executor; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index 0bcd3ee05190..b034409af6da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -54,7 +54,8 @@ public enum ExecutorType { RS_SWITCH_RPC_THROTTLE(33), RS_IN_MEMORY_COMPACTION(34), RS_CLAIM_REPLICATION_QUEUE(35), - RS_SNAPSHOT_OPERATIONS(36); + RS_SNAPSHOT_OPERATIONS(36), + RS_FLUSH_OPERATIONS(37); ExecutorType(int value) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 2b818d9cc238..75290ae0f110 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -150,6 +150,7 @@ import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure; import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure; import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; +import org.apache.hadoop.hbase.master.procedure.FlushTableProcedure; import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; @@ -2691,6 +2692,36 @@ protected String getDescription() { }); } + @Override + public long flushTable(TableName tableName, byte[] columnFamily, long nonceGroup, long nonce) + throws IOException { + checkInitialized(); + + if ( + !getConfiguration().getBoolean(MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED, + MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT) + ) { + throw new DoNotRetryIOException("FlushProcedure is DISABLED"); + } + + return MasterProcedureUtil + .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + getMaster().getMasterCoprocessorHost().preTableFlush(tableName); + LOG.info(getClientIdAuditPrefix() + " flush " + tableName); + submitProcedure( + new FlushTableProcedure(procedureExecutor.getEnvironment(), tableName, columnFamily)); + getMaster().getMasterCoprocessorHost().postTableFlush(tableName); + } + + @Override + protected String getDescription() { + return "FlushTableProcedure"; + } + }); + } + private long modifyTable(final TableName tableName, final TableDescriptorGetter newDescriptorGetter, final long nonceGroup, final long nonce, final boolean shouldCheckDescriptor) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index e209e561f37a..95c116e07512 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -232,6 +232,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; @@ -3483,4 +3485,17 @@ public FlushMasterStoreResponse flushMasterStore(RpcController controller, } return FlushMasterStoreResponse.newBuilder().build(); } + + @Override + public FlushTableResponse flushTable(RpcController controller, FlushTableRequest req) + throws ServiceException { + TableName tableName = ProtobufUtil.toTableName(req.getTableName()); + byte[] columnFamily = req.hasColumnFamily() ? req.getColumnFamily().toByteArray() : null; + try { + long procId = server.flushTable(tableName, columnFamily, req.getNonceGroup(), req.getNonce()); + return FlushTableResponse.newBuilder().setProcId(procId).build(); + } catch (IOException ioe) { + throw new ServiceException(ioe); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index d813b39863ef..0a430b8ff11a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -212,6 +212,17 @@ long enableTable(final TableName tableName, final long nonceGroup, final long no long disableTable(final TableName tableName, final long nonceGroup, final long nonce) throws IOException; + /** + * Flush an existing table + * @param tableName The table name + * @param columnFamily The column family + * @param nonceGroup the nonce group + * @param nonce the nonce + * @return the flush procedure id + */ + long flushTable(final TableName tableName, final byte[] columnFamily, final long nonceGroup, + final long nonce) throws IOException; + /** * Add a new column to an existing table * @param tableName The table name diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java new file mode 100644 index 000000000000..b388b6a1173b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.Optional; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.master.assignment.RegionStateNode; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; +import org.apache.hadoop.hbase.regionserver.FlushRegionCallable; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionParameter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionProcedureStateData; + +@InterfaceAudience.Private +public class FlushRegionProcedure extends IdempotentRegionRemoteProcedureBase { + private static final Logger LOG = LoggerFactory.getLogger(FlushRegionProcedure.class); + + private byte[] columnFamily; + + public FlushRegionProcedure() { + } + + public FlushRegionProcedure(RegionInfo region) { + this(region, null); + } + + public FlushRegionProcedure(RegionInfo region, byte[] columnFamily) { + super(region); + this.columnFamily = columnFamily; + } + + @Override + protected Procedure[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + RegionStateNode regionNode = + env.getAssignmentManager().getRegionStates().getRegionStateNode(region); + if (!regionNode.isInState(State.OPEN) || regionNode.isInTransition()) { + LOG.info("State of region {} is not OPEN or in transition. Skip {} ...", region, this); + return null; + } + return super.execute(env); + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + FlushRegionProcedureStateData.Builder builder = FlushRegionProcedureStateData.newBuilder(); + builder.setRegion(ProtobufUtil.toRegionInfo(region)); + if (columnFamily != null) { + builder.setColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); + } + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + FlushRegionProcedureStateData data = + serializer.deserialize(FlushRegionProcedureStateData.class); + this.region = ProtobufUtil.toRegionInfo(data.getRegion()); + if (data.hasColumnFamily()) { + this.columnFamily = data.getColumnFamily().toByteArray(); + } + } + + @Override + public Optional remoteCallBuild(MasterProcedureEnv env, ServerName serverName) { + FlushRegionParameter.Builder builder = FlushRegionParameter.newBuilder(); + builder.setRegion(ProtobufUtil.toRegionInfo(region)); + if (columnFamily != null) { + builder.setColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); + } + return Optional.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(), + FlushRegionCallable.class, builder.build().toByteArray())); + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.FLUSH; + } + + @Override + protected boolean waitInitialized(MasterProcedureEnv env) { + return env.waitInitialized(this); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java new file mode 100644 index 000000000000..3cf537ff4fc8 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushTableProcedure.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushTableProcedureStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushTableState; + +@InterfaceAudience.Private +public class FlushTableProcedure extends AbstractStateMachineTableProcedure { + private static final Logger LOG = LoggerFactory.getLogger(FlushTableProcedure.class); + + private TableName tableName; + + private byte[] columnFamily; + + public FlushTableProcedure() { + super(); + } + + public FlushTableProcedure(MasterProcedureEnv env, TableName tableName) { + this(env, tableName, null); + } + + public FlushTableProcedure(MasterProcedureEnv env, TableName tableName, byte[] columnFamily) { + super(env); + this.tableName = tableName; + this.columnFamily = columnFamily; + } + + @Override + protected LockState acquireLock(MasterProcedureEnv env) { + // Here we don't acquire table lock because the flush operation and other operations (like + // split or merge) are not mutually exclusive. Region will flush memstore when being closed. + // It's safe even if we don't have lock. + return LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(MasterProcedureEnv env) { + // nothing to do since we don't acquire lock + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, FlushTableState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + LOG.info("{} execute state={}", this, state); + + try { + switch (state) { + case FLUSH_TABLE_PREPARE: + preflightChecks(env, true); + setNextState(FlushTableState.FLUSH_TABLE_FLUSH_REGIONS); + return Flow.HAS_MORE_STATE; + case FLUSH_TABLE_FLUSH_REGIONS: + addChildProcedure(createFlushRegionProcedures(env)); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } catch (IOException e) { + setFailure("master-flush-table", e); + } + return Flow.HAS_MORE_STATE; + } + + @Override + protected void rollbackState(MasterProcedureEnv env, FlushTableState flushTableState) + throws IOException, InterruptedException { + } + + @Override + protected FlushTableState getState(int stateId) { + return FlushTableState.forNumber(stateId); + } + + @Override + protected int getStateId(FlushTableState state) { + return state.getNumber(); + } + + @Override + protected FlushTableState getInitialState() { + return FlushTableState.FLUSH_TABLE_PREPARE; + } + + @Override + public TableName getTableName() { + return tableName; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.FLUSH; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + FlushTableProcedureStateData.Builder builder = FlushTableProcedureStateData.newBuilder(); + builder.setTableName(ProtobufUtil.toProtoTableName(tableName)); + if (columnFamily != null) { + builder.setColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); + } + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + FlushTableProcedureStateData data = serializer.deserialize(FlushTableProcedureStateData.class); + this.tableName = ProtobufUtil.toTableName(data.getTableName()); + if (data.hasColumnFamily()) { + this.columnFamily = data.getColumnFamily().toByteArray(); + } + } + + private FlushRegionProcedure[] createFlushRegionProcedures(MasterProcedureEnv env) { + return env.getAssignmentManager().getTableRegions(getTableName(), true).stream() + .filter(r -> RegionReplicaUtil.isDefaultReplica(r)) + .map(r -> new FlushRegionProcedure(r, columnFamily)).toArray(FlushRegionProcedure[]::new); + } + + @Override + public void toStringClassDetails(StringBuilder builder) { + builder.append(getClass().getName()).append(", id=").append(getProcId()).append(", table=") + .append(tableName); + if (columnFamily != null) { + builder.append(", columnFamily=").append(Bytes.toString(columnFamily)); + } + } + + @Override + protected void afterReplay(MasterProcedureEnv env) { + if ( + !env.getMasterConfiguration().getBoolean( + MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED, + MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT) + ) { + setFailure("master-flush-table", new IOException("FlushProcedure is DISABLED")); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/IdempotentRegionRemoteProcedureBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/IdempotentRegionRemoteProcedureBase.java new file mode 100644 index 000000000000..86e3d4e545eb --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/IdempotentRegionRemoteProcedureBase.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.master.assignment.RegionStateNode; +import org.apache.hadoop.hbase.master.assignment.RegionStates; +import org.apache.hadoop.hbase.master.assignment.ServerState; +import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +/** + * The base class for the remote procedures for normal operations, like flush or snapshot. Normal + * operations do not change the region state. This is the difference between + * {@link org.apache.hadoop.hbase.master.assignment.RegionRemoteProcedureBase} and + * {@link IdempotentRegionRemoteProcedureBase}. It requires that the state of the region must be + * OPEN. If region is in transition state, the procedure will suspend and retry later. + */ +@InterfaceAudience.Private +public abstract class IdempotentRegionRemoteProcedureBase extends Procedure + implements TableProcedureInterface, RemoteProcedure { + private static final Logger LOG = + LoggerFactory.getLogger(IdempotentRegionRemoteProcedureBase.class); + + protected RegionInfo region; + + private ProcedureEvent event; + private boolean dispatched; + private boolean succ; + private RetryCounter retryCounter; + + public IdempotentRegionRemoteProcedureBase() { + } + + public IdempotentRegionRemoteProcedureBase(RegionInfo region) { + this.region = region; + } + + @Override + protected Procedure[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (dispatched) { + if (succ) { + return null; + } + dispatched = false; + } + + RegionStates regionStates = env.getAssignmentManager().getRegionStates(); + RegionStateNode regionNode = regionStates.getRegionStateNode(region); + regionNode.lock(); + try { + if (regionNode.isInTransition()) { + setTimeoutForSuspend(env, String.format("region %s has a TRSP attached %s", + region.getRegionNameAsString(), regionNode.getProcedure())); + throw new ProcedureSuspendedException(); + } + if (!regionNode.isInState(RegionState.State.OPEN)) { + setTimeoutForSuspend(env, String.format("region state of %s is %s", + region.getRegionNameAsString(), regionNode.getState())); + throw new ProcedureSuspendedException(); + } + ServerName targetServer = regionNode.getRegionLocation(); + if (targetServer == null) { + setTimeoutForSuspend(env, + String.format("target server of region %s is null", region.getRegionNameAsString())); + throw new ProcedureSuspendedException(); + } + ServerState serverState = regionStates.getServerNode(targetServer).getState(); + if (serverState != ServerState.ONLINE) { + setTimeoutForSuspend(env, String.format("target server of region %s %s is in state %s", + region.getRegionNameAsString(), targetServer, serverState)); + throw new ProcedureSuspendedException(); + } + try { + env.getRemoteDispatcher().addOperationToNode(targetServer, this); + dispatched = true; + event = new ProcedureEvent<>(this); + event.suspendIfNotReady(this); + throw new ProcedureSuspendedException(); + } catch (FailedRemoteDispatchException e) { + setTimeoutForSuspend(env, "Failed send request to " + targetServer); + throw new ProcedureSuspendedException(); + } + } finally { + regionNode.unlock(); + } + } + + private void setTimeoutForSuspend(MasterProcedureEnv env, String reason) { + if (retryCounter == null) { + retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); + } + long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); + LOG.warn("{} can not run currently because {}, wait {} ms to retry", this, reason, backoff); + setTimeout(Math.toIntExact(backoff)); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + } + + @Override + protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + env.getProcedureScheduler().addFront(this); + return false; + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, IOException e) { + complete(env, e); + } + + @Override + public void remoteOperationCompleted(MasterProcedureEnv env) { + complete(env, null); + } + + @Override + public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { + complete(env, error); + } + + private void complete(MasterProcedureEnv env, Throwable error) { + if (isFinished()) { + LOG.info("This procedure {} is already finished, skip the rest processes", this.getProcId()); + return; + } + if (event == null) { + LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery", + getProcId()); + return; + } + if (error == null) { + succ = true; + } + event.wake(env.getProcedureScheduler()); + event = null; + } + + @Override + public TableName getTableName() { + return region.getTable(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java index 80e24b684580..10841557851c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java @@ -42,6 +42,7 @@ public enum TableOperationType { READ, SNAPSHOT, REGION_SNAPSHOT, + FLUSH, REGION_EDIT, REGION_SPLIT, REGION_MERGE, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java index 1a9847edcc89..d1acd08ea21c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java @@ -56,6 +56,7 @@ private static boolean requireTableExclusiveLock(TableProcedureInterface proc) { // we allow concurrent edit on the ns family in meta table return !proc.getTableName().equals(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME); case READ: + case FLUSH: case SNAPSHOT: return false; // region operations are using the shared-lock on the table diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java index 15d3d8a73a90..0d5e0088b577 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java @@ -58,6 +58,10 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager { public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; + public static final String FLUSH_PROCEDURE_ENABLED = "hbase.flush.procedure.enabled"; + + public static final boolean FLUSH_PROCEDURE_ENABLED_DEFAULT = true; + private static final String FLUSH_TIMEOUT_MILLIS_KEY = "hbase.flush.master.timeoutMillis"; private static final int FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000; private static final String FLUSH_WAKE_MILLIS_KEY = "hbase.flush.master.wakeMillis"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java new file mode 100644 index 000000000000..02ad0d649b0c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Collections; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.client.IsolationLevel; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionParameter; + +@InterfaceAudience.Private +public class FlushRegionCallable extends BaseRSProcedureCallable { + + private static final Logger LOG = LoggerFactory.getLogger(FlushRegionCallable.class); + + private RegionInfo regionInfo; + + private byte[] columnFamily; + + @Override + protected void doCall() throws Exception { + HRegion region = rs.getRegion(regionInfo.getEncodedName()); + if (region == null) { + throw new NotServingRegionException("region=" + regionInfo.getRegionNameAsString()); + } + LOG.debug("Starting region operation on {}", region); + region.startRegionOperation(); + try { + long readPt = region.getReadPoint(IsolationLevel.READ_COMMITTED); + HRegion.FlushResult res; + if (columnFamily == null) { + res = region.flush(true); + } else { + res = region.flushcache(Collections.singletonList(columnFamily), false, + FlushLifeCycleTracker.DUMMY); + } + if (res.getResult() == HRegion.FlushResult.Result.CANNOT_FLUSH) { + region.waitForFlushes(); + if (region.getMaxFlushedSeqId() < readPt) { + throw new IOException("Unable to complete flush " + regionInfo); + } + } + } finally { + LOG.debug("Closing region operation on {}", region); + region.closeRegionOperation(); + } + } + + @Override + protected void initParameter(byte[] parameter) throws Exception { + FlushRegionParameter param = FlushRegionParameter.parseFrom(parameter); + this.regionInfo = ProtobufUtil.toRegionInfo(param.getRegion()); + if (param.hasColumnFamily()) { + this.columnFamily = param.getColumnFamily().toByteArray(); + } + } + + @Override + public EventType getEventType() { + return EventType.RS_FLUSH_REGIONS; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index e79f4bec612a..b2b95983be86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1864,6 +1864,10 @@ executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLI executorService.startExecutorService( executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS) .setCorePoolSize(rsSnapshotOperationThreads)); + final int rsFlushOperationThreads = + conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3); + executorService.startExecutorService(executorService.new ExecutorConfig() + .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads)); Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", uncaughtExceptionHandler); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index bc3969ffd518..5fef27c729d3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -518,4 +518,10 @@ public long modifyColumnStoreFileTracker(TableName tableName, byte[] family, Str long nonceGroup, long nonce) throws IOException { return -1; } + + @Override + public long flushTable(TableName tableName, byte[] columnFamily, long nonceGroup, long nonce) + throws IOException { + return 0; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java new file mode 100644 index 000000000000..f07426fdffae --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedure.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.RegionSplitter; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestFlushTableProcedure { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestFlushTableProcedure.class); + + protected static HBaseTestingUtil TEST_UTIL; + protected TableName TABLE_NAME; + protected byte[] FAMILY; + + @Before + public void setup() throws Exception { + TEST_UTIL = new HBaseTestingUtil(); + addConfiguration(TEST_UTIL.getConfiguration()); + TEST_UTIL.startMiniCluster(3); + TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestFlushTable")); + FAMILY = Bytes.toBytes("cf"); + final byte[][] splitKeys = new RegionSplitter.HexStringSplit().split(10); + Table table = TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys); + TEST_UTIL.loadTable(table, FAMILY, false); + } + + protected void addConfiguration(Configuration config) { + // delay dispatch so that we can do something, for example kill a target server + config.setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 10000); + config.setInt(RemoteProcedureDispatcher.DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, 128); + } + + protected void assertTableMemStoreNotEmpty() { + long totalSize = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream() + .mapToLong(HRegion::getMemStoreDataSize).sum(); + Assert.assertTrue(totalSize > 0); + } + + protected void assertTableMemStoreEmpty() { + long totalSize = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream() + .mapToLong(HRegion::getMemStoreDataSize).sum(); + Assert.assertEquals(0, totalSize); + } + + @After + public void teardown() throws Exception { + if (TEST_UTIL.getHBaseCluster().getMaster() != null) { + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate( + TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(), false); + } + TEST_UTIL.shutdownMiniCluster(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBasicFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBasicFlush.java new file mode 100644 index 000000000000..1537a2bb05c9 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureBasicFlush.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestFlushTableProcedureBasicFlush extends TestFlushTableProcedure { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestFlushTableProcedureBasicFlush.class); + + @Test + public void testSimpleFlush() throws IOException { + assertTableMemStoreNotEmpty(); + TEST_UTIL.getAdmin().flush(TABLE_NAME); + assertTableMemStoreEmpty(); + } + + @Test + public void testMasterRestarts() throws IOException { + assertTableMemStoreNotEmpty(); + + HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); + ProcedureExecutor procExec = master.getMasterProcedureExecutor(); + MasterProcedureEnv env = procExec.getEnvironment(); + FlushTableProcedure proc = new FlushTableProcedure(env, TABLE_NAME); + long procId = procExec.submitProcedure(proc); + TEST_UTIL.waitFor(1000, () -> proc.getState().getNumber() > 1); + + TEST_UTIL.getHBaseCluster().killMaster(master.getServerName()); + TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 30000); + TEST_UTIL.getHBaseCluster().startMaster(); + TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); + + master = TEST_UTIL.getHBaseCluster().getMaster(); + procExec = master.getMasterProcedureExecutor(); + ProcedureTestingUtility.waitProcedure(procExec, procId); + assertTableMemStoreEmpty(); + } + + @Test + public void testSkipRIT() throws IOException { + HRegion region = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).get(0); + + TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates() + .getRegionStateNode(region.getRegionInfo()) + .setState(RegionState.State.CLOSING, RegionState.State.OPEN); + + FlushRegionProcedure proc = new FlushRegionProcedure(region.getRegionInfo()); + TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().submitProcedure(proc); + + // wait for a time which is shorter than RSProcedureDispatcher delays + TEST_UTIL.waitFor(5000, () -> proc.isFinished()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java new file mode 100644 index 000000000000..70f4fbec51ae --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestFlushTableProcedureWithDoNotSupportFlushTableMaster.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestFlushTableProcedureWithDoNotSupportFlushTableMaster + extends TestFlushTableProcedure { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestFlushTableProcedureWithDoNotSupportFlushTableMaster.class); + + @Override + protected void addConfiguration(Configuration config) { + super.addConfiguration(config); + config.set(HConstants.MASTER_IMPL, DoNotSupportFlushTableMaster.class.getName()); + } + + @Test + public void testFlushFallback() throws IOException { + assertTableMemStoreNotEmpty(); + TEST_UTIL.getAdmin().flush(TABLE_NAME); + assertTableMemStoreEmpty(); + } + + public static final class DoNotSupportFlushTableMaster extends HMaster { + + public DoNotSupportFlushTableMaster(Configuration conf) throws IOException { + super(conf); + } + + @Override + public long flushTable(TableName tableName, byte[] columnFamily, long nonceGroup, long nonce) + throws IOException { + throw new DoNotRetryIOException("UnsupportedOperation: flushTable"); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java index dc84642741f9..c1d9e2788d47 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.util.List; @@ -124,7 +125,14 @@ private Pair generateAndFlushData(Table table) throws IOException table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); } long startTime = System.nanoTime(); - hbtu.getAdmin().flush(tableName); + hbtu.getHBaseCluster().getRegions(tableName).stream().findFirst().ifPresent(r -> { + try { + r.flush(true); + } catch (IOException e) { + LOG.error("Failed flush region {}", r, e); + fail("Failed flush region " + r.getRegionInfo().getRegionNameAsString()); + } + }); duration += System.nanoTime() - startTime; } HStore store = getStoreWithName(tableName);