Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
d48e7f7
HDDS-1577. Add default pipeline placement policy implementation. (#1366)
timmylicheng Sep 5, 2019
260938e
HDDS-1571. Create an interface for pipeline placement policy to suppo…
timmylicheng Sep 10, 2019
7576f4d
HDDS-2089: Add createPipeline CLI. (#1418)
timmylicheng Sep 12, 2019
3749f40
HDDS-1569 Support creating multiple pipelines with same datanode. Con…
timmylicheng Oct 29, 2019
e1b168f
HDDS-1572 Implement a Pipeline scrubber to clean up non-OPEN pipeline…
timmylicheng Nov 27, 2019
41a30ca
Rebase Fix
ChenSammi Dec 3, 2019
90f794f
HDDS-2650 Fix createPipeline CLI. (#340)
timmylicheng Dec 12, 2019
e720e7a
HDDS-2035 Implement datanode level CLI to reveal pipeline relation. (…
timmylicheng Dec 18, 2019
5c6d412
Revert "HDDS-2650 Fix createPipeline CLI. (#340)"
ChenSammi Dec 18, 2019
1a27b25
HDDS-2650 Fix createPipeline CLI and make it message based. (#370)
timmylicheng Dec 18, 2019
0bd5714
HDDS-1574 Average out pipeline allocation on datanodes and add metrcs…
timmylicheng Dec 18, 2019
605f960
Resolve rebase conflict.
timmylicheng Dec 23, 2019
8f57dbb
HDDS-2756. Handle pipeline creation failure in different way when it …
timmylicheng Jan 8, 2020
6613b05
HDDS-2115 Add acceptance test for createPipeline CLI and datanode lis…
timmylicheng Jan 8, 2020
71aa879
HDDS-2772 Better management for pipeline creation limitation. (#410)
timmylicheng Jan 10, 2020
484ab00
HDDS-2913 Update config names and CLI for multi-raft feature. (#462)
timmylicheng Jan 27, 2020
8211fcd
HDDS-2924. Fix Pipeline#nodeIdsHash collision issue. (#478)
xiaoyuyao Jan 27, 2020
a92058e
HDDS-2923 Add fall-back protection for rack awareness in pipeline cre…
timmylicheng Feb 10, 2020
378ee1e
HDDS-3007 Fix CI test failure for TestSCMNodeManager. (#550)
timmylicheng Feb 14, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,22 @@
* the License.
*/

package org.apache.hadoop.hdds.scm.container.placement.algorithms;
package org.apache.hadoop.hdds.scm;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;

import java.io.IOException;
import java.util.List;

/**
* A ContainerPlacementPolicy support choosing datanodes to build replication
* pipeline with specified constraints.
* A PlacementPolicy supports choosing datanodes to build
* pipelines or containers with specified constraints.
*/
public interface ContainerPlacementPolicy {
public interface PlacementPolicy {

/**
* Given the replication factor and size required, return set of datanodes
* that satisfy the nodes and size requirement.
* Given an initial set of datanodes and the size required,
* return set of datanodes that satisfy the nodes and size requirement.
*
* @param excludedNodes - list of nodes to be excluded.
* @param favoredNodes - list of nodes preferred.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,31 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT =
"ozone.scm.pipeline.owner.container.count";
public static final int OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT = 3;
// Pipeline placement policy:
// Upper limit on the number of pipelines a single datanode can engage in.
public static final String OZONE_DATANODE_PIPELINE_LIMIT =
"ozone.datanode.pipeline.limit";
public static final int OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT = 2;

// Upper limit on the number of pipelines that can be created
// across the cluster nodes managed by SCM.
// Only for test purposes for now.
public static final String OZONE_SCM_RATIS_PIPELINE_LIMIT =
"ozone.scm.ratis.pipeline.limit";
// A default of zero means this limit does not take effect.
public static final int OZONE_SCM_RATIS_PIPELINE_LIMIT_DEFAULT = 0;

public static final String
OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
"ozone.scm.keyvalue.container.deletion-choosing.policy";

// Maximum time a pipeline may stay in the ALLOCATED state before it is
// scrubbed.
public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT =
"ozone.scm.pipeline.allocated.timeout";

// Default value for the ALLOCATED-state timeout above.
public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT =
"5m";

public static final String OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT =
"ozone.scm.container.creation.lease.timeout";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
package org.apache.hadoop.hdds.scm.pipeline;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -56,6 +59,8 @@ public final class Pipeline {
private ThreadLocal<List<DatanodeDetails>> nodesInOrder = new ThreadLocal<>();
// Current reported Leader for the pipeline
private UUID leaderId;
// Timestamp for pipeline upon creation
private Instant creationTimestamp;

/**
* The immutable properties of pipeline object is used in
Expand All @@ -70,6 +75,7 @@ private Pipeline(PipelineID id, ReplicationType type,
this.factor = factor;
this.state = state;
this.nodeStatus = nodeStatus;
this.creationTimestamp = Instant.now();
}

/**
Expand Down Expand Up @@ -108,6 +114,24 @@ public PipelineState getPipelineState() {
return state;
}

/**
 * Returns the timestamp recorded for this pipeline's creation.
 *
 * @return the creation timestamp currently held by this pipeline
 */
public Instant getCreationTimestamp() {
  return this.creationTimestamp;
}

/**
* Set the creation timestamp, replacing the value currently stored on this
* pipeline. Only for protobuf now.
*
* @param creationTimestamp the instant to record as this pipeline's
*                          creation time
*/
void setCreationTimestamp(Instant creationTimestamp) {
this.creationTimestamp = creationTimestamp;
}

/**
* Return the pipeline leader's UUID.
*
Expand All @@ -133,6 +157,23 @@ public List<DatanodeDetails> getNodes() {
return new ArrayList<>(nodeStatus.keySet());
}

/**
 * Returns the datanodes which form this pipeline as an unmodifiable view
 * over the key set of the node-status map.
 *
 * @return unmodifiable Set of DatanodeDetails
 */
public Set<DatanodeDetails> getNodeSet() {
  final Set<DatanodeDetails> members = nodeStatus.keySet();
  return Collections.unmodifiableSet(members);
}

/**
 * Tells whether the given pipeline is made up of exactly the same set of
 * datanodes as this one.
 *
 * @param pipeline the pipeline to compare against
 * @return true if both pipelines' node sets are equal
 */
public boolean sameDatanodes(Pipeline pipeline) {
  final Set<DatanodeDetails> ownNodes = getNodeSet();
  final Set<DatanodeDetails> otherNodes = pipeline.getNodeSet();
  return ownNodes.equals(otherNodes);
}

/**
* Returns the leader if found else defaults to closest node.
*
Expand Down Expand Up @@ -221,6 +262,7 @@ public HddsProtos.Pipeline getProtobufMessage()
.setFactor(factor)
.setState(PipelineState.getProtobuf(state))
.setLeaderID(leaderId != null ? leaderId.toString() : "")
.setCreationTimeStamp(creationTimestamp.toEpochMilli())
.addAllMembers(nodeStatus.keySet().stream()
.map(DatanodeDetails::getProtoBufMessage)
.collect(Collectors.toList()));
Expand Down Expand Up @@ -256,6 +298,7 @@ public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline)
.setNodes(pipeline.getMembersList().stream()
.map(DatanodeDetails::getFromProtoBuf).collect(Collectors.toList()))
.setNodesInOrder(pipeline.getMemberOrdersList())
.setCreateTimestamp(pipeline.getCreationTimeStamp())
.build();
}

Expand Down Expand Up @@ -299,7 +342,8 @@ public String toString() {
b.append(", Factor:").append(getFactor());
b.append(", State:").append(getPipelineState());
b.append(", leaderId:").append(getLeaderId());
b.append(" ]");
b.append(", CreationTimestamp").append(getCreationTimestamp());
b.append("]");
return b.toString();
}

Expand All @@ -323,6 +367,7 @@ public static class Builder {
private List<Integer> nodeOrder = null;
private List<DatanodeDetails> nodesInOrder = null;
private UUID leaderId = null;
private Instant creationTimestamp = null;

public Builder() {}

Expand All @@ -334,6 +379,7 @@ public Builder(Pipeline pipeline) {
this.nodeStatus = pipeline.nodeStatus;
this.nodesInOrder = pipeline.nodesInOrder.get();
this.leaderId = pipeline.getLeaderId();
this.creationTimestamp = pipeline.getCreationTimestamp();
}

public Builder setId(PipelineID id1) {
Expand Down Expand Up @@ -372,6 +418,11 @@ public Builder setNodesInOrder(List<Integer> orders) {
return this;
}

/**
 * Sets the creation timestamp to use for the built pipeline.
 *
 * @param createTimestamp creation time in milliseconds since the epoch
 * @return this builder
 */
public Builder setCreateTimestamp(long createTimestamp) {
  final Instant parsed = Instant.ofEpochMilli(createTimestamp);
  this.creationTimestamp = parsed;
  return this;
}

public Pipeline build() {
Preconditions.checkNotNull(id);
Preconditions.checkNotNull(type);
Expand All @@ -380,6 +431,10 @@ public Pipeline build() {
Preconditions.checkNotNull(nodeStatus);
Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus);
pipeline.setLeaderId(leaderId);
// overwrite with original creationTimestamp
if (creationTimestamp != null) {
pipeline.setCreationTimestamp(creationTimestamp);
}

if (nodeOrder != null && !nodeOrder.isEmpty()) {
// This branch is for build from ProtoBuf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public enum SCMAction implements AuditAction {
GET_CONTAINER,
GET_CONTAINER_WITH_PIPELINE,
LIST_CONTAINER,
CREATE_PIPELINE,
LIST_PIPELINE,
CLOSE_PIPELINE,
ACTIVATE_PIPELINE,
Expand Down
1 change: 1 addition & 0 deletions hadoop-hdds/common/src/main/proto/hdds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ message Pipeline {
required PipelineID id = 5;
optional string leaderID = 6;
repeated uint32 memberOrders = 7;
optional uint64 creationTimeStamp = 8;
}

message KeyValue {
Expand Down
35 changes: 33 additions & 2 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -776,9 +776,11 @@
</value>
<tag>OZONE, MANAGEMENT</tag>
<description>
The full name of class which implements org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy.
The full name of class which implements
org.apache.hadoop.hdds.scm.PlacementPolicy.
The class decides which datanode will be used to host the container replica. If not set,
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom will be used as default value.
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom will be used as default
value.
</description>
</property>
<property>
Expand All @@ -788,6 +790,35 @@
<description>Number of containers per owner in a pipeline.
</description>
</property>
<property>
<name>ozone.datanode.pipeline.limit</name>
<value>2</value>
<tag>OZONE, SCM, PIPELINE</tag>
<description>Max number of pipelines a datanode can be engaged in.
</description>
</property>
<property>
<name>ozone.scm.ratis.pipeline.limit</name>
<value>0</value>
<tag>OZONE, SCM, PIPELINE</tag>
<description>Upper limit for how many pipelines can be OPEN in SCM.
0 as default means there is no limit. Otherwise, the number is the limit
of max amount of pipelines which are OPEN.
</description>
</property>
<property>
<name>ozone.scm.pipeline.allocated.timeout</name>
<value>5m</value>
<tag>OZONE, SCM, PIPELINE</tag>
<description>
Timeout for every pipeline to stay in ALLOCATED stage. When pipeline is created,
it should be at OPEN stage once pipeline report is successfully received by SCM.
If a pipeline stays at ALLOCATED longer than the specified period of time,
it should be scrubbed so that new pipeline can be created.
This timeout is for how long pipeline can stay at ALLOCATED
stage until it gets scrubbed.
</description>
</property>
<property>
<name>ozone.scm.container.size</name>
<value>5GB</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* the License.
*/

package org.apache.hadoop.hdds.scm.container.placement.algorithms;
package org.apache.hadoop.hdds.scm;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -33,25 +33,25 @@
import java.util.stream.Collectors;

/**
* SCM CommonPolicy implements a set of invariants which are common
* for all container placement policies, acts as the repository of helper
* This policy implements a set of invariants which are common
* for all basic placement policies, and acts as the repository of helper
* functions which are common to placement policies.
*/
public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
public abstract class SCMCommonPlacementPolicy implements PlacementPolicy {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMCommonPolicy.class);
LoggerFactory.getLogger(SCMCommonPlacementPolicy.class);
private final NodeManager nodeManager;
private final Random rand;
private final Configuration conf;

/**
* Constructs SCM Common Policy Class.
* Constructor.
*
* @param nodeManager NodeManager
* @param conf Configuration class.
*/
public SCMCommonPolicy(NodeManager nodeManager, Configuration conf) {
public SCMCommonPlacementPolicy(NodeManager nodeManager, Configuration conf) {
this.nodeManager = nodeManager;
this.rand = new Random();
this.conf = conf;
Expand Down Expand Up @@ -85,7 +85,7 @@ public Configuration getConf() {
}

/**
* Given the replication factor and size required, return set of datanodes
* Given the size required, return the set of datanodes
* that satisfy the nodes and size requirement.
* <p>
* Here are some invariants of container placement.
Expand Down Expand Up @@ -149,7 +149,7 @@ public List<DatanodeDetails> chooseDatanodes(
* @param datanodeDetails DatanodeDetails
* @return true if we have enough space.
*/
boolean hasEnoughSpace(DatanodeDetails datanodeDetails,
public boolean hasEnoughSpace(DatanodeDetails datanodeDetails,
long sizeRequired) {
SCMNodeMetric nodeMetric = nodeManager.getNodeStat(datanodeDetails);
return (nodeMetric != null) && (nodeMetric.get() != null)
Expand All @@ -164,7 +164,7 @@ boolean hasEnoughSpace(DatanodeDetails datanodeDetails,
* @param nodesRequired - Nodes Required
* @param healthyNodes - List of Nodes in the result set.
* @return List of Datanodes that can be used for placement.
* @throws SCMException
* @throws SCMException SCMException
*/
public List<DatanodeDetails> getResultSet(
int nodesRequired, List<DatanodeDetails> healthyNodes)
Expand All @@ -190,8 +190,7 @@ public List<DatanodeDetails> getResultSet(

/**
* Choose a datanode according to the policy, this function is implemented
* by the actual policy class. For example, PlacementCapacity or
* PlacementRandom.
* by the actual policy class.
*
* @param healthyNodes - Set of healthy nodes we can choose from.
* @return DatanodeDetails
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,13 @@ public AllocatedBlock allocateBlock(final long size, ReplicationType type,
// TODO: #CLUTIL Remove creation logic when all replication types and
// factors are handled by pipeline creator
pipeline = pipelineManager.createPipeline(type, factor);

// wait until pipeline is ready
pipelineManager.waitPipelineReady(pipeline.getId(), 0);
} catch (SCMException se) {
LOG.warn("Pipeline creation failed for type:{} factor:{}. " +
"Datanodes may be used up.", type, factor, se);
break;
} catch (IOException e) {
LOG.warn("Pipeline creation failed for type:{} factor:{}. Retrying " +
"get pipelines call once.", type, factor, e);
Expand Down
Loading