Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;

import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
Expand Down Expand Up @@ -72,6 +73,8 @@ default List<ContainerInfo> getContainers() {
List<ContainerInfo> getContainers(ContainerID startID, int count);


List<ContainerInfo> getContainers(ReplicationType type);

/**
* Returns all the containers which are in the specified state.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
Expand Down Expand Up @@ -147,6 +148,12 @@ public ContainerInfo getContainer(final ContainerID id)
id + " not found."));
}


@Override
public List<ContainerInfo> getContainers(ReplicationType type) {
return toContainers(containerStateManager.getContainerIDs(type));
}

@Override
public List<ContainerInfo> getContainers(final ContainerID startID,
final int count) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Set;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.metadata.Replicate;
Expand Down Expand Up @@ -114,6 +115,12 @@ public interface ContainerStateManager {
*/
Set<ContainerID> getContainerIDs(LifeCycleState state);


/**
* Returns the IDs of the Containers whose ReplicationType matches the given type.
*/
Set<ContainerID> getContainerIDs(ReplicationType type);

/**
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
Expand Down Expand Up @@ -287,6 +288,13 @@ public Set<ContainerID> getContainerIDs(final LifeCycleState state) {
}
}

@Override
public Set<ContainerID> getContainerIDs(final ReplicationType type) {
try (AutoCloseableLock ignored = readLock()) {
return containers.getContainerIDsByType(type);
}
}

@Override
public ContainerInfo getContainer(final ContainerID id) {
try (AutoCloseableLock ignored = readLock(id)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ NavigableSet<ContainerID> getContainerIDsByOwner(final String ownerName) {
* @param type - Replication type -- StandAlone, Ratis etc.
* @return NavigableSet
*/
NavigableSet<ContainerID> getContainerIDsByType(final ReplicationType type) {
public NavigableSet<ContainerID> getContainerIDsByType(final ReplicationType type) {
Preconditions.checkNotNull(type);
return typeMap.getCollection(type);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -27,11 +27,11 @@
import java.util.stream.Collectors;

import com.google.common.collect.Sets;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
Expand All @@ -48,101 +48,113 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT_DEFAULT;

/**
* Class defining Safe mode exit criteria for Containers.
*/
public class ContainerSafeModeRule extends
SafeModeExitRule<NodeRegistrationContainerReport> {

public static final Logger LOG =
LoggerFactory.getLogger(ContainerSafeModeRule.class);
public static final Logger LOG = LoggerFactory.getLogger(ContainerSafeModeRule.class);
private final ContainerManager containerManager;
// Required cutoff % for containers with at least 1 reported replica.
private double safeModeCutoff;
private final double safeModeCutoff;
// Containers read from scm db (excluding containers in ALLOCATED state).
private Set<Long> ratisContainers;
private Set<Long> ecContainers;
private Map<Long, Set<UUID>> ecContainerDNsMap;
private final Set<Long> ratisContainers;
private final Set<Long> ecContainers;
private final Map<Long, Set<UUID>> ecContainerDNsMap;
private final AtomicLong ratisContainerWithMinReplicas = new AtomicLong(0);
private final AtomicLong ecContainerWithMinReplicas = new AtomicLong(0);

private double ratisMaxContainer;
private double ecMaxContainer;
private AtomicLong ratisContainerWithMinReplicas = new AtomicLong(0);
private AtomicLong ecContainerWithMinReplicas = new AtomicLong(0);
private final ContainerManager containerManager;

public ContainerSafeModeRule(String ruleName, EventQueue eventQueue,
ConfigurationSource conf,
ContainerManager containerManager, SCMSafeModeManager manager) {
this(ruleName, eventQueue, conf, containerManager.getContainers(), containerManager, manager);
}

public ContainerSafeModeRule(String ruleName, EventQueue eventQueue,
ConfigurationSource conf,
List<ContainerInfo> containers,
ContainerManager containerManager, SCMSafeModeManager manager) {
public ContainerSafeModeRule(final String ruleName,
final EventQueue eventQueue,
final ConfigurationSource conf,
final ContainerManager containerManager,
final SCMSafeModeManager manager) {
super(manager, ruleName, eventQueue);
this.safeModeCutoff = getSafeModeCutoff(conf);
this.containerManager = containerManager;
safeModeCutoff = conf.getDouble(
HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT,
HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT_DEFAULT);

Preconditions.checkArgument(
(safeModeCutoff >= 0.0 && safeModeCutoff <= 1.0),
HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT +
" value should be >= 0.0 and <= 1.0");
this.ratisContainers = new HashSet<>();
this.ecContainers = new HashSet<>();
this.ecContainerDNsMap = new ConcurrentHashMap<>();
initializeRule();
}

ratisContainers = new HashSet<>();
ecContainers = new HashSet<>();
ecContainerDNsMap = new ConcurrentHashMap<>();

initializeRule(containers);
private static double getSafeModeCutoff(ConfigurationSource conf) {
final double cutoff = conf.getDouble(HDDS_SCM_SAFEMODE_THRESHOLD_PCT,
HDDS_SCM_SAFEMODE_THRESHOLD_PCT_DEFAULT);
Preconditions.checkArgument((cutoff >= 0.0 && cutoff <= 1.0),
HDDS_SCM_SAFEMODE_THRESHOLD_PCT +
" value should be >= 0.0 and <= 1.0");
return cutoff;
}


@Override
protected TypedEvent<NodeRegistrationContainerReport> getEventType() {
return SCMEvents.CONTAINER_REGISTRATION_REPORT;
}

@Override
protected synchronized boolean validate() {
return (getCurrentContainerThreshold() >= safeModeCutoff) &&
(getCurrentECContainerThreshold() >= safeModeCutoff);
if (validateBasedOnReportProcessing()) {
return (getCurrentContainerThreshold() >= safeModeCutoff) &&
(getCurrentECContainerThreshold() >= safeModeCutoff);
}

// TODO: Split ContainerSafeModeRule into RatisContainerSafeModeRule and
// ECContainerSafeModeRule
final List<ContainerInfo> containers = containerManager.getContainers(
ReplicationType.RATIS);

return containers.stream()
.filter(this::isClosed)
.map(ContainerInfo::containerID)
.noneMatch(this::isMissing);
}

@VisibleForTesting
public synchronized double getCurrentContainerThreshold() {
if (ratisMaxContainer == 0) {
return 1;
/**
* Checks if the container has any replica.
*/
private boolean isMissing(ContainerID id) {
try {
return containerManager.getContainerReplicas(id).isEmpty();
} catch (ContainerNotFoundException ex) {
/*
* This should never happen, in case this happens the container
* somehow got removed from SCM.
* Safemode rule doesn't have to log/fix this. We will just exclude this
* from the rule validation.
*/
return false;

}
return (ratisContainerWithMinReplicas.doubleValue() / ratisMaxContainer);
}

@VisibleForTesting
public synchronized double getCurrentECContainerThreshold() {
if (ecMaxContainer == 0) {
return 1;
}
return (ecContainerWithMinReplicas.doubleValue() / ecMaxContainer);
public double getCurrentContainerThreshold() {
return ratisMaxContainer == 0 ? 1 :
(ratisContainerWithMinReplicas.doubleValue() / ratisMaxContainer);
}

private synchronized double getEcMaxContainer() {
if (ecMaxContainer == 0) {
return 1;
}
return ecMaxContainer;
@VisibleForTesting
public double getCurrentECContainerThreshold() {
return ecMaxContainer == 0 ? 1 :
(ecContainerWithMinReplicas.doubleValue() / ecMaxContainer);
}

private synchronized double getRatisMaxContainer() {
if (ratisMaxContainer == 0) {
return 1;
}
return ratisMaxContainer;
}

// TODO: Report processing logic will be removed in future. HDDS-11958.
@Override
protected synchronized void process(
NodeRegistrationContainerReport reportsProto) {
DatanodeDetails datanodeDetails = reportsProto.getDatanodeDetails();
UUID datanodeUUID = datanodeDetails.getUuid();
final NodeRegistrationContainerReport reportsProto) {
final DatanodeDetails datanodeDetails = reportsProto.getDatanodeDetails();
final UUID datanodeUUID = datanodeDetails.getUuid();
StorageContainerDatanodeProtocolProtos.ContainerReportsProto report = reportsProto.getReport();

report.getReportsList().forEach(c -> {
Expand All @@ -166,9 +178,7 @@ protected synchronized void process(
SCMSafeModeManager.getLogger().info(
"SCM in safe mode. {} % containers [Ratis] have at least one"
+ " reported replica, {} % containers [EC] have at N reported replica.",
((ratisContainerWithMinReplicas.doubleValue() / getRatisMaxContainer()) * 100),
((ecContainerWithMinReplicas.doubleValue() / getEcMaxContainer()) * 100)
);
getCurrentContainerThreshold() * 100, getCurrentECContainerThreshold() * 100);
}
}

Expand Down Expand Up @@ -246,8 +256,8 @@ public String getStatusText() {
String status = String.format(
"%1.2f%% of [Ratis] Containers(%s / %s) with at least one reported replica (=%1.2f) >= " +
"safeModeCutoff (=%1.2f);",
(ratisContainerWithMinReplicas.doubleValue() / getRatisMaxContainer()) * 100,
ratisContainerWithMinReplicas, (long) getRatisMaxContainer(),
getCurrentContainerThreshold() * 100,
ratisContainerWithMinReplicas, (long) ratisMaxContainer,
getCurrentContainerThreshold(), this.safeModeCutoff);

Set<Long> sampleRatisContainers = ratisContainers.stream().
Expand All @@ -264,8 +274,8 @@ public String getStatusText() {
String ecStatus = String.format(
"%1.2f%% of [EC] Containers(%s / %s) with at least N reported replica (=%1.2f) >= " +
"safeModeCutoff (=%1.2f);",
(ecContainerWithMinReplicas.doubleValue() / getEcMaxContainer()) * 100,
ecContainerWithMinReplicas, (long) getEcMaxContainer(),
getCurrentECContainerThreshold() * 100,
ecContainerWithMinReplicas, (long) ecMaxContainer,
getCurrentECContainerThreshold(), this.safeModeCutoff);
status = status.concat("\n").concat(ecStatus);

Expand Down Expand Up @@ -295,25 +305,19 @@ public String getStatusText() {

@Override
public synchronized void refresh(boolean forceRefresh) {
List<ContainerInfo> containers = containerManager.getContainers();
if (forceRefresh) {
initializeRule(containers);
} else {
if (!validate()) {
initializeRule(containers);
}
if (forceRefresh || !validate()) {
initializeRule();
}
}

private boolean checkContainerState(LifeCycleState state) {
if (state == LifeCycleState.QUASI_CLOSED || state == LifeCycleState.CLOSED) {
return true;
}
return false;
private boolean isClosed(ContainerInfo container) {
final LifeCycleState state = container.getState();
return state == LifeCycleState.QUASI_CLOSED ||
state == LifeCycleState.CLOSED;
}

private void initializeRule(List<ContainerInfo> containers) {

private void initializeRule() {
final List<ContainerInfo> containers = containerManager.getContainers();
// Clean up the related data in the map.
ratisContainers.clear();
ecContainers.clear();
Expand All @@ -325,10 +329,9 @@ private void initializeRule(List<ContainerInfo> containers) {
// created by the client. We are not considering these containers for
// now. These containers can be handled by tracking pipelines.

LifeCycleState containerState = container.getState();
HddsProtos.ReplicationType replicationType = container.getReplicationType();

if (checkContainerState(containerState) && container.getNumberOfKeys() > 0) {
if (isClosed(container) && container.getNumberOfKeys() > 0) {
// If it's of type Ratis
if (replicationType.equals(HddsProtos.ReplicationType.RATIS)) {
ratisContainers.add(container.getContainerID());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -28,7 +27,6 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
Expand Down Expand Up @@ -105,7 +103,6 @@ public class SCMSafeModeManager implements SafeModeManager {
private Set<String> validatedPreCheckRules = new HashSet<>(1);

private final EventQueue eventPublisher;
private final PipelineManager pipelineManager;
private final SCMServiceManager serviceManager;
private final SCMContext scmContext;

Expand All @@ -114,12 +111,10 @@ public class SCMSafeModeManager implements SafeModeManager {

// TODO: Remove allContainers argument. (HDDS-11795)
public SCMSafeModeManager(ConfigurationSource conf,
List<ContainerInfo> allContainers,
ContainerManager containerManager, PipelineManager pipelineManager,
EventQueue eventQueue, SCMServiceManager serviceManager,
SCMContext scmContext) {
this.config = conf;
this.pipelineManager = pipelineManager;
this.eventPublisher = eventQueue;
this.serviceManager = serviceManager;
this.scmContext = scmContext;
Expand Down
Loading
Loading