Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -987,4 +988,9 @@ public ColumnFamilyDescriptor getColumnFamily(byte[] name) {
protected ModifyableTableDescriptor getDelegateeForModification() {
return delegatee;
}

@Override
public Optional<String> getRegionServerGroup() {
return delegatee.getRegionServerGroup();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -360,4 +361,11 @@ default boolean matchReplicationScope(boolean enabled) {
}
return !enabled;
}

/**
* Get the region server group this table belongs to. The regions of this table will be placed
* only on the region servers within this group. If not present, will be placed on
* {@link org.apache.hadoop.hbase.rsgroup.RSGroupInfo#DEFAULT_GROUP}.
*/
Optional<String> getRegionServerGroup();
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.HBaseException;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.PrettyPrinter;
Expand Down Expand Up @@ -192,6 +193,9 @@ public class TableDescriptorBuilder {
private static final Bytes PRIORITY_KEY
= new Bytes(Bytes.toBytes(PRIORITY));

private static final Bytes RSGROUP_KEY =
new Bytes(Bytes.toBytes(RSGroupInfo.TABLE_DESC_PROP_GROUP));

/**
* Relative priority of the table used for rpc scheduling
*/
Expand Down Expand Up @@ -594,6 +598,17 @@ public TableDescriptorBuilder setReplicationScope(int scope) {
return this;
}

/**
* Set the RSGroup for this table, specified RSGroup must exist before create or modify table.
*
* @param group rsgroup name
* @return a TableDescriptorBuilder
*/
public TableDescriptorBuilder setRegionServerGroup(String group) {
desc.setValue(RSGROUP_KEY, group);
return this;
}

public TableDescriptor build() {
return new ModifyableTableDescriptor(desc);
}
Expand Down Expand Up @@ -1647,6 +1662,16 @@ private static TableDescriptor parseFrom(final byte[] bytes)
public int getColumnFamilyCount() {
return families.size();
}

@Override
public Optional<String> getRegionServerGroup() {
Bytes value = values.get(RSGROUP_KEY);
if (value != null) {
return Optional.of(Bytes.toString(value.get(), value.getOffset(), value.getLength()));
} else {
return Optional.empty();
}
}
}

private static Optional<CoprocessorDescriptor> toCoprocessorDescriptor(String spec) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
public class RSGroupInfo {
public static final String DEFAULT_GROUP = "default";
public static final String NAMESPACE_DESC_PROP_GROUP = "hbase.rsgroup.name";
public static final String TABLE_DESC_PROP_GROUP = "hbase.rsgroup.name";

private final String name;
// Keep servers in a sorted set so has an expected ordering when displayed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ boolean rsgroupHasServersOnline(TableDescriptor desc) throws IOException {
}

void assignTableToGroup(TableDescriptor desc) throws IOException {
RSGroupInfo rsGroupInfo = groupInfoManager.determineRSGroupInfoForTable(desc.getTableName());
RSGroupInfo rsGroupInfo = determineRSGroupInfoForTable(desc);
if (rsGroupInfo == null) {
throw new ConstraintException("Default RSGroup for this table " + desc.getTableName()
+ " does not exist.");
Expand All @@ -508,21 +508,75 @@ public void preCreateTableAction(
if (desc.getTableName().isSystemTable()) {
return;
}
RSGroupInfo rsGroupInfo = groupInfoManager.determineRSGroupInfoForTable(desc.getTableName());
moveTableToValidRSGroup(desc);
}

@Override
public void preModifyTableAction(ObserverContext<MasterCoprocessorEnvironment> ctx,
TableName tableName, TableDescriptor currentDescriptor, TableDescriptor newDescriptor)
throws IOException {
// If table's rsgroup is changed, it must be valid
if (!currentDescriptor.getRegionServerGroup().equals(newDescriptor.getRegionServerGroup())) {
RSGroupInfo rsGroupInfo = determineRSGroupInfoForTable(newDescriptor);
validateRSGroup(newDescriptor, rsGroupInfo);
}
}

@Override
public void postCompletedModifyTableAction(ObserverContext<MasterCoprocessorEnvironment> ctx,
TableName tableName, TableDescriptor oldDescriptor, TableDescriptor currentDescriptor)
throws IOException {
// If table's rsgroup is changed, move table into the rsgroup.
if (!oldDescriptor.getRegionServerGroup().equals(currentDescriptor.getRegionServerGroup())) {
RSGroupInfo rsGroupInfo = determineRSGroupInfoForTable(currentDescriptor);
moveTableToRSGroup(currentDescriptor, rsGroupInfo);
}
}

// Determine and validate rs group then move table to this valid rs group.
private void moveTableToValidRSGroup(TableDescriptor desc) throws IOException {
RSGroupInfo rsGroupInfo = determineRSGroupInfoForTable(desc);
validateRSGroup(desc, rsGroupInfo);
moveTableToRSGroup(desc, rsGroupInfo);
}

private void validateRSGroup(TableDescriptor desc, RSGroupInfo rsGroupInfo) throws IOException {
if (rsGroupInfo == null) {
throw new ConstraintException("Default RSGroup for this table " + desc.getTableName()
+ " does not exist.");
throw new ConstraintException(
"Default RSGroup for this table " + desc.getTableName() + " does not exist.");
}
if (!RSGroupUtil.rsGroupHasOnlineServer(master, rsGroupInfo)) {
throw new HBaseIOException("No online servers in the rsgroup " + rsGroupInfo.getName()
+ " which table " + desc.getTableName().getNameAsString() + " belongs to");
throw new HBaseIOException(
"No online servers in the rsgroup " + rsGroupInfo.getName() + " which table " + desc
.getTableName().getNameAsString() + " belongs to");
}
synchronized (groupInfoManager) {
groupInfoManager.moveTables(
Collections.singleton(desc.getTableName()), rsGroupInfo.getName());
}

private void moveTableToRSGroup(final TableDescriptor desc, RSGroupInfo rsGroupInfo)
throws IOException {
// In case of modify table, when rs group is not changed, move is not required.
if (!rsGroupInfo.containsTable(desc.getTableName())) {
synchronized (groupInfoManager) {
groupInfoManager
.moveTables(Collections.singleton(desc.getTableName()), rsGroupInfo.getName());
}
}
}

private RSGroupInfo determineRSGroupInfoForTable(final TableDescriptor desc) throws IOException {
Optional<String> optGroupNameOfTable = desc.getRegionServerGroup();
if (optGroupNameOfTable.isPresent()) {
final RSGroupInfo rsGroup = groupInfoManager.getRSGroup(optGroupNameOfTable.get());
if (rsGroup == null) {
// When rs group is set in table descriptor then it must exist
throw new ConstraintException(
"Region server group " + optGroupNameOfTable.get() + " does not exist.");
} else {
return rsGroup;
}
}
return groupInfoManager.determineRSGroupInfoForTable(desc.getTableName());
}
// Remove table from its RSGroup.
@Override
public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.master.HMaster;
Expand All @@ -43,7 +47,9 @@
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -459,7 +465,7 @@ public void moveTables(Set<TableName> tables, String targetGroup) throws IOExcep
// targetGroup is null when a table is being deleted. In this case no further
// action is required.
if (targetGroup != null) {
moveTableRegionsToGroup(tables, rsGroupInfoManager.getRSGroup(targetGroup));
modifyOrMoveTables(tables, rsGroupInfoManager.getRSGroup(targetGroup));
}
}
}
Expand Down Expand Up @@ -582,7 +588,7 @@ public void moveServersAndTables(Set<Address> servers, Set<TableName> tables, St
rsGroupInfoManager.getRSGroup(srcGroup).getServers(),
targetGroup, srcGroup);
//move regions of these tables which are not on group servers
moveTableRegionsToGroup(tables, rsGroupInfoManager.getRSGroup(targetGroup));
modifyOrMoveTables(tables, rsGroupInfoManager.getRSGroup(targetGroup));
}
LOG.info("Move servers and tables done. Severs: {}, Tables: {} => {}", servers, tables,
targetGroup);
Expand All @@ -609,6 +615,11 @@ public void removeServers(Set<Address> servers) throws IOException {
public void renameRSGroup(String oldName, String newName) throws IOException {
synchronized (rsGroupInfoManager) {
rsGroupInfoManager.renameRSGroup(oldName, newName);
Set<TableDescriptor> updateTables = master.getTableDescriptors().getAll().values().stream()
.filter(t -> oldName.equals(t.getRegionServerGroup().orElse(null)))
.collect(Collectors.toSet());
// Update rs group info into table descriptors
modifyTablesAndWaitForCompletion(updateTables, newName);
}
}

Expand Down Expand Up @@ -718,4 +729,65 @@ private void checkForDeadOrOnlineServers(Set<Address> servers) throws Constraint
}
}
}

// Modify table or move table's regions
void modifyOrMoveTables(Set<TableName> tables, RSGroupInfo targetGroup) throws IOException {
Set<TableName> tablesToBeMoved = new HashSet<>(tables.size());
Set<TableDescriptor> tablesToBeModified = new HashSet<>(tables.size());
// Segregate tables into to be modified or to be moved category
for (TableName tableName : tables) {
TableDescriptor descriptor = master.getTableDescriptors().get(tableName);
if (descriptor == null) {
LOG.error(
"TableDescriptor of table {} not found. Skipping the region movement of this table.");
continue;
}
if (descriptor.getRegionServerGroup().isPresent()) {
tablesToBeModified.add(descriptor);
} else {
tablesToBeMoved.add(tableName);
}
}
List<Long> procedureIds = null;
if (!tablesToBeModified.isEmpty()) {
procedureIds = modifyTables(tablesToBeModified, targetGroup.getName());
}
if (!tablesToBeMoved.isEmpty()) {
moveTableRegionsToGroup(tablesToBeMoved, targetGroup);
}
// By this time moveTableRegionsToGroup is finished, lets wait for modifyTables completion
if (procedureIds != null) {
waitForProcedureCompletion(procedureIds);
}
}

private void modifyTablesAndWaitForCompletion(Set<TableDescriptor> tableDescriptors,
String targetGroup) throws IOException {
final List<Long> procIds = modifyTables(tableDescriptors, targetGroup);
waitForProcedureCompletion(procIds);
}

// Modify table internally moves the regions as well. So separate region movement is not needed
private List<Long> modifyTables(Set<TableDescriptor> tableDescriptors, String targetGroup)
throws IOException {
List<Long> procIds = new ArrayList<>(tableDescriptors.size());
for (TableDescriptor oldTd : tableDescriptors) {
TableDescriptor newTd =
TableDescriptorBuilder.newBuilder(oldTd).setRegionServerGroup(targetGroup).build();
procIds.add(master
.modifyTable(oldTd.getTableName(), newTd, HConstants.NO_NONCE, HConstants.NO_NONCE));
}
return procIds;
}

private void waitForProcedureCompletion(List<Long> procIds) throws IOException {
for (long procId : procIds) {
Procedure<?> proc = master.getMasterProcedureExecutor().getProcedure(procId);
if (proc == null) {
continue;
}
ProcedureSyncWait
.waitForProcedureToCompleteIOE(master.getMasterProcedureExecutor(), proc, Long.MAX_VALUE);
}
}
}
Loading